
    :jlW                        d Z ddlZddlZddlZddlmZmZ ddlmZ ddl	m
Z
mZ ddlmZmZmZmZmZmZmZmZmZ ddlmZmZmZ ddlmZ dd	lmZ dd
lmZ ddl m!Z! ddl"m#Z#m$Z$ ddl%m&Z& ddl'm(Z( ddl)m*Z*m+Z+  e&d      Z,e
 G d d             Z-e
 G d d             Z.ee.gdf   Z/e G d de             Z0 G d dee0      Z1 G d d      Z2y)zq
Base classes for pipeline stages.
Hybrid architecture with dependency injection and pure functional processing.
    N)ABCabstractmethod)deque)	dataclassfield)	AnyCallableDictListOptionalProtocolTupleUnionruntime_checkable)ConfigStageConfig
TaskConfig)DEFAULT_SHUTDOWN_TIMEOUT)PipelineStage)StageMetrics)ProviderTask)IAuthProvider	IProvider)
get_logger)RateLimiter)ExponentialBackoffRetryPolicystagec                   l    e Zd ZU dZeed<   eeef   ed<   e	ed<   eee
f   ed<   eed<   deded	efd
Zy)StageResourcesz7Resources injected into stages for dependency inversionlimiter	providersconfigtask_configsauthproviderr   returnc                 j    | j                   j                  |      }|syt        j                  ||      S )z&Check if stage is enabled for providerF)r$   get
StageUtilscheck)selfr&   r   r#   s       1/root/.openclaw/workspace/harvester/stage/base.py
is_enabledzStageResources.is_enabled1   s1    ""&&x0..    N)__name__
__module____qualname____doc__r   __annotations__r
   strr   r   r   r   boolr.    r/   r-   r    r    '   sN    ACN##NsJ''
/3 /s /t /r/   r    c                   @   e Zd ZU dZeed<    ee      Ze	e
eef      ed<    ee      Ze	e
eeef      ed<    ee      Ze	e
ee	e   f      ed<    ee      Ze	e
eee	e   f      ed<   deded	d
fdZdededed	d
fdZdede	e   d	d
fdZdedede	e   d	d
fdZy
)StageOutputz,Pure functional output from stage processingtask)default_factory	new_tasksresultslinksmodelstargetr'   Nc                 >    | j                   j                  ||f       y)zAdd new task to be routedN)r<   append)r,   r:   r@   s      r-   add_taskzStageOutput.add_taskC   s    tVn-r/   r&   result_typedatac                 @    | j                   j                  |||f       y)zAdd result to be savedN)r=   rB   )r,   r&   rD   rE   s       r-   
add_resultzStageOutput.add_resultG   s    X{D9:r/   c                 >    | j                   j                  ||f       y)zAdd links to be savedN)r>   rB   )r,   r&   r>   s      r-   	add_linkszStageOutput.add_linksK   s    

