
    :j+                         d Z ddlZddlZddlZddlZddlmZmZ ddlm	Z	m
Z
mZmZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ  ed      Z G d de      Z G d de      Z G d de      Z G d d      Zy)z
Persistence strategies for different storage modes.

This module defines the strategy pattern implementation for handling
different persistence approaches: simple file-based and shard-based storage.
    N)ABCabstractmethod)AnyDictListOptional)PersistenceMetrics)
get_logger   )AtomicFileWriter)NDJSONShardWriter)SnapshotManagerstoragec            	       z    e Zd ZdZdedeeef   fdZededee	   de
dd	fd
       Zedefd       Zedd       Zy	)PersistenceStrategyzAbstract base class for persistence strategies.

    Defines the interface that all persistence strategies must implement.
    This allows for clean separation between simple file-based and
    shard-based persistence approaches.
    	directoryfilesc                      || _         || _        y)zInitialize strategy with directory and file mappings.

        Args:
            directory: Base directory for storage
            files: Mapping of result types to file paths
        N)r   r   )selfr   r   s      9/root/.openclaw/workspace/harvester/storage/strategies.py__init__zPersistenceStrategy.__init__#   s     #
    result_typeitemsstatsreturnNc                      y)zWrite data items to storage.

        Args:
            result_type: Type of result being stored
            items: List of items to store
            stats: Metrics object to update
        N )r   r   r   r   s       r   
write_datazPersistenceStrategy.write_data-   s     	r   c                      y)zCheck if this strategy supports snapshot generation.

        Returns:
            True if snapshots are supported, False otherwise
        Nr   r   s    r   supports_snapshotsz&PersistenceStrategy.supports_snapshots8   s     	r   c                      y)zCleanup strategy resources.Nr   r!   s    r   cleanupzPersistenceStrategy.cleanupA   s     	r   r   N)__name__
__module____qualname____doc__strr   r   r   r   r   r	   r   boolr"   r$   r   r   r   r   r      s    # d38n  c $s) DV [_   D    r   r   c                   B    e Zd ZdZdedee   deddfdZde	fdZ
d
d	Zy)SimpleFileStrategyzSimple text file persistence strategy.

    Stores data as plain text files, one item per line.
    Does not support snapshots or advanced features.
    r   r   r   r   Nc                    |sy| j                   j                  |      }|st        j                  d|        y	 g }|D ]H  }t	        |d      r |j                  |j                                /|j                  t        |             J t        j                  ||       t        j                  dt        |       d| d       y# t        $ r%}t        j                  d| d|        Y d}~yd}~ww xY w)	zWrite items to simple text file.

        Args:
            result_type: Type of result being stored
            items: List of items to store
            stats: Metrics object to update
        Nz)No file path configured for result type: 	serializeSaved  z items to simple fileFailed to write z to simple file: )r   getloggererrorhasattrappendr/   r*   r   append_atomicinfolen	Exception)r   r   r   r   filepathlinesitemes           r   r   zSimpleFileStrategy.write_dataN   s     ::>>+.LLD[MRS	OE ,4-LL!12LLT+	, **8U;KK&UAk]:OPQ 	OLL+K=8I!MNN	Os   B
C 	C4C//C4c                      y)z&Simple files do not support snapshots.Fr   r!   s    r   r"   z%SimpleFileStrategy.supports_snapshotso   s    r   c                      y)z#No cleanup needed for simple files.Nr   r!   s    r   r$   zSimpleFileStrategy.cleanups   s    r   r%   )r&   r'   r(   r)   r*   r   r   r	   r   r+   r"   r$   r   r   r   r-   r-   G   sE    Oc O$s) ODV O[_ OBD r   r-   c                   x     e Zd ZdZdedeeef   f fdZdedee   de	dd	fd
Z
defdZddZdedefdZ xZS )ShardStrategyzNDJSON shard persistence strategy.

    Stores data as NDJSON shard files with indexing and rotation.
    Supports snapshot generation and advanced features.
    r   r   c                 f    t         |   ||       i | _        t        j                         | _        y)zInitialize shard strategy.

        Args:
            directory: Base directory for storage
            files: Mapping of result types to file paths
        N)superr   _shard_writers	threadingLock_lock)r   r   r   	__class__s      r   r   zShardStrategy.__init__   s*     	E*<>^^%
