
    :j                         d Z ddlZddlZddlZddlZddlZddlmZmZm	Z	m
Z
 ddlmZ ddlmZ ddlmZmZ  ed      Z G d	 d
      Zy)z1
NDJSON shard writer with rotation and indexing.
    N)AnyDictListOptional)PersistenceMetrics)
get_logger   )AtomicFileWriter_exclusive_file_lockstoragec            	           e Zd ZdZddedededefdZdefdZd	edefd
Zdd	edededdfdZ	dde
eeef      dee   ddfdZy)NDJSONShardWriterzBWrite NDJSON records to rotating shard files with sidecar indexes.
shard_rootresult_type	max_linesmax_age_secc                 H   || _         || _        || _        || _        t	        j
                         | _        d | _        d| _        t        j                         | _
        t        j                  j                  ||      | _        t        j                  | j                  d       y )Nr   T)exist_ok)r   r   r   r   	threadingLock_lock_current_path_current_linestime_current_start_timeospathjoin	shard_dirmakedirs)selfr   r   r   r   s        4/root/.openclaw/workspace/harvester/storage/shard.py__init__zNDJSONShardWriter.__init__   sv    $&"&^^%
,0#'99;  j+>
DNNT2    returnc                    t        j                          }| j                  du xs7 | j                  | j                  k\  xs || j                  z
  | j
                  k\  }|rt        j                  j                         j                  d      dd }| j                   d| d}t        j                  j                  | j                  |      | _        d| _        || _        | j                  S )z:Ensure current shard file exists and is valid for writing.Nz%Y%m%d_%H%M%S_%f_z.ndjsonr   )r   r   r   r   r   r   datetimenowstrftimer   r   r   r   r   )r!   r*   needs_rotation	timestampfilenames        r"   _ensure_currentz!NDJSONShardWriter._ensure_current%   s    iik$& D""dnn4Dd...43C3CC 	  ))--/889KLSbQI**+1YKw?H!#dnnh!GD"#D'*D$!!!r$   
shard_pathc                 P    t         j                  j                  |      \  }}|dz   S )Nz.index.json)r   r   splitext)r!   r0   baser(   s       r"   _get_index_pathz!NDJSONShardWriter._get_index_path7   s%    ''"":.am##r$   added_lines	bad_linesNc                 F   | j                  |      }i }	 t        |dd      5 }t        j                  |      }d d d        t
        j
                  j                  t
        j                  j                        j                         j                  dd      }d|vr||d<   ||d<   t        |j                  dd	            }t        |j                  d
d	            }	|t        |      z   |d<   |	t        |      z   |d
<   d|d<   t        j                  j                  |      |d<   	 t        j                  j!                  |      |d<   t        j"                  |dd      }
t%        j&                  ||
       y # 1 sw Y   >xY w# t        $ r i }Y Ow xY w# t        $ r Y Yw xY w)Nrutf-8encodingz+00:00Zfirst_tslast_tslinesr   r6   z1.0schema_versionfile	file_size   F)indentensure_ascii)r4   openjsonload	Exceptionr)   r*   timezoneutc	isoformatreplaceintgetr   r   basenamegetsizedumpsr
   write_atomic)r!   r0   r5   r6   idx_pathidxfnow_iso
prev_linesprev_badcontents              r"   _update_indexzNDJSONShardWriter._update_index;   s   ''
3 	hg6 #!iil# ##''(9(9(=(=>HHJRRS[]`aS %C
O I !,-
sww{A./!C$44G#c)n4K !&gg&&z2F	!wwz:C **S?%%h85# # 	C	(  		s9   F E5F $"F 5E?:F FF	F F recordsstatsc                 L   |syt        j                          }| j                  5  | j                         }t        |dd      5 }t	        |      5  |D ]P  }t        j                  |d      }|j                  |       |j                  d       | xj                  dz  c_        R |j                          t        j                  |j                                ddd       ddd       	 | j                  |t        |             ddd       |rBt        j                          |z
  }	|xj$                  |	z  c_        |xj&                  dz  c_        yy# 1 sw Y   zxY w# 1 sw Y   ~xY w# t        $ r%}t         j#                  d	| d
|        Y d}~d}~ww xY w# 1 sw Y   xY w)z;Append a list of JSON-serializable records as NDJSON lines.Nar9   r:   F)rE   
r	   z%[storage] failed to update index for z: )r   r   r/   rF   r   rG   rR   writer   flushr   fsyncfilenor[   lenrI   loggererrortotal_append_timeappend_operations)
r!   r\   r]   t0r   rV   reclineedts
             r"   append_recordsz NDJSONShardWriter.append_records[   sj   YY[ZZ 	R'')DdC'2 %a9Ma9P %" -C::c>DGGDMGGDM''1,'	-
 	$% %R""4W6	R$ r!B##r)###q(# % % % %  RDTF"QCPQQR	R 	Rs`   FEB	EE F)E)EEE&	"F)	F2FFFFF#)i'  i  )r   )N)__name__
__module____qualname____doc__strrN   r#   r/   r4   r[   r   r   r   r   r   ro    r$   r"   r   r      s    L33 3S 3S 3_b 3" "$$# $# $9 9# 9# 9VZ 9@)d4S>&: )8L^C_ )ko )r$   r   )rs   r)   rG   r   r   r   typingr   r   r   r   state.modelsr   tools.loggerr   atomicr
   r   rf   r   ru   r$   r"   <module>rz      s@      	   , , + # :	I	b) b)r$   