8U+,r/   keyc                 @    | j                   j                  |||f       y)zAdd models to be savedN)r?   rB   )r,   r&   rJ   r?   s       r-   
add_modelszStageOutput.add_modelsO   s    Hc623r/   )r0   r1   r2   r3   r   r4   r   listr<   r   r   r5   r=   r   r>   r?   rC   rG   rI   rL   r7   r/   r-   r9   r9   9   s   6
05d0KItE,+,-K*/*EGT%S#&'E).t)DE4c49n%&D/4T/JFDsCc*+,J.\ .3 .4 .;3 ;S ; ; ;-# -d3i -D -43 4S 4$s) 4 4r/   r9   c                   <    e Zd ZdZdedefdZdedefdZdefdZy)WorkerManageablez:Protocol for stages that support dynamic worker managementcountr'   c                      y)Adjust worker count for this stage

        Args:
            count: Target number of workers

        Returns:
            bool: True if adjustment was successful
        Nr7   r,   rP   s     r-   adjust_workerszWorkerManageable.adjust_workers\        	r/   c                      y)Set worker count for this stage

        Args:
            count: Target number of workers

        Returns:
            bool: True if setting was successful
        Nr7   rS   s     r-   set_worker_countz!WorkerManageable.set_worker_countg   rU   r/   c                      y)Get current number of workersNr7   r,   s    r-   get_worker_countz!WorkerManageable.get_worker_countr   s    r/   N)	r0   r1   r2   r3   intr6   rT   rX   r\   r7   r/   r-   rO   rO   X   s7    D	C 	D 		c 	d 	# r/   rO   c                      e Zd ZdZ	 	 	 	 	 d*dedededededed	ed
ee	   ddfdZ
d+dZefdeddfdZdedefdZdefdZd,dZdefdZdefdZdee   fdZdefdZd+dZdefdZdedefdZdedefdZdedee   fdZededefd       Z ededee   fd       Z!dedefd Z"ded!edefd"Z#ded#e$dee   fd$Z%ededefd%       Z&d+d&Z'defd'Z(dedefd(Z)dedefd)Z*y)-BasePipelineStagez?Base class for pipeline stages with hybrid architecture supportNname	resourceshandlerthread_count
queue_sizemax_retriesdedup_max_sizeretry_policyr'   c	                    || _         || _        || _        || _        t	        j
                  |      | _        t               | _        t               | _	        t        dt        |            | _        t        j                         | _        g | _        d| _        d| _        t        |d      | _        |xs t)        | j&                        | _        d| _        d| _        t1        j0                         | _        t1        j0                         | _        d| _        t        j                         | _        t        j                         | _        g | _        t>        jA                  d| d| d	|        y )
N)maxsize  FTr   )re   zCreated stage: z, threads: z	, queue: )!r`   ra   rb   rc   queueQueueset	processedr   processed_ordermaxr]   rf   	threadingLock
dedup_lockworkersrunning	acceptingre   r   rg   total_processedtotal_errorstimelast_activity
start_timeactive_workers	work_lock
stats_lockzombie_threadsloggerinfo)	r,   r`   ra   rb   rc   rd   re   rf   rg   s	            r-   __init__zBasePipelineStage.__init__z   s+    	"( [[4
 "e$w!$N(;<#..* 02 {A. )\,>4K[K[,\  !!YY[))+  ") $..* !odV;|nIj\Z[r/   c                    | j                   ryd| _         d| _        t        | j                        D ]a  }t	        j
                  | j                  | j                   d|dz    d      }|j                          | j                  j                  |       c t        j                  d| j                   dt        | j                         d       y)	zStart worker threadsNT-worker-   r@   r`   daemon[z
] started  workers)ru   rv   rangerc   rq   Thread_worker_loopr`   startrt   rB   r   r   len)r,   iworkers      r-   r   zBasePipelineStage.start   s    <<t(() 	(A%%T->->		{RZ[\]^[^Z_E`imnFLLNLL'	(
 	a		{*S->,?xHIr/   timeoutc           	         | j                   syd| _        t        j                  d| j                   dt        | j                         d       |dz  }t        j                         }| j                  j                         sdt        j                         |z
  |k  rJt        j                  d       | j                  j                         st        j                         |z
  |k  rJd| _         |dz  }g }| j                  D ]  }|j                         s|j                  |t        t        | j                        d	      z  
       |j                         sWt        |d      r|j                  ndt        |       }|j!                  |        |r7|| _        t        j%                  d| j                   dt        |       d       yg | _        t        j                  d| j                   d       y)z*Stop worker threads with enhanced trackingNFr   z] stopping, waiting for r   g333333?g?g333333?r   r   r`   zworker-z] z  workers did not stop gracefullyz ] all workers stopped gracefully)ru   rv   r   r   r`   r   rt   ry   rk   emptysleepis_alivejoinrp   hasattridrB   r   warning)r,   r   queue_timeoutr{   worker_timeoutalive_workersr   worker_names           r-   stopzBasePipelineStage.stop   s   || a		{":3t||;L:MXVW  #YY[
**""$z)AM)QJJsO **""$z)AM)Q  3ll 	6F NST\\9JA5N$NO??$181H&++PWXZ[aXbWcNdK!((5	6 "/DNNQtyykC,>+??_`a"$DKK!DII;&FGHr/   r:   c           
      2   | j                   s&t        j                  d| j                   d|        y| j	                  |      }| j
                  5  || j                  v r~|j                  dk(  s|j                  | j                  kD  rV|j                  | j                  kD  r3t        j                  d| j                   d| d| j                   d       	 ddd       yddd       	 | j                  j                  |d	
       | j
                  5  || j                  vr| j                  j                  |       | j                  j                  |       t        | j                        | j                  kD  rF| j                  r:| j                  j!                         }||k7  r| j                  j#                  |       ddd       y# 1 sw Y   xY w# 1 sw Y   yxY w# t        j$                  $ r& t        j                  d| j                   d       Y yw xY w)z*Add task to queue with deduplication checkr   z ] not accepting tasks, discard: Fr   z] task=[z] discarded, max retries=[z	] reachedN      ?r   Tz] queue is full)rv   r   r   r`   _generate_idrs   rn   attemptsre   rk   putaddro   rB   r   rf   popleftdiscardFull)r,   r:   task_idoldests       r-   put_taskzBasePipelineStage.put_task   s   ~~NNQtyyk)I$PQ ##D) __ 	$..(dmmq.@DMMTXTdTdDd==4#3#33NNDII;hwi7QRVRbRbQcclm 	 	 		JJNN4N- ;$..0NN&&w/((//84>>*T-@-@@TEYEY!%!5!5!=!=!?!W, NN226:; -	 	; zz 	NNQtyyk9:	s>   BG&)G B-G<G GGG G 6HHc                 \    | j                   j                         xr | j                          S )z%Check if stage is finished processingrk   r   _has_active_workersr[   s    r-   is_finishedzBasePipelineStage.is_finished  s(    
 zz!D$*B*B*D&DDr/   c                 j   | j                   5  t        | j                  | j                  d| j                  j                         | j                  t        | j                              }| j                  |j                  _        | j                  |j                  _        |cddd       S # 1 sw Y   yxY w)zGet stage statisticsF)r`   ru   disabledrd   rz   rt   N)r~   r   r`   ru   rk   qsizerz   r   rt   rw   tasks	completedrx   failed)r,   metricss     r-   	get_statszBasePipelineStage.get_stats  s     __ 	"YY::++-"00DLL)G '+&:&:GMM##'#4#4GMM 	 	 	s   BB))B2c                 2    t        | j                        dkD  S )z.Check if there are threads that failed to stopr   r   r   r[   s    r-   has_zombie_threadsz$BasePipelineStage.has_zombie_threads   s    4&&'!++r/   c                 ,    t        | j                        S )zGet count of zombie threadsr   r[   s    r-   get_zombie_countz"BasePipelineStage.get_zombie_count$  s    4&&''r/   c                    g }g }| j                   j                         sX	 | j                   j                         }|j                  |       |j                  |       | j                   j                         sX|D ]  }	 | j                   j                  |         |S # t         j                  $ r Y :w xY w# t         j                  $ r2 t        j                  d| j                   d|j                          Y w xY w)z'Get all pending tasks (for persistence)r   z ] lost task during persistence: )rk   r   
get_nowaitrB   Empty
put_nowaitr   r   r   r`   r   )r,   r   
temp_tasksr:   s       r-   get_pending_tasksz#BasePipelineStage.get_pending_tasks(  s    
 **""$zz,,.T"!!$'	 **""$  	^D^

%%d+	^  ;;  :: ^499+-Mdll^\]^s$   <B =B5B21B25AC:9C:c                 \    | j                   j                          xs | j                         S )z,Check if stage is currently processing tasksr   r[   s    r-   is_busyzBasePipelineStage.is_busy?  s&    ::##%%C)A)A)CCr/   c                     d| _         y)zStop accepting new tasksFN)rv   r[   s    r-   stop_acceptingz BasePipelineStage.stop_acceptingC  s	    r/   c                 ,    t        | j                        S )rZ   )r   rt   r[   s    r-   r\   z"BasePipelineStage.get_worker_countG  s    4<<  r/   rP   c                     |dk  r&t         j                  d| j                   d|        yt        | j                        }||k(  ry||kD  r| j                  ||z
        S | j                  ||z
        S )rR   r   r   z] invalid worker count: FT)r   r   r`   r   rt   _add_workers_remove_workers)r,   rP   current_counts      r-   rT   z BasePipelineStage.adjust_workersK  sy     19NNQtyyk)A%IJDLL)M!= $$U]%:;;''(=>>r/   c                 $    | j                  |      S )rW   )rT   rS   s     r-   rX   z"BasePipelineStage.set_worker_counta  s     ""5))r/   c                    | j                  |      s/t        j                  d| j                   dt	        |              y| j                  |      sy	 | j                  |      }|r| j                  ||      }|S # t        $ rA}t        j                  d| j                   d|        | j                  ||      cY d}~S d}~ww xY w)z9Template method for task processing with common workflow.r   z] invalid task type: Nz] task processing failed: )
_validate_task_typer   errorr`   type_pre_process_execute_task_post_process	Exception_handle_processing_error)r,   r:   resultes       r-   process_taskzBasePipelineStage.process_taskl  s     ''-LL1TYYK'<T$ZLIJ   &	:''-F ++D&9M 	:LL1TYYK'A!EF00q99	:s   &A; ;	C6C :C Cc                      y)z=Validate that the task is of the correct type for this stage.Nr7   r,   r:   s     r-   r   z%BasePipelineStage._validate_task_type       	r/   c                      y)z'Execute the core task processing logic.Nr7   r   s     r-   r   zBasePipelineStage._execute_task  r   r/   c                      y)z5Pre-processing hook. Return False to skip processing.Tr7   r   s     r-   r   zBasePipelineStage._pre_process      r/   r   c                     |S )z,Post-processing hook. Can modify the result.r7   )r,   r:   r   s      r-   r   zBasePipelineStage._post_process  s    r/   r   c                      y)z1Handle processing errors. Return None by default.Nr7   )r,   r:   r   s      r-   r   z*BasePipelineStage._handle_processing_error  r   r/   c                      y)z1Generate unique task identifier for deduplicationNr7   r   s     r-   r   zBasePipelineStage._generate_id  r   r/   c                    | j                   r	 | j                  j                  d      }| j                  5  | xj                  dz  c_        ddd       | j
                  5  t        j                         | _        ddd       	 | j                  |      }|r| j                  |       | j
                  5  | xj                  dz  c_
        ddd       | j                  5  | xj                  dz  c_        ddd       | j                  j/                          	 | j                   ryy# 1 sw Y   xY w# 1 sw Y   xY w# 1 sw Y   vxY w# t        $ r9}t        j                  d| j                   d|        | j                  j!                  |j"                  |      r| j                  j%                  |j"                        }|dkD  rt        j&                  |       |xj"                  dz  c_        | j)                  |      }|rdnd	}t        j+                  d| j                   d
| d|dd|        | j
                  5  | xj,                  dz  c_        | xj                  dz  c_
        ddd       n# 1 sw Y   nxY wY d}~d}~ww xY w# 1 sw Y   xY w# | j                  5  | xj                  dz  c_        ddd       n# 1 sw Y   nxY w| j                  j/                          w xY w# t        j0                  $ r Y 	t        $ r0}t        j                  d| j                   d|        Y d}~&d}~ww xY w)z7Main worker thread loop with pure functional processingr   r   r   Nr   z] error processing task: r   successfullyr   z] requeued z after z.1fzs delay, task: z] worker error: )ru   rk   r)   r}   r|   r~   ry   rz   r   rb   rw   r   r   r   r`   rg   should_retryr   	get_delayr   r   r   rx   	task_doner   )r,   r:   outputr   delaysuccessstatuss          r-   r   zBasePipelineStage._worker_loop  s   ll8Azz~~c~2 ^^ -''1,'- __ 5)-D&5%+!..t4F V,  2,,1,22  1++q0+1 JJ((*g ll- -5 52 2 ! 2LL1TYYK/H!LM ((55dmmQG $ 1 1 ; ;DMM J 19 JJu-*"&--"53:499+[PUVY{Zijnio'pq  2))Q.),,1,2 2 2!2,1 1 1++q0+1 1 1 JJ((*;;  Aq+;A3?@@As   (K4 D'K4 !D3;K4 0E 4D?
E K4 J4"K4 'D0,K4 3D<8K4 ?EE 
JC.J	+I7.	J	7J 	<J	J 	JJ JK4 K1+K
	K1
K"K11K4 4M
M%L==Mc                 d    | j                   5  | j                  dkD  cddd       S # 1 sw Y   yxY w)z)Check if any workers are currently activer   N)r}   r|   r[   s    r-   r   z%BasePipelineStage._has_active_workers  s,    ^^ 	+&&*	+ 	+ 	+s   &/c           
      f   | j                   s$t        j                  d| j                   d       y	 t	        | j
                        }t        |      D ]f  }||z   dz   }t        j                  | j                  | j                   d| d      }|j                          | j
                  j                  |       h t        j                  d| j                   d| d	t	        | j
                         d
       y# t        $ r/}t        j                  d| j                   d|        Y d}~yd}~ww xY w)zAdd new worker threads

        Args:
            count: Number of workers to add

        Returns:
            bool: True if workers were added successfully
        r   z'] cannot add workers: stage not runningFr   r   Tr   z] added  workers (total: )z] failed to add workers: N)ru   r   r   r`   r   rt   r   rq   r   r   r   rB   r   r   r   )r,   rP   current_worker_countr   	worker_idr   r   s          r-   r   zBasePipelineStage._add_workers  s    ||NNQtyyk)PQR	#&t||#4 5\ ,014q8	"))1B1BDII;V^_h^iIjswx##F+	, KK!DII;hug5Fs4<<GXFYYZ[\ 	LL1TYYK'@DE	s   CC8 8	D0%D++D0c           
         |dk  ryt        |t        | j                              }|dk(  ry	 | j                  | d }| j                  d|  | _        d}|D ]%  }|j                         s|j	                  |       ' t
        j                  d| j                   d| dt        | j                         d	       y# t        $ r/}t
        j                  d| j                   d
|        Y d}~yd}~ww xY w)zRemove worker threads gracefully

        Args:
            count: Number of workers to remove

        Returns:
            bool: True if workers were removed successfully
        r   TNg       @r   r   z
] removed r   r   z] failed to remove workers: F)
minr   rt   r   r   r   r   r`   r   r   )r,   rP   workers_to_removetimeout_per_workerr   r   s         r-   r   z!BasePipelineStage._remove_workers  s     A: E3t||,-A:	 !%eVW 5<<%0DL "%+ <??$KK(:K;< KK!DII;j7HT\\IZH[[\]^ 	LL1TYYK'CA3GH	s   <B; *AB; ;	C3%C..C3)r   rj   r   i Nr'   N)r'   r   )+r0   r1   r2   r3   r5   r    OutputHandlerr]   r   r   r   r   r   floatr   r   r6   r   r   r   r   r   r   r   r   r   r\   rT   rX   r9   r   r   r   r   r   r   r   r   r   r   r   r   r   r7   r/   r-   r_   r_   w   sD   I %.24\4\ "4\ 	4\
 4\ 4\ 4\ 4\ {+4\ 
4\lJ %= !IE !I !IF#\ #d #JET E$,D ,(# (4#5 .D D!# !?C ?D ?,	*c 	*d 	*: :(;2G :2     , 8K3H   $ ,   \ ) PXYdPe   #  ;Az+T +
# $ 8"S "T "r/   r_   c                   B   e Zd ZU dZdZeee      ed<   e	dee   fd       Z
e	dedee   fd       Ze	dd       Ze	ded	eeef   defd
       Ze	dedee   fd       Ze	dedeeeef      defd       Ze	dedeeeef      defd       Ze	d	eeef   defd       Zy)r*   z!Stage configuration utility classN_names_cacher'   c                    | j                   g }t        t              D ]  }|j                  d      r	 t	        t        |      }t        |t              s<t        t        d      r=|t        j                  v r+t        j                  |   t        k(  r|j                  |        || _         | j                   j                         S # t        t        f$ r Y w xY w)zGet all possible stage names_r4   )r   dirr   
startswithgetattr
isinstancer6   r   r4   rB   AttributeError	TypeErrorcopy)clsnamesr`   attrs       r-   	get_nameszStageUtils.get_names(  s     #E K( !s+
!&{D9%dD1#K1BC $(C(C C + ; ;D AT I!LL.!  %C$$&& +I6 ! !s   A-CCCr#   c                 ^    | j                  |      D cg c]  }|j                   c}S c c}w )z:Get enabled stage names from task config (based on list()))_listvalue)r  r#   r   s      r-   get_enabledzStageUtils.get_enabledA  s%     *-6):;;;;s   *c                     d| _         y)z&Clear cached stage names (for testing)N)r   )r  s    r-   clear_cachezStageUtils.clear_cacheF  s      r/   r   c                 v    |r|j                   sy| j                  |      }|syt        |j                   |d      S )a  Check if stage is enabled in configuration

        Args:
            config: Task configuration to check
            stage: Stage to check (PipelineStage enum or string name)

        Returns:
            bool: True if stage is enabled, False otherwise
        F)stages_get_attr_namer   )r  r#   r   	attr_names       r-   r+   zStageUtils.checkK  s=     V]] &&u-	 v}}i77r/   c                     |r|j                   sg S g }t        D ]5  }t        |j                   |j                  d      s%|j	                  |       7 |S )zGet list of enabled stages as PipelineStage enums

        Args:
            config: Task configuration to check

        Returns:
            List[PipelineStage]: List of enabled stages
        F)r  r   r   r	  rB   )r  r#   enabled_stages
stage_enums       r-   r  zStageUtils._lista  sR     V]]I' 	2Jv}}j&6&6>%%j1	2 r/   r  c                 6     |syt         fd|D              S )zCheck if all specified stages are enabled

        Args:
            config: Task configuration to check
            stages: List of stages to check

        Returns:
            bool: True if all stages are enabled, False otherwise
        Tc              3   B   K   | ]  }j                  |        y wNr+   .0r   r  r#   s     r-   	<genexpr>z!StageUtils.all.<locals>.<genexpr>       @399VU+@   )allr  r#   r  s   `` r-   r  zStageUtils.allu  s     @@@@r/   c                 6     |syt         fd|D              S )zCheck if any of the specified stages are enabled

        Args:
            config: Task configuration to check
            stages: List of stages to check

        Returns:
            bool: True if any stage is enabled, False otherwise
        Fc              3   B   K   | ]  }j                  |        y wr  r  r  s     r-   r  z!StageUtils.any.<locals>.<genexpr>  r  r  )anyr  s   `` r-   r!  zStageUtils.any  s     @@@@r/   c                     t        |t              r|j                  S t        |t              r	 t        |       |S y# t        $ r Y yw xY w)zConvert stage to StageConfig attribute name

        Args:
            stage: Stage as enum or string

        Returns:
            str: Attribute name or empty string if invalid
         )r   r   r	  r5   
ValueError)r  r   s     r-   r  zStageUtils._get_attr_name  sM     e]+;;s#e$   s   < 	AAr   )r0   r1   r2   r3   r   r   r   r5   r4   classmethodr  r   r
  r  r   r   r6   r+   r  r  r!  r  r7   r/   r-   r*   r*   #  sq   +(,L(49%,'$s) ' '0 < <S	 < <     8: 8eM34F.G 8D 8 8* : $}*=  & A AT%s8J2K-L AQU A A A AT%s8J2K-L AQU A A 5);#<   r/   r*   )3r3   rk   rq   ry   abcr   r   collectionsr   dataclassesr   r   typingr   r	   r
   r   r   r   r   r   r   config.schemasr   r   r   constant.systemr   
core.enumsr   core.metricsr   core.modelsr   
core.typesr   r   tools.loggerr   tools.ratelimitr   tools.retryr   r   r   r    r9   r   rO   r_   r*   r7   r/   r-   <module>r3     s   
    #  (
 
 
 ; : 4 $ % $ / # ' 7	G	 / / /" 4 4 46 +,- x  <i- iXF Fr/   