r   r   r   r   r   Nc                    |sy	 g }|D ]`  }t        |d      r6|j                         }	 |j                  t        j                  |             E|j                  dt        |      i       b | j                  |      }|j                  ||       t        j                  dt        |       d| d       y# t
        $ r |j                  d|i       Y w xY w# t
        $ r%}t        j                  d| d|        Y d}~yd}~ww xY w)	zWrite items to NDJSON shard files.

        Args:
            result_type: Type of result being stored
            items: List of items to store
            stats: Metrics object to update
        Nr/   valuer0   r1   z items to shardr2   z to shard: )r6   r/   r7   jsonloadsr;   r*   _get_shard_writerappend_recordsr4   r9   r:   r5   )	r   r   r   r   recordsr>   
serializedwriterr?   s	            r   r   zShardStrategy.write_data   s     	IG 
94-!%!1J>tzz*'=>
 NNGSY#78
9 ++K8F!!'51KK&Wa}OLM % >'<=>  	ILL+K=A3GHH	Is;   #C $B5A'C 5CC CC 	D D  Dc                      y)zShard files support snapshots.Tr   r!   s    r   r"   z ShardStrategy.supports_snapshots   s    r   c                 z    | j                   5  | j                  j                          ddd       y# 1 sw Y   yxY w)z$Cleanup shard writers and resources.N)rI   rF   clearr!   s    r   r$   zShardStrategy.cleanup   s0    ZZ 	(%%'	( 	( 	(s   1:c                    | j                   5  | j                  j                  |      }|sEt        j                  j                  | j                  d      }t        ||      }|| j                  |<   |cddd       S # 1 sw Y   yxY w)zGet or create shard writer for result type.

        Args:
            result_type: Type of result

        Returns:
            NDJSONShardWriter instance
        shardsN)rI   rF   r3   ospathjoinr   r   )r   r   rS   
shard_roots       r   rO   zShardStrategy._get_shard_writer   sp     ZZ 	((,,[9FWW\\$..(C
*:{C39##K0	 	 	s   A$A;;Br%   )r&   r'   r(   r)   r*   r   r   r   r   r	   r   r+   r"   r$   r   rO   __classcell__)rJ   s   @r   rC   rC   x   su    	&# 	&d38n 	&!Ic !I$s) !IDV !I[_ !IFD (
S 5F r   rC   c                       e Zd ZdZdedee   defdZdedefdZde	eef   fd	Z
dd
eddfdZddZde	eef   fdZd
eddfdZy)r   zDedicated snapshot management with periodic building support.

    Manages the lifecycle of snapshot generation including periodic
    background building and proper cleanup.
    r   result_typesprovider_namec                     || _         || _        || _        d| _        d| _        t        j                         | _        ddddd| _        y)zInitialize snapshot manager.

        Args:
            directory: Base directory containing shards
            result_types: List of result types to manage
            provider_name: Name of the provider for logging
        NFg        r   )last_snapshotsnapshot_counttotal_snapshot_timesnapshot_operations)	r   r_   r`   _periodic_thread_runningrG   rH   rI   r   )r   r   r_   r`   s       r   r   zSnapshotManager.__init__   sO     #(* =A^^%
 (+aX[tuv
r   r   r   c           	         t         j                  j                  | j                  d|      }t         j                  j	                  |      s&t
        j                  d| d| j                          yt         j                  j                  | j                  d      }t        j                  |d       t         j                  j                  || d      }	 t        ||      }t        j                         }|j                         }t        j                         |z
  }| j                  5  t        j                         | j                  d	<   | j                  d
xx   dz  cc<   | j                  dxx   |z  cc<   | j                  dxx   dz  cc<   ddd       t
        j                  d| d| d|dd       |S # 1 sw Y   +xY w# t        $ r%}	t
        j!                  d| d|	        Y d}	~	yd}	~	ww xY w)zBuild snapshot for specific result type.

        Args:
            result_type: Type of result to build snapshot for

        Returns:
            Number of records in the snapshot
        rX   zNo shard directory for z in r   	snapshotsT)exist_okz.jsonrb   rc   r   rd   re   NzBuilt snapshot for z with z records in z.3fsFailed to build snapshot for : )rY   rZ   r[   r   isdirr4   debugr`   makedirsBaseSnapshotManagertimebuild_snapshotrI   r   r9   r;   r5   )
