
    :jX                     ^   d Z ddlZddlZddlZddlZddlZddlZddlZddlm	Z	 ddl
m
Z
 ddlmZ ddlmZmZmZmZ ddlmZmZmZmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlm Z  ddl!m"Z" ddl#m$Z$ ddl%m&Z&  e$d      Z'e	 G d d             Z(e	 G d d             Z) G d de&      Z* G d d      Z+e,dk(  r] ejZ                         Z.e'j_                  de.        	  e*e.d      Z0 ejb                  ddd       ejb                  d dd       ejd                  dd!d"d#i      gZ3e0ji                  d$e3dd        e0ji                  d%e3dd        e0jk                  d$      Z6e0jk                  d%      Z7e'j_                  d& e8e3dd        d' e8e6              e'j_                  d& e8e3dd        d( e8e7              e0js                         Z/e'j_                  d)e/        e0ju                          e'j_                  d*        ejv                  e.       yy#  ejv                  e.       w xY w)+z
Type-safe queue persistence system for task recovery and state management.
Handles serialization/deserialization with enum-based stage management.
    N)	dataclass)datetimePath)AnyDictListUnion)PipelineStageQueueStateFieldQueueStateProviderQueueStateStatus)ProviderTask)BasePipelineStage)TaskFactory)QueueStateMetrics)AtomicFileWriter)MultiResultManager)
get_logger   )PeriodicTaskManagermanagerc                       e Zd ZU dZeed<   dZeed<   dZe	ed<   dZ
e	ed<   d	Zeed
<   d ZdedefdZdede	defdZdeeef   fdZededd fd       Zy)QueueConfigz5Lightweight configuration for queue management systempersistence_dir      N@save_interval   max_age_hours   backup_countFcompression_enabledc                     t        | j                  t              rt        | j                        | _        | j                  j	                  dd       y)z#Ensure persistence directory existsT)parentsexist_okN)
isinstancer   strr   mkdirselfs    4/root/.openclaw/workspace/harvester/manager/queue.py__post_init__zQueueConfig.__post_init__1   s>    d**C0#'(<(<#=D ""4$"?    stagereturnc                 >    |j                    d}| j                  |z  S )z&Get file path for specific stage queue_queue.jsonvaluer   )r*   r.   filenames      r+   get_queue_file_pathzQueueConfig.get_queue_file_path7   s#    kk]+.##h..r-   backup_indexc                 D    |j                    d| d}| j                  |z  S )z'Get backup file path for specific stagez_queue.backup.z.jsonr2   )r*   r.   r6   r4   s       r+   get_backup_pathzQueueConfig.get_backup_path<   s)    kk].eD##h..r-   c                 T    t         D ci c]  }|| j                  |       c}S c c}w )z&Get all stage file paths as dictionary)r   r5   )r*   r.   s     r+   get_all_stage_fileszQueueConfig.get_all_stage_filesA   s&    DQR5t//66RRRs   %	workspacec                 2    t        |      dz  } | dd|i|S )z&Create config from workspace directoryqueue_stater    r   )clsr;   kwargsr   s       r+   from_workspacezQueueConfig.from_workspaceE   s%     y/M9=?=f==r-   N)__name__
__module____qualname____doc__r   __annotations__r   floatr   intr!   r"   boolr,   r   r5   r8   r   r:   classmethodr'   rA   r>   r-   r+   r   r   '   s    ?M5M3L# %%@/ /4 /
/] /# /$ /
ST-*=%> S >s > > >r-   r   c                       e Zd ZU dZeed<   eed<   dZeed<   dZ	e
ed<   dZeeeef      ed<   ej"                  Zeed	<   d
 Zdeeef   fdZedeeef   dd fd       Zy)QueueStateInfoz6Type-safe state information for task queue persistencer.   providerr   
