
    :j3                     .   d Z ddlZddlmZmZmZ ddlmZ ddlm	Z	m
Z
 ddlmZmZ ddlmZ ddlmZ dd	lmZmZ dd
lmZ ddlmZmZmZmZ ddlmZ ddlmZ ddl m!Z! ddl"m#Z#m$Z$m%Z% ddl&m'Z' ddl(m)Z) ddl*m+Z+ ddl,m-Z-  e'd      Z. G d deee+      Z/y)z
Dynamic pipeline system for asynchronous multi-provider task processing.
Implements producer-consumer pattern with configurable worker threads and dynamic stage management.
    N)DictListOptional)Config)configure_authget_auth_provider)PipelineStageSystemState)PipelineStatus)ProviderTask)IPipelineStats	IProvider)client)BasePipelineStageStageOutputStageResources
StageUtils)StageRegistryMixin)DependencyResolver)MultiResultManager)get_session	get_tokenget_user_agent)
get_logger)RateLimiter   )LifecycleManager)QueueManagermanagerc                       e Zd ZdZdedeeef   fdZde	e   fdZ
ddZdd	Zdd
ZdefdZdefdZdefdZdefdZde	e   ddfdZdedee   fdZddZde	e   fdZdeddfdZdedefdZy)Pipelinea  Dynamic pipeline coordinator with registry-based stage management

    Inherits from PipelineBase to provide type-safe statistics interface,
    from StageRegistryMixin for stage management capabilities,
    and from LifecycleManager for lifecycle management.
    config	providersc           
         t        j                  | d       || _        || _        t	        t
        t        t               t        |j                  j                  ||j                  j                  |j                  j                  |j                  j                  t        |j                  j                               | _        t%        |j&                        | _        t+        j,                  |j                  j.                         t+        j0                  |j&                         t3        |j                  j                  |j                  j4                  t        |j                  j                               | _        |j                  j                  s1	 | j"                  j9                  |j                  j:                         nt>        jC                  d       |jD                  D ci c]  }|jF                  s|jH                  | c}| _%        i | _&        tO        | jP                        | _)        | jU                          d | _+        | jY                          t[        jZ                         | _.        d| _/        t>        ja                  dtc        | jL                         d	te        | jL                  jg                                       y # t<        $ r#}t>        jA                  d|        Y d }~d }~ww xY wc c}w )
Nr!   )session_providertoken_provideruser_agent_provider)	workspacer#   
batch_sizesave_intervalsimpleshutdown_timeout)r(   r*   r,   z$Failed to start periodic snapshots: z*Skipping periodic snapshots in simple moder   z"Initialized dynamic pipeline with z	 stages: )4r   __init__r"   r#   r   r   r   r   r   global_configr(   persistencer)   r*   r+   floatr,   result_managerr   
ratelimitsrate_limiterr   	set_proxyproxyinit_github_clientr   queue_intervalqueue_managerstart_periodic_snapshotssnapshot_interval	Exceptionloggererrordebugtasksenablednametask_configsstagesr   registryresolver_create_stages_order_cache_init_order_cachetime
start_timeinitial_tasks_countinfolenlistkeys)selfr"   r#   etasks        7/root/.openclaw/workspace/harvester/manager/pipeline.pyr-   zPipeline.__init__(   sD   !!$
3/8 	Icqr 1**44))44 ,,::%%,,"6#5#5#F#FG
 ((9(9: 	--334 	!!&"3"34)**44 ,,;;"6#5#5#F#FG
 !!((I##<<V=O=O=a=ab LLEF :@VTYY_V 57*4==9 	 !  ))+#$ 8T[[9I8J)TXY]YdYdYiYiYkTlSmno/  ICA3GHHI Ws$   /K &K68K6	K3K..K3returnc           	      f   t               }| j                  j                  D ]j  }|j                  st	        j
                  |      }|j                  |       t        j                  d|j                   ddj                  |       d       l t        |      }t        j                  d|        |S )z3Aggregate stage requirements from all enabled tasksz  z: [z, ]zAggregated stages to create: )setr"   r?   r@   r   get_enabledupdater<   r>   rA   joinrN   rL   )rP   	requestedrR   r@   results        rS   _aggregate_stageszPipeline._aggregate_stagesg   s    E	 KK%% 	GD||$006  )r$))C		'0B/C1EF	G i3F8<=    Nc                    | j                         }|st        j                  d       y	 | j                  j	                  |      }t        | j                  | j                  | j                  | j                  t                     }| j                  j                  j                  }| j                  j                  j                  }|D ]  }| j!                  |      }|st        j                  d|        /	 |j#                  || j$                  t'        |j)                  |d      d      t'        |j)                  |d      d      | j                  j*                  j,                        }	|	| j.                  |<    y# t
        $ r}t        j                  d|         d}~ww xY w# t
        $ r!}t        j                  d	| d