r   r   r\   snapshots_dirsnapshot_pathmanager
start_timecountdurationr?   s
             r   rs   zSnapshotManager.build_snapshot   s    WW\\$..(KH
ww}}Z(LL2;-tDDVDVCWXYT^^[A
MD1]{m54IJ	)*mDGJ**,Eyy{Z/H  7.2iik

?+

+,1,

01X=1

01Q61	7 KK-k]&|T\]`SaabcdL7 7  	LL8RsKL	s2   AG )A'F:)G :G?G 	G4G//G4c                     i }| j                   D ]  }	 | j                  |      ||<    |S # t        $ r*}t        j	                  d| d|        d||<   Y d}~Jd}~ww xY w)z}Build snapshots for all result types.

        Returns:
            Dictionary mapping result types to record counts
        rl   rm   r   N)r_   rs   r;   r4   r5   )r   resultsr   r?   s       r   build_all_snapshotsz#SnapshotManager.build_all_snapshots	  su     ,, 	)K)'+':':;'G$	)   )<[MA3OP'($)s   +	A AAinterval_secNc                    | j                   5  | j                  rF| j                  j                         r,t        j	                  d| j
                          	 ddd       yd| _        t        j                  | j                  |fdd| j
                         | _        | j                  j                          t        j                  d| j
                   d| d       ddd       y# 1 sw Y   yxY w)	zyStart periodic snapshot building.

        Args:
            interval_sec: Interval between snapshots in seconds
        z&Periodic snapshot already running for NTz	snapshot-)targetargsdaemonnamezStarted periodic snapshots for z (interval: zs))rI   rf   is_aliver4   ro   r`   rg   rG   Thread_periodic_loopstartr9   )r   r}   s     r   start_periodiczSnapshotManager.start_periodic  s     ZZ 
	l$$)>)>)G)G)IEdFXFXEYZ[
	l 
	l
 !DM$-$4$4**,V_`d`r`r_sTt%D! !!'')KK9$:L:L9M\ZfYggijk
	l 
	l 
	ls   A
C& A=C&&C/c                    | j                   5  | j                  s
	 ddd       yd| _        | j                  rs| j                  j                         rY| j                  j	                  d       | j                  j                         r#t
        j                  d| j                   d       d| _        t
        j                  d| j                          ddd       y# 1 sw Y   yxY w)z,Stop periodic snapshot building and cleanup.NFg      @)timeoutzSnapshot thread for z did not stop gracefullyzStopped periodic snapshots for )	rI   rg   rf   r   r[   r4   warningr`   r9   r!   s    r   stopzSnapshotManager.stop*  s    ZZ 	P==	P 	P "DM$$)>)>)G)G)I%%**3*7((113NN%9$:L:L9MMe#fg$(D!KK9$:L:L9MNO	P 	P 	Ps   CB/CC%c                 z    | j                   5  | j                  j                         cddd       S # 1 sw Y   yxY w)zcGet snapshot statistics.

        Returns:
            Dictionary with snapshot statistics
        N)rI   r   copyr!   s    r   	get_statszSnapshotManager.get_stats;  s.     ZZ 	%::??$	% 	% 	%s   1:c                    | j                   rA	 t        j                  |       | j                   sy| j                          | j                   r@yy# t        $ r/}t
        j                  d| j                   d|        Y d}~Bd}~ww xY w)zmPeriodic snapshot building loop.

        Args:
            interval_sec: Interval between snapshots
        zError in periodic snapshot for rm   N)rg   rr   sleepr|   r;   r4   r5   r`   )r   r}   r?   s      r   r   zSnapshotManager._periodic_loopD  sw     mmZ

<(}}((* mm  Z>t?Q?Q>RRTUVTWXYYZs   !A A 	B%BB)i,  r%   )r&   r'   r(   r)   r*   r   r   intrs   r   r|   r   r   r   r   r   r   r   r   r   r      s    w# wT#Y ws w($# $# $LT#s(^ l3 l l$P"%4S> %Z3 Z4 Zr   r   )r)   rM   rY   rG   rr   abcr   r   typingr   r   r   r   state.modelsr	   tools.loggerr
   atomicr   shardr   snapshotr   rq   r4   r   r-   rC   r   r   r   <module>r      ss     	   # , , + # $ $ <	I	)# )X., .bM' M`KZ KZr   