
    :jT                     0   d Z ddlZddlZddlZddlmZmZ ddlmZ ddl	m
Z
mZmZ ddlmZ ddlmZmZmZmZmZmZmZ ddlmZ dd	lmZ dd
lmZmZmZ ddlm Z  ddl!m"Z"  e d      Z# G d de      Z$ G d de$      Z% G d de"      Z&	 d3dede'dee$   de&fdZ(e)dk(  r4 eeeeeee      Z* e(e*      Z+ G d d      Z,eD ].  Z-e+j]                  e-j^                   e,e-j^                               0 e+ja                          	  e1d      D ]  Z2eD ]  Z-e-ejf                  k(  r ejh                  dd       Z5nYe-ejl                  k(  r ejh                  d!d"      Z5n6e-ejn                  k(  r ejh                  d d#      Z5n ejh                  dd      Z5 ee-j^                  e5e+jp                  e-j^                     jr                   ejt                  d$d       ejt                  d%d&      '      Z;e+jy                  e-j^                  e;        e+j{                         Z>e#j                  d(e2dz    d)       e>j                  j                         D ]N  \  ZBZCe#j                  d*eB d+eCjr                   d,eCj                   d-eCjj                   d.eCj                  d/
       P  ej                  d0        	 e+j                          e#j                  d2       yy# eG$ r e#j                  d1       Y ;w xY w# e+j                          e#j                  d2       w xY w)4z
Load balancing system for dynamic worker thread management.
Automatically adjusts worker counts based on queue sizes and processing speeds.
    N)ABCabstractmethod)deque)DictOptionalTuple)WorkerManagerConfig)DEFAULT_ADJUSTMENT_INTERVALDEFAULT_MAX_WORKERSDEFAULT_MIN_WORKERSDEFAULT_SCALE_DOWN_THRESHOLDDEFAULT_SCALE_UP_THRESHOLDDEFAULT_TARGET_QUEUE_SIZELB_RECENT_HISTORY_SIZE)PipelineStage)WorkerManageable)StageWorkerStatusWorkerMetricsWorkerStatus)
get_logger   )ConditionalTaskManagermanagerc                   *    e Zd ZdZededefd       Zy)ScalingStrategyz1Abstract base class for worker scaling strategiesmetricsreturnc                      y)zCalculate target worker count based on metrics

        Args:
            metrics: Current stage metrics snapshot

        Returns:
            int: Target worker count
        N )selfr   s     5/root/.openclaw/workspace/harvester/manager/worker.pycalculate_targetz ScalingStrategy.calculate_target&   s     	    N)__name__
__module____qualname____doc__r   r   intr"   r   r#   r!   r   r   #   s%    ;	 	# 	 	r#   r   c                   H    e Zd ZdZeeeeefde	de	de	de
de
f
dZded	e	fd
Zy)DefaultScalingz<Default scaling strategy based on queue size and utilizationmin_workersmax_workerstarget_queue_sizescale_up_thresholdscale_down_thresholdc                 J    || _         || _        || _        || _        || _        y Nr+   r,   r-   r.   r/   )r    r+   r,   r-   r.   r/   s         r!   __init__zDefaultScaling.__init__6   s,     '&!2"4$8!r#   r   r   c                    |j                   dkD  r-t        dt        |j                  | j                  z              }n|j
                  | j                  kD  r$t        | j                  |j                  dz         }nI|j
                  | j                  k  r$t        | j                  |j                  dz
        }n|j                  }t        | j                  t        | j                  |            S )z9Calculate target workers using queue size and utilizationr   r   )processing_ratemaxr(   
queue_sizer-   utilizationr.   minr,   current_workersr/   r+   )r    r   targets      r!   r"   zDefaultScaling.calculate_targetD   s     ""Q&C 2 2T5K5K KLMF ""T%<%<<T--w/F/F/JK$$t'@'@@T--w/F/F/JK 00 4##S)9)96%BCCr#   N)r$   r%   r&   r'   r   r   r   r   r   r(   floatr3   r   r"   r   r#   r!   r*   r*   3   sa    F /.!:$>&B99 9 	9
 "9 $9D D# Dr#   r*   c                       e Zd ZdZ	 	 ddededee   f fdZde	de