task_countNsaved_attasksstatusc                 v    | j                   g | _         | j                  t        j                         | _        y y N)rP   rO   r   nowr)   s    r+   r,   zQueueStateInfo.__post_init__W   s/    ::DJ== $LLNDM !r-   r/   c                    t         j                  j                  | j                  j                  t         j                  j                  | j
                  j                  t         j                  j                  | j                  t         j                  j                  | j                  j                         t         j                  j                  | j                  t         j                  j                  | j                  j                  iS )z,Convert to dictionary for JSON serialization)r   STAGEr3   r.   PROVIDERrM   
TASK_COUNTrN   SAVED_ATrO   	isoformatTASKSrP   STATUSrQ   r)   s    r+   to_dictzQueueStateInfo.to_dict]   s     !!'')9)9$$**DMM,?,?&&,,doo$$**DMM,C,C,E!!''""(($++*;*;
 	
r-   datac                    |t         j                  j                     }t        |t              rt        j                  |      }n@t        |t        t        f      rt        j                  |      }nt        j                         } | t        |t         j                  j                           t        |t         j                  j                           |t         j                  j                     ||t         j                   j                     t#        |j%                  t         j&                  j                  t"        j(                  j                                    S )z@Create instance from dictionary with flexible timestamp handlingr.   rM   rN   rO   rP   rQ   )r   rY   r3   r&   r'   r   fromisoformatrH   rG   fromtimestamprT   r   rV   r   rW   rX   r[   r   getr\   ACTIVE)r?   r^   saved_at_rawrO   s       r+   	from_dictzQueueStateInfo.from_dicth   s     O44::;lC(--l;HsEl3--l;H||~H_%:%:%@%@ AB'_-E-E-K-K(LMO66<<=,,223#DHH_-C-C-I-IK[KbKbKhKh$ij
 	
r-   )rB   rC   rD   rE   r   rF   r   rN   rH   rO   r   rP   r	   r   r'   r   r   rd   rQ   r,   r]   rJ   rf   r>   r-   r+   rL   rL   L   s    @  JHh"&E4S#X&/66F6+	
c3h 	
 
T#s(^ 
0@ 
 
r-   rL   c                   \    e Zd ZdZddededef fdZdeeef   de	fdZ
d	eeef   dd
fdZdeeef   dee   dd
fdZdeeef   dee   fdZd	eeef   dd
fdZdeeee   f   fdZdeeef   dd
fdZddZdeeef   fdZdeeef   defdZddZdedd
fdZdee   fdZ xZS )QueueManagerz8Type-safe queue manager with enum-based stage managementr;   r   shutdown_timeoutc                 6   t         |   d||       t        j                  ||      | _        | j                  j                         | _        t        j                         | _	        d | _
        t        j                  d| j                  j                          y )Nrh   r   z(Initialized type-safe queue manager at: )super__init__r   rA   configr:   stage_files	threadingLocklockstagesloggerinfor   )r*   r;   r   ri   	__class__s       r+   rm   zQueueManager.__init__   s}    8HI "00-0X  ;;::< NN$	 >t{{?Z?Z>[\]r-   r.   r/   c                     t        |t              r	 t        |      }| j                  |   S | j                  |   S # t        $ r | j
                  j                  | dz  cY S w xY w)z4Get filepath for a stage with type-safe enum supportr1   )r&   r'   r   ro   
ValueErrorrn   r   )r*   r.   
stage_enums      r+   _get_queue_filepathz QueueManager._get_queue_filepath   so    eS!K*51
''
33 ##E**  K{{22wk5JJJKs   ; %A#"A#rs   Nc                 2    || _         | j                          y)z!Start periodic queue state savingN)rs   start)r*   rs   s     r+   start_periodic_savez QueueManager.start_periodic_save   s    