|         d}~ww xY w)z;Create pipeline stages dynamically with hybrid architecturez+No stages requested, pipeline will be emptyNz&Failed to resolve stage dependencies: )limiterr#   r"   rB   authzStage definition not found: r   i  )	resourceshandlerthread_count
queue_sizemax_retrieszFailed to create stage z: )r]   r<   warningrE   resolve_orderr;   r=   r   r3   r#   r"   rB   r   pipelinethreadsqueue_sizesget_stage_defstage_class_handle_stage_outputmaxgetr.   max_retries_requeuedrC   )
rP   requested_stagesordered_stagesrQ   rb   thread_configqueue_configrA   
definitionstages
             rS   rF   zPipeline._create_stagesw   s     113NNHI	!]]889IJN #%%nn;;**"$
	 ,,44{{++77" 	D++D1J;D6BC"..' 55!$]%6%6tQ%?!C"<#3#3D$#?C $ 9 9 N N /  %*D!!	#  	LLA!EF	F  6tfBqcBCs0   E; 7BF%;	F"FF"%	G.G

Gc                 *   | j                   st        j                  d       y| j                         }|D ]0  }| j                   j	                  |      }|s!|j                          2 t        j                  dt        | j                          d       y)zStart all pipeline stageszNo stages to startNzStarted z pipeline stages)rC   r<   rg   	get_orderrp   startrL   rM   )rP   rs   
stage_namerw   s       rS   	_on_startzPipeline._on_start   sv    {{NN/0 )( 	JKKOOJ/E	
 	hs4;;/00@ABr^   c                    | j                   sy| j                         }| j                   rdt        | j                         z  nd}t        |      D ]1  }| j                   j	                  |      }|s!|j                  |       3 | j                  j                          | j                  j                          t        j                  d       y)zStop all pipeline stagesNg      >@zStopped all pipeline stages)rC   ry   rM   reversedrp   stopr8   r1   stop_allr<   rL   )rP   rs   stage_timeoutr{   rw   s        rS   _on_stopzPipeline._on_stop   s    {{ )37;;s4;;//D">2 	*JKKOOJ/E

=)	* 	!$$&12r^   c                 @   | j                   sy| j                         }d}|D ]y  }| j                   j                  |      }|s!|j                  r:| j	                  |      r)|j                          t        j                  d| d       |j                         rxd}{ |S )z5Check if pipeline is finished and manage stage statesT[z] stopped accepting new tasksF)	rC   ry   rp   	accepting_can_stage_stop_acceptingstop_acceptingr<   rL   is_finished)rP   rs   all_finishedr{   rw   s        rS   r   zPipeline.is_finished   s    {{) ) 	%JKKOOJ/E 4#A#A*#M$$&a
|+HIJ $$&$	% r^   c                 "    | j                         S )zGet statistics for all stages_get_pipeline_statusrP   s    rS   get_all_statszPipeline.get_all_stats       ((**r^   c                 "    | j                         S )z%Get dynamic statistics for all stagesr   r   s    rS   get_dynamic_statszPipeline.get_dynamic_stats   r   r^   c           	         i }| j                   j                         D ]  \  }}|j                         ||<    t        | j                   rt        j
                  nt        j                  t        | j                   j                         D cg c]  }|j                  s| c}      t        | j                         |t        j                         | j                  z
        }|S c c}w )z,Get pipeline status as PipelineStatus object)stateactivetotalrC   runtime)rC   items	get_statsr   r
   RUNNINGSTOPPEDrM   valuesrunningrI   rJ   )rP   stage_statusr{   rw   spipeline_statuss         rS   r   zPipeline._get_pipeline_status   s     "&!2!2!4 	9J','8L$	9 ))-+%%+:M:M4;;#5#5#7Ea199EFdkk"IIK$//1
  Fs   	C"C"initial_tasksc                 .   t        |      | _        | j                  j                  t        j
                  j                        }|r|D ]  }|j                  |        nt        j                  d       t        j                  dt        |       d       y)z$Add initial search tasks to pipelinez2Search stage not created, cannot add initial taskszAdded z initial tasks to pipelineN)rM   rK   rC   rp   r	   SEARCHvalueput_taskr<   rg   rL   )rP   r   search_stagerR   s       rS   add_initial_taskszPipeline.add_initial_tasks  sx    #&}#5 {{}';';'A'AB% ,%%d+, NNOPfS/00JKLr^   rA   c                 8    | j                   j                  |      S )zGet stage by name)rC   rp   )rP   rA   s     rS   	get_stagezPipeline.get_stage  s    {{t$$r^   c                     | j                   rd| j                  j                  t        | j                   j	                                     | _        t        j                  d| j
                          yy)z Initialize and cache stage orderzCached stage order: N)rC   rE   rh   rN   rO   rG   r<   r>   r   s    rS   rH   zPipeline._init_order_cache  sR    ;; $ ; ;DAQAQAS<T UDLL/0A0A/BCD r^   c                     | j                   | j                          | j                   r| j                   j                         S g S )zGet cached stage order)rG   rH   copyr   s    rS   ry   zPipeline.get_order  s<    $""$+/+<+<t  %%'D"Dr^   outputc                    |j                   D ]#  \  }}}| j                  j                  |||       % |j                  D ]!  \  }}| j                  j	                  ||       # |j
                  D ]#  \  }}}| j                  j                  |||       % |j                  D ]  \  }}	| j                  j                  |j                        }