fd	Zde	d
efdZddZddZde	fdZde	d
efdZded
efdZde	d
efdZde	d
efdZde	deded
dfdZddZd
efdZd
efdZddZde	ded
efdZ xZS )WorkerManagerz5Dynamic worker manager for pipeline thread managementNconfigshutdown_timeoutscaling_strategyc                    t         |   d|j                  dz  |       i | _        i | _        |j
                  | _        |j                  | _        |j                  | _        |j                  | _        |j                  | _	        |j                  | _        |j                  | _
        |xsB t        | j
                  | j                  | j                  | j                  | j                        | _        t        j                         | _        i | _        i | _        i | _        d| _        d | _        d| _        t,        j/                  d       y )Nr>      r2   g        zInitialized worker manager)superr3   adjustment_intervalworker_metricsstagesr+   r,   r-   r.   r/   log_recommendationsr*   rA   	threadingLocklockmetrics_historylast_recommendationspending_recommendationslast_batch_log_timelast_known_statslast_stats_update_timeloggerinfo)r    r?   r@   rA   	__class__s       r!   r3   zWorkerManager.__init__Z   s1    	&*D*Dq*HJZ[8:35 "--!--!'!9!9"(";";$*$?$?!#)#=#= #)#=#=  !1 !
N(((("44#66!%!:!:5
 NN$	 24 HJ! DF$*-  9=-0#01r#   
stage_namestage_instancec                     | j                   5  || j                  |<   t        |      | j                  |<   t	        d      | j
                  |<   ddd       t        j                  d|        y# 1 sw Y   "xY w)zRegister a pipeline stage for worker management

        Args:
            stage_name: Name of the stage
            stage_instance: Stage instance (implementing WorkerManageable interface)
        )stage2   )maxlenNz(Registered stage for worker management: )rK   rG   r   rF   r   rL   rR   rS   )r    rU   rV   s      r!   register_stagezWorkerManager.register_stage   sr     YY 	@&4DKK
#.;*.MD
+/4B/?D  ,	@
 	>zlKL	@ 	@s   AA00A9r   c                     | j                   5  || j                  vr
	 ddd       y| j                  |   }t        |t              cddd       S # 1 sw Y   yxY w)zCheck if a stage supports worker adjustment

        Args:
            stage_name: Name of the stage to check

        Returns:
            bool: True if stage supports worker adjustment
        NF)rK   rG   
isinstancer   )r    rU   rX   s      r!   is_stage_adjustablez!WorkerManager.is_stage_adjustable   sT     YY 	7,	7 	7 KK
+Ee%56	7 	7 	7s   AAAc                 $    | j                          y)z*Flush pending recommendations when stoppedN)_flush_recommendation_batchr    s    r!   _on_stoppedzWorkerManager._on_stopped   s    ((*r#   c                 $    | j                          y)z$Handle task manager completion eventN)mark_finishedra   s    r!   _on_task_completionz!WorkerManager._on_task_completion   s    r#   c           	      `   | j                   5  || j                  vr
	 ddd       y| j                  |   }|j                  |_        |j                  |_        |j                  |_        |j
                  |_        |j                  dkD  r5t        d|j                  |j                  | j                  z  z        |_        | j                  |   j                  t        j                         |j                  |j                  |j                  |j                  d       ddd       y# 1 sw Y   yxY w)z#Update metrics for a specific stageNr   g      ?)	timestampr7   workersr8   r5   )rK   rF   r7   r:   r5   avg_processing_timer9   r-   r8   rL   appendtime)r    rU   metrics_datar   s       r!   update_metricszWorkerManager.update_metrics   s   YY 	!4!44	 	 ))*5G ".!8!8G&2&B&BG#&2&B&BG#*6*J*JG' &&*&)#w/A/AWE\E\_c_u_uEu/v&w#   ,33!%")"4"4&66#*#6#6'.'>'>#	 	 	s   D$C5D$$D-c                    | j                   5  || j                  vr| j                  cddd       S | j                  |   }t        |j                  |j
                  |j                  |j                  |j                        }ddd       | j                        }| j                   5  || j                  v r|| j                  |   _
        ddd       |S # 1 sw Y   SxY w# 1 sw Y   |S xY w)z(Get recommended worker count for a stageNrX   r:   r7   r5   r8   )rK   rF   r+   r   rX   r:   r7   r5   r8   _calculate_target_workerstarget_workers)r    rU   r   snapshotrq   s        r!   get_recommended_workersz%WorkerManager.get_recommended_workers   s    YY 	!4!44''	 	
 ))*5G$mm ' 7 7"-- ' 7 7#//H	 77A YY 	PT000AO##J/>	P -	 	$	P s   CAC'#C C C*r   c                 t    | j                   j                  |      }| j                  |j                  |      }|S )z/Calculate target workers using scaling strategy)rA   r"   _apply_trend_analysisrX   )r    r   rq   s      r!   rp   z'WorkerManager._calculate_target_workers   s8     ..??H 33GMM>Rr#   c                    | j                   5  || j                  vr
	 ddd       y| j                  |   }t        j                         }||j                  z
  | j
                  k  r
	 ddd       yt        |j                  |j                  |j                  |j                  |j                        }ddd       | j                        }||j                  k7  S # 1 sw Y   )xY w)z(Check if worker count should be adjustedNFro   )rK   rF   rk   	monotoniclast_adjustmentrE   r   rX   r:   r7   r5   r8   rp   )r    rU   r   current_timerr   recommendeds         r!   should_adjust_workersz#WorkerManager.should_adjust_workers   s    YY 	!4!44	 	 ))*5G>>+L g5558P8PP	 	 %mm ' 7 7"-- ' 7 7#//H	* 44X>h6666-	 	s   CA C/ACC"c                    | j                   5  || j                  vr
	 ddd       y| j                  |   }|| j                  vr
	 ddd       y| j                  |   }|j                  }t	        |j
                  |j                  |j                  |j                  |j                        }ddd       | j                        }|k(  ry	 t        t              r|j                  |      }n| j                  |||       d}|r|| j                   5  || j                  v r:t        j                         | j                  |   _        || j                  |   _        ddd       t$        j'                  d| d| d|        y	 y# 1 sw Y   xY w# 1 sw Y   6xY w# t(        $ r%}t$        j+                  d| d	|        Y d}~yd}~ww xY w)