r-   	task_listc                    t        |t              r	 t        |      }n|}|s| j                  |       y	 i }|D ]9  }|j                  }||vrg ||<   ||   j                  |j                                ; t        |t        j                  t        |      t        j                         g t         j"                        }|j%                         D ]   \  }}|j&                  j)                  |       " | j+                  |      }	t-        j.                  |j                         dd      }
t1        j2                  t        |	      |
       t        j5                  dt        |       d|j6                   d	       y# t        $ r t        j                  d|        Y yw xY w# t8        $ r/}t        j                  d
|j6                   d|        Y d}~yd}~ww xY w)z/Type-safe save queue state for a specific stageInvalid stage name: Nr`      F)indentensure_asciiSaved  tasks for z stagezFailed to save queue state for : )r&   r'   r   rx   rt   error_save_empty_staterM   appendr]   rL   r   MULTIlenr   rT   r   rd   itemsrP   extendrz   jsondumpsr   write_atomicru   r3   	Exception)r*   r.   r~   ry   provider_taskstaskrM   stateprovider_task_listfilepathcontentes               r+   save_queue_statezQueueManager.save_queue_state   s    eS!*51

 J"":.	TN! @==>1/1N8,x(//?	@ # +11y>!'..E 1?0D0D0F 7,,""#567 //
;HjjOG))#h-AKK&Y 0J<L<L;MVTUO  3E7;<R  	TLL::;K;K:LBqcRSS	Ts)   E6 E F 6!FF	G&%GGc           	         t        |t              r	 t        |      }n|}| j                  |      }|j                         sg S 	 t        |d      5 }t        j                  |      }ddd       	 t        j                        }g }|j                  D ])  }	 t        j                  |      }	|j                  |	       + t%        j&                         |j(                  z
  j+                         dz  }|| jB                  jD                  kD  r)t        j#                  d|j4                   d	|d
d       g S |r3t        jG                  dtI        |       d|j4                   d|d
d       |S # t        $ r t        j                  d|        g cY S w xY w# 1 sw Y   .xY w# t         $ r#}
t        j#                  d|
        Y d}
~
4d}
~
ww xY w# t,        t        f$ r9 g }j/                  t0        j2                  j4                  g       D ]W  }	 t        j                  |      }	|j                  |	       +# t         $ r"}
t        j#                  d|
        Y d}
~
Qd}
~
ww xY w |j/                  t0        j6                  j4                  d      }t        |t              rC	 t%        j8                  |      j;                         }n6# t        $ r |rt=        |      nd}Y nw xY wt        |t>        t<        f      sd}tA        j@                         |z
  dz  }Y &w xY w# t         $ r1}
t        j                  d|j4                   d|
        g cY d}
~
S d}
~
ww xY w)z/Type-safe load queue state for a specific stager   utf-8encodingNzFailed to deserialize task: i  r   zQueue state for z is .1fz hours old, skipping recoveryLoaded r   z stage (age: zh)zFailed to load queue state for r   )%r&   r'   r   rx   rt   r   rz   existsopenr   loadrL   rf   rP   r   r   r   warningr   rT   rO   total_secondsKeyErrorrc   r   r[   r3   rY   ra   	timestamprG   rH   timern   r   ru   r   )r*   r.   ry   r   fr^   r   r~   	task_datar   r   	age_hoursrO   s                r+   load_queue_statezQueueManager.load_queue_state   s    eS!*51

 J++J7 I4	h1 $Qyy|$#<&006	!& !I!*44Y?!((.! &\\^enn<KKMPTT	2 4;;444!1*2B2B1C4	RUVstu	gc)n%5[AQAQ@RR_`ijm_nnpqrw  3E7;<	$ $ % !)EaS'IJ ! j) <	!%/*?*?*E*Er!J !I!*44Y?!((.$ !)EaS'IJ !	!  88O$<$<$B$BAFh,F#+#9#9(#C#M#M#O% F6>5?AF $HsEl; H!YY[83t;	+<>  	LL::;K;K:LBqcRSI	s   E* L F'L 0&G &F =4G 1AL 36L *#FFFL  	G)GG GG ?L&H65L6	I!?ILI!!>L #KLKLK5LL LL 	M$&M