|
rkt        j                  |
|	      rU| j                  j                  |	      }|r|j                  |       rt        j!                  d|	 d|j                          t        j#                  d|	 d|j                   d        y)z>Handle pure functional stage output - core orchestration logiczTarget stage z not found for task zStage z disabled for z, skipping taskN)resultsr1   
add_resultlinks	add_linksmodels
add_models	new_tasksrB   rp   providerr   checkrC   r   r<   rg   r>   )rP   r   r   result_typedatar   keyr   rR   target_stager"   rw   s               rS   rn   zPipeline._handle_stage_output  sJ    ,2>> 	H'Hk4**8[$G	H  &|| 	;OHe))(E:	; &,]] 	B!Hc6**8S&A	B #)"2"2 		bD,&&**4==9F***6<@5NN4(NN]<.@TUYUbUbTc#devl^>$--P_`a		br^   r{   c                    | j                   j                  |      }|sy|j                  j                         sy|j                  dkD  ryg }| j                   j                         D ]6  }| j                  |      }|s||j                  v s&|j                  |       8 |sy|D ][  }| j                   j                  |      }|s!|j                  j                         s y|j                  dkD  r y|j                  s[ y y)zICheck if a stage can stop accepting new tasks based on precise conditionsFr   T)
rC   rp   queueemptyactive_workersrO   rl   produces_forappendr   )rP   r{   rw   upstream_stagesother_stage_namerv   upstream_nameupstream_stages           rS   r   z"Pipeline._can_stage_stop_accepting:  s    
+ {{  " !#  $ 0 0 2 	9++,<=JjJ,C,CC&&'78	9  - 
	!M![[__];N%++113 !0014 !++ 
	! r^   )rT   N)__name__
__module____qualname____doc__r   r   strr   r-   r   r]   rF   r|   r   boolr   r   r   r   r   r   r   r   r   r   rH   ry   r   rn   r    r^   rS   r!   r!       s    =pv =p$sI~2F =p~49  2hC 3(T 2+~ ++> +n $MtL/A Md M%c %h/@&A %EE49 Eb; b4 b6,C ,D ,r^   r!   )0r   rI   typingr   r   r   config.schemasr   	core.authr   r   
core.enumsr	   r
   core.metricsr   core.modelsr   
core.typesr   r   searchr   
stage.baser   r   r   r   stage.registryr   stage.resolverr   storage.persistencer   tools.coordinatorr   r   r   tools.loggerr   tools.ratelimitr   baser   r   r   r<   r!   r   r^   rS   <module>r      si   
  ' ' ! 7 1 ' $ 0  Q Q - - 2 D D # ' " 	I	F~13C Fr^   