zAdjust worker count for a stageNFro   z	Adjusted 
 workers:  -> TzFailed to adjust workers for : )rK   rG   rF   r:   r   rX   r7   r5   r8   rp   r]   r   adjust_workers_log_worker_recommendationrk   rw   rx   rq   rR   rS   	Exceptionerror)	r    rU   rX   r   r:   rr   rq   successes	            r!   r   zWorkerManager.adjust_workers  s    YY 	,	 	 KK
+E!4!44	 	 ))*5G%55O %mm ' 7 7"-- ' 7 7#//H	* 77A_,	L%!12..~> //
O^\YY X!T%8%88JN..JZ++J7GIW++J7FX
 i
|:o=NdSaRbcd  a	 	JX X  	LLL8BqcJKK	LsI   FFAF
AF A	F&F F
FF 	G"GGr:   rq   c                     | j                   syt        j                         }||f| j                  |<   || j                  z
  | j
                  k\  r| j                          || _        yy)z9Log worker recommendation with batching and deduplicationN)rH   rk   rw   rN   rO   rE   r`   )r    rU   r:   rq   ry   s        r!   r   z(WorkerManager._log_worker_recommendationA  si     ''~~' 5D^3T$$Z0 $222d6N6NN,,.'3D$ Or#   c                    | j                   syg }| j                   j                         D ]'  \  }\  }}||k7  s|j                  | d| d|        ) |rQt        |      dk(  rt        j                  d|d           n't        j                  ddj                  |              | j                   j                          y)	z9Flush pending recommendations as a single batch log entryNr   r~   r   z"Worker adjustment recommendation: r   z#Worker adjustment recommendations: z, )rN   itemsrj   lenrR   rS   joinclear)r    adjustmentsrU   currentr;   s        r!   r`   z)WorkerManager._flush_recommendation_batchQ  s    ++ -1-I-I-O-O-Q 	K)J)&& ""j\G9D#IJ	K ;1$@Q@PQRA$))KBXAYZ[ 	$$**,r#   c                    	 | j                   j                  d      r1	 t        | j                        }| j                   j	                          nt        j
                         }| j                  W|| j                  z
  dk  rE| j                  }t        ||j                  |j                  |j                  |j                  d      S t        |i dddd      S i }d}d}d}|j                         D ]  \  }}	t        |	j                  |	j                   |	j"                  |	j$                  |	j&                  |	j(                  	      ||<   ||	j                  z  }||	j                   z  }||	j"                  z  } t        j
                         }t        |||||
      }