MMc                     |j                         D ]3  \  }}	 t        |      }|j                         }| j                  ||       5 y# t        $ r t
        j                  d| d       Y [w xY w)z7Save state for all queues with type-safe stage handlingzUnknown stage name: z, skipping saveN)r   r   get_pending_tasksr   rx   rt   r   )r*   rs   
stage_namer.   ry   rP   s         r+   save_all_queueszQueueManager.save_all_queues  ss    !' 		J*:6
//1%%j%8		  !5j\QRs   -A"A-,A-c                     i }t         D ]%  }| j                  |      }|s|||j                  <   ' t        d |j	                         D              }|dkD  rt
        j                  d| d       |S )z:Load state for all queues with type-safe stage enumerationc              3   2   K   | ]  }t        |        y wrS   )r   ).0r~   s     r+   	<genexpr>z/QueueManager.load_all_queues.<locals>.<genexpr>4  s     MY#i.Ms   r   r   z" total tasks from previous session)r   r   r3   sumvaluesrt   ru   )r*   	all_tasksry   r~   total_taskss        r+   load_all_queueszQueueManager.load_all_queues+  sw    	' 	8J--j9I.7	***+	8
 M):J:J:LMM?KK'+.PQRr-   c                    t        |t              r	 t        |      }n|}| j                  |      }|j                         r4	 |j                          t        j                  d|j                          yy# t        $ r t        j                  d|        Y yw xY w# t        $ r/}t        j                  d|j                   d|        Y d}~yd}~ww xY w)z9Clear saved state for a stage with type-safe enum supportr   NzCleared queue state for z Failed to clear queue state for r   )r&   r'   r   rx   rt   r   rz   r   unlinkru   r3   r   )r*   r.   ry   r   r   s        r+   clear_queue_statezQueueManager.clear_queue_state:  s    eS!*51

 J++J7??Y!6z7G7G6HIJ   3E7;<  Y?
@P@P?QQSTUSVWXXYs)   A6 2B 6!BB	C&%CCc                 <    t         D ]  }| j                  |        y)z8Clear all saved queue states using type-safe enumerationN)r   r   )r*   ry   s     r+   clear_all_stateszQueueManager.clear_all_statesM  s    ' 	/J"":.	/r-   c                    i }t         D ]M  }| j                  |      }|j                         r	 t        |d      5 }t	        j
                  |      }ddd       	 t        j                        }|j                  }|j                  }|j                  j                  }	t#        |	t$              r	 t5        |	      }n|	}t;        |j                  |||j=                         j>                  |      }|jA                          |||j                  <   t;        |j                  dt'        j0                         dt4        jF                        ||j                  <   P |S # 1 sw Y   xY w# t        t        f$ r
 j                  t        j                   j                  d      }
t#        |
t$              rH	 t'        j(                  |
      }nq# t        $ r% t'        j*                  |
rt-        |
      nd      }Y nDw xY wt#        |
t.        t,        f      rt'        j*                  |
      }nt'        j0                         }|j                  t        j2                  j                  d      }t4        j6                  j                  }	Y w xY w# t        $ r t4        j8                  }Y w xY w# tB        $ r\}t;        |j                  dt'        j0                         dt4        jD                  t%        |            ||j                  <   Y d}~d}~ww xY w)z2Get type-safe information about saved queue statesr   r   Nr   r.   rP   rO   	file_sizerQ   r.   rP   rO   r   rQ   error_message)$r   rz   r   r   r   r   rL   rf   rO   rN   rQ   r3   r   rx   rc   r   rY   r&   r'   r   ra   rb   rG   rH   rT   rX   r   rd   UNKNOWNr   statst_sizecalculate_ager   ERROREMPTY)r*   ru   ry   r   r   r^   r   rO   rN   rQ   re   status_enummetricsr   s                 r+   get_state_infozQueueManager.get_state_infoR  s   ' C	J//
;H 6h9 ,Q#yy|,? . 8 8 >#(>>%*%5%5
!&!3!3$ "&#.C*:6*BK '-/(..(!)"*--/"9"9*G ))+-4D))* *;$**%\\^+11*Z%%&{C	J A, , %j1 ?'+xx0H0H0N0NPQ'R%lC8n+3+A+A,+O#- n+3+A+AYe%BUkl+mn (sElC'/'='=l'KH'/||~H%)XXo.H.H.N.NPQ%R
!1!8!8!>!>?(  * C*:*B*BKC ! 	->(..!)"#/55&)!f.D))*	s   J&EJ&AE)J&/J:AJ&E&	!J&)A
J4G
	J
+G85J7G88BJ?J&JJ&J#J&"J##J&&	L/ALLc           
         t        |t              r	 t        |      }n|}| j                         }|j                  |j                  t        |j                  dt        j                         dt        j                              S # t        $ rO t        j                  d|        t        |dt        j                         dt        j                  d|       cY S w xY w)z&Get queue metrics for a specific stager   r   zInvalid stage: r   r   )r&   r'   r   rx   rt   r   r   r   rT   r   r   r   rc   r3   r   )r*   r.   ry   
state_infos       r+   get_queue_metricszQueueManager.get_queue_metrics  s    eS!*51
 J((*
~~ &&!'--	
 		
  	3E7;<(%\\^+11$3E7"; 	s   B ACCc                 T    | j                   r| j                  | j                          yy)z Execute periodic queue save taskN)rs   r   r)   s    r+   _execute_periodic_taskz#QueueManager._execute_periodic_task  s     ;;  - r-   c                    | j                  |      }	 t        |t        j                  dt	        j
                         g t        j                        }t        j                  |j                         d      }t        j                  t        |      |       y# t        $ r/}t        j!                  d|j"                   d|        Y d}~yd}~ww xY w)z9Save empty state to indicate clean stage with type safetyr   r`   r   )r   zFailed to save empty state for r   N)rz   rL   r   r   r   rT   r   r   r   r   r]   r   r   r'   r   rt   r   r3   )r*   r.   r   r   r   r   s         r+   r   zQueueManager._save_empty_state  s    ++E2	O"+11!'--E jj;G))#h-A 	OLL:5;;-r!MNN	Os   BB 	C %C

Cc                 t   g }g }|j                   j                         sh	 |j                   j                         }t        |t              r"|j                  |       |j                  |       |j                   j                         sh|D ]  }	 |j                   j                  |         |S #  Y )xY w#  Y .xY w)z3Extract tasks from a queue object (fallback method))queueempty
get_nowaitr&   r   r   
put_nowait)r*   r.   r~   
temp_tasksr   s        r+   _extract_tasks_from_queuez&QueueManager._extract_tasks_from_queue  s    	
 ++##%{{--/dL1$$T*%%d+ ++##%  	D&&t,	 s   AB, B3,B03B7)r   g      @r/   N)rB   rC   rD   rE   r'   rG   rm   r
   r   r   rz   r   r   r}   r	   r   r   r   r   r   r   r   r   r   r   r   r   r   __classcell__)rv   s   @r+   rh   rh   ~   sx   B^# ^e ^V[ ^$+}c/A)B +t +$s4E/E*F 4 
0TeM3,>&? 0TDQ]L^ 0Tcg 0TdDeM3,>&? DDDV DLd30A+A&B t c4+=&=!> Yu]C-?'@ YT Y&/
IS*;%; < IV
u]C-?'@ 
EV 
<.
O} O O($|2D r-   rh   c                   L    e Zd ZdZdededeeef   fdZ	d Z
dd	Zd
eddfdZy)GracefulShutdownz7Handles graceful shutdown with queue state preservationqueue_managerresult_managerrs   c                    || _         || _        || _        d| _        t	        j                  t        j
                  | j                         t	        j                  t        j                  | j                         t        j                  d       y )N   z%Registered graceful shutdown handlers)
r   r   rs   ri   signalSIGINT_signal_handlerSIGTERMrt   ru   )r*   r   r   rs   s       r+   rm   zGracefulShutdown.__init__  sb     +, " 	fmmT%9%9:fnnd&:&:;;<r-   c                 V    t         j                  d| d       | j                          y)zHandle shutdown signalszReceived signal z!, initiating graceful shutdown...N)rt   ru   shutdown)r*   signum_s      r+   r   z GracefulShutdown._signal_handler  s"    &vh.OPQr-   r/   Nc                    t        j                          }	 t        j                  d       | j                  j	                         D ]  }|j                           t        j                  d       | j                  j                          t        j                  d       | j                  j                  | j                         | j                  t        j                          |z
  z
  }|dkD  r+t        j                  d|dd       | j                  |       t        j                  d       | j                  j                  | j                         | j                  j                          | j                  j                          t        j                  d	       t!        j"                  d       y# t        $ r"}t        j                  d
|        Y d}~<d}~ww xY w# t!        j"                  d       w xY w)zPerform graceful shutdownzStopping task acceptance...zFlushing result buffers...zSaving queue states...r   zWaiting up to r   zs for tasks to complete...zFinal state save...zGraceful shutdown completedz Error during graceful shutdown: N)r   rt   ru   rs   r   stop_acceptingr   	flush_allr   r   ri   _wait_for_completionstopstop_allr   r   sysexit)r*   
start_timer.   remaining_timer   s        r+   r   zGracefulShutdown.shutdown  s{   YY[
"	KK56++- '$$&' KK45))+ KK01..t{{; "22diikJ6NON!n^C,@@Z[\)).9 KK-...t{{; ##%((*KK56 HHQK	  	ALL;A3?@@	A HHQKs*   FF0 0	G9GG GG G5timeoutc                    t        j                          }t        j                          |z
  |k  rd}| j                  j                         D ]4  }|j                         rd} n |j                  j                         r2d} n |rt        j                  d       yt        j                  d       t        j                          |z
  |k  ryy)zWait for tasks to completeTFz%All stages idle, shutdown can proceedr   N)	r   rs   r   is_busyr   r   rt   ru   sleep)r*   r   r   all_idler.   s        r+   r   z%GracefulShutdown._wait_for_completion,  s    YY[
iikJ&0H++- ==?$H**,$H CDJJqM iikJ&0r-   r   )rB   rC   rD   rE   rh   r   r   r'   r   rm   r   r   rG   r   r>   r-   r+   r   r     sL    A=)=;M=W[\_ar\rWs=
&PE d r-   r   __main__zTesting in workspace: r   rk   openaiz"test"geminizhttp://example.comkey_patternzsk-.*searchgatherr   z search tasks, loaded z gather tasks, loaded zState info: zQueue manager test completed!)<rE   r   shutilr   r   tempfilerp   r   dataclassesr   r   pathlibr   typingr   r   r	   r
   
core.enumsr   r   r   r   core.modelsr   
stage.baser   stage.factoryr   state.modelsr   storage.atomicr   storage.persistencer   tools.loggerr   baser   rt   r   rL   rh   r   rB   mkdtempr;   ru   qmcreate_search_taskcreate_acquisition_task
test_tasksr   r   loaded_searchloaded_gatherr   r   r   rmtreer>   r-   r+   <module>r     sG  
    
    !   ) )  % ( % * + 2 # %	I	 !> !> !>H .
 .
 .
bm& m`Q Qh z   "I
KK(45 !)15 +K**8XqA*K**8XqA/K//:NQ^`gPhi

 	Hj!n5
Hjn5 ++H5++H5fSBQ011GMHZG[\]fSAB011GMHZG[\]   "l4&)* 		34 	i K J 	i s    D$H H,