|
| _        || _        |
S # | j                   j	                          w xY w# t*        $ r7}t        t        j
                         i ddddt-        |            cY d}~S d}~ww xY w)z@Get current worker management statistics with timeout protection      ?)timeoutNg      >@lock_timeout_cached)rg   rG   total_workerstotal_target_workerstotal_queue_sizestatusr   lock_timeout_no_cache)r:   rq   r7   r8   r5   rx   )rg   rG   r   r   r   r   )rg   rG   r   r   r   r   r   )rK   acquiredictrF   releaserk   rP   rQ   r   rG   r   r   r   r   r   r:   rq   r7   r8   r5   rx   r   str)r    worker_metrics_copyry   cached_statsrG   r   r   r   rU   r   statsr   s               r!   get_worker_statszWorkerManager.get_worker_statse  s   P	yy   -(*.t/B/B*C'II%%'  $yy{))5,IdId:dgk:k $(#8#8L'".+22&2&@&@-9-N-N)5)F)F4  (".!&'-.)*6  FM#$  ':'@'@'B 7#
G%6$+$;$;#*#9#9&11 ' 3 3$+$;$;$+$;$;&z" !8!88$(>(>>$ G$6$66 7  99;L &+%9!1E %*D!*6D'L} II%%'~  
	))+%&!"!f 
	sA   G F( BG G CG (GG 	H,H<HHc                 n     t         fdt         j                  j                               D              S )z*Check if any stage needs worker adjustmentc              3   @   K   | ]  }j                  |        y wr1   )r{   ).0rU   r    s     r!   	<genexpr>z0WorkerManager._should_execute.<locals>.<genexpr>  s     mj4--j9ms   )anylistrF   keysra   s   `r!   _should_executezWorkerManager._should_execute  s)    mDQUQdQdQiQiQkLlmmmr#   c                     t        | j                  j                               D ]%  }| j                  |      s| j	                  |       ' y)z1Handle worker adjustments for stages that need itN)r   rF   r   r{   r   )r    rU   s     r!   _handle_conditionzWorkerManager._handle_condition  sA    t22779: 	0J))*5##J/	0r#   c                    || j                   vr|S | j                   |   }t        |      dk  r|S t        |      t         d D cg c]  }|d   	 c}t              dk\  rt	        fdt        t              dz
        D              rt        | j                  |dz         }|S t	        fdt        t              dz
        D              r5|| j                  |   j                  k  r| j                  |   j                  }|S c c}w )z3Apply trend analysis to worker count recommendation   Nr7      c              3   :   K   | ]  }|   |d z      k    ywr   Nr   r   irecent_queue_sizess     r!   r   z6WorkerManager._apply_trend_analysis.<locals>.<genexpr>  s&     r!%a(,>q1u,EEr   r   c              3   :   K   | ]  }|   |d z      k\    ywr   r   r   s     r!   r   z6WorkerManager._apply_trend_analysis.<locals>.<genexpr>  s&     tA'*.@Q.GGtr   )
rL   r   r   r   allranger9   r,   rF   r:   )r    rU   rq   historyentryr   s        @r!   ru   z#WorkerManager._apply_trend_analysis  s   T111!!&&z2w<!!! @DG}NdMdMe?fgeeL1g !"a'ruUXYkUlopUpOqrr!$T%5%5~7I!J  tQVWZ[mWnqrWrQstt!D$7$7
$C$S$SS%)%8%8%D%T%TN hs   D      @N)r   N)r$   r%   r&   r'   r	   r<   r   r   r3   r   r   r[   boolr^   rb   re   rm   r(   rs   r   rp   r{   r   r   r`   r   r   r   r   ru   __classcell__)rT   s   @r!   r>   r>   W   s&   ?
 #&6:	/2#/2  /2 #?3	/2bM M>N M7c 7d 7 + :# # 4 3 7 7 743 3 3j4S 43 4`c 4hl 4 -(R, Rhn n0 S S r#   r>   r   r?   r@   rA   r   c                     t        | ||      S )z)Factory function to create worker manager)r@   rA   )r>   )r?   r@   rA   s      r!   create_worker_managerr     s     2BUeffr#   __main__)r+   r,   r-   rE   r.   r/   c                       e Zd Zd Zd Zy)	MockStagec                      || _         d| _        y )NrC   )nameworker_count)r    r   s     r!   r3   zMockStage.__init__  s    DI !Dr#   c                 v    t         j                  d| j                   d| j                   d|        || _        y)Nz
Adjusting r}   r~   T)rR   rS   r   r   )r    counts     r!   r   zMockStage.adjust_workers  s9    KK*TYYKz$:K:K9LDQVPWXY %Dr#   N)r$   r%   r&   r3   r   r   r#   r!   r   r     s    	"	r#   r      rY   
   d      r   g?g       @)rX   r7   r:   r5   ri   z
Iteration :z  z
: workers=z->z, queue=z, util=z.2frC   zStopping worker manager test...zWorker manager test completed!r   )Ir'   randomrI   rk   abcr   r   collectionsr   typingr   r   r   config.schemasr	   constant.systemr
   r   r   r   r   r   r   
core.enumsr   
stage.baser   state.modelsr   r   r   tools.loggerr   baser   rR   r   r*   r>   r<   r   r$   r?   r   r   rX   r[   valuestartr   r   SEARCHrandintr7   GATHERCHECKrF   r:   uniformmetrics_updaterm   r   r   rS   rG   r   rU   stage_statsrq   r8   sleepKeyboardInterruptstopr   r#   r!   <module>r      s1  
    #  ( ( .   % ' G G # (	I	c  !D_ !DHB* BL osgg38gRZ[jRkgg z !''3759F $F+G   Du{{Iekk,BCD MMO)6r  	A& DM000!/2!6Jm222!/C!8Jm111!/C!8J!/2!6J "/++)$+$:$:5;;$G$W$W$2FNN3$<(6sC(@" &&u{{NC)D. ,,.EKK*QqSE+,+0<<+=+=+? '
KJ{/J/J.K2kNhNhMi j(334GK<S<STW;XZ DJJqMA 	L 	45Q F  7567 	45s+   F-K 2K2 K/,K2 .K//K2 2#L