o
    +i5                     @  s  d dl mZ d dlmZ d dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlZd dlmZ d dlmZmZ d dlmZmZmZmZmZ d dlmZ d dlmZ d dlmZ d d	lm Z m!Z!m"Z"m#Z# d d
l$m%Z%m&Z&m'Z' er~d dl(m)Z) d dl*m+Z+ e,dZ-ej.e dZ/dRddZ0ej1G dd dZ2	 dSdTd d!Z3	"dUdVd)d*Z4dWd.d/Z5		dXdYd4d5Z6		dXdZd6d7Z7	d[d\d8d9Z8d]d:d;Z9d^d=d>Z:d_dBdCZ;d`dEdFZ<dadIdJZ=	KdbdcdNdOZ>dddPdQZ?dS )e    )annotationsN)	cpu_count)EmptyQueue)TYPE_CHECKINGAnyOptionalUnioncast)schemas)utils)CompressedTraces)_AUTO_SCALE_DOWN_NEMPTY_TRIGGER_AUTO_SCALE_UP_NTHREADS_LIMIT_AUTO_SCALE_UP_QSIZE_TRIGGER	_BOUNDARY)SerializedFeedbackOperationSerializedRunOperation#combine_serialized_queue_operations)Context)Clientzlangsmith.clientmax_workersbatchlist[TracingQueueItem]returnAdict[tuple[Optional[str], Optional[str]], list[TracingQueueItem]]c                 C  s<   ddl m} |t}| D ]}|j|jf}|| | q|S )z4Group batch items by (api_url, api_key) combination.r   )defaultdict)collectionsr   listapi_urlapi_keyappend)r   r   groupeditemkey r&   c/var/www/html/psymed-ai/venv/lib/python3.10/site-packages/langsmith/_internal/_background_thread.py_group_batch_by_api_endpoint)   s   r(   c                   @  sd   e Zd ZU dZded< ded< ded< ded< d	ed
< dZ			ddddZdddZdddZdS )TracingQueueItemzAn item in the tracing queue.

    Attributes:
        priority (str): The priority of the item.
        item (Any): The item itself.
        otel_context (Optional[Context]): The OTEL context of the item.
    strpriority:Union[SerializedRunOperation, SerializedFeedbackOperation]r$   Optional[str]r    r!   Optional[Context]otel_contextr+   r$   r!   r    r/   Nr   Nonec                 C  s"   || _ || _|| _|| _|| _d S Nr0   )selfr+   r$   r!   r    r/   r&   r&   r'   __init__H   s
   
zTracingQueueItem.__init__otherboolc                 C  s   | j | jjf|j |jjfk S r2   )r+   r$   	__class__r3   r5   r&   r&   r'   __lt__V   s   zTracingQueueItem.__lt__objectc                 C  s&   t |to| j| jjf|j|jjfkS r2   )
isinstancer)   r+   r$   r7   r8   r&   r&   r'   __eq__\   s   
zTracingQueueItem.__eq__)NNN)r+   r*   r$   r,   r!   r-   r    r-   r/   r.   r   r1   )r5   r)   r   r6   )r5   r:   r   r6   )	__name__
__module____qualname____doc____annotations__	__slots__r4   r9   r<   r&   r&   r&   r'   r)   6   s   
 
r)   d   Ttracing_queuer   limitintblockr6   max_size_bytesc                 C  s   g }d}zY| j |dd }r%|| |dkr%||j 7 }||kr%|W S 	 z	| j |dd}W n ty:   Y W |S w || |dkrR||j 7 }||krRW |S |r]t||kr]W |S q& tyg   Y |S w )Nr   g      ?)rG   timeoutT皙?)getr"   r$   calculate_serialized_sizer   len)rD   rE   rG   rH   
next_batchcurrent_sizer$   r&   r&   r'   _tracing_thread_drain_queuec   s<   

rP     @clientr   
size_limitsize_limit_bytes
int | None6tuple[Optional[io.BytesIO], Optional[tuple[int, int]]]c                 C  sX  z| j d u r	W dS | j j| | j j}| jp|}|d ur&|dkr&td| |d ur5|dk r5td| |d u s=||k rQ|d u sG| j j|k rQ	 W d    W dS | j jdt d	  | j j
  | j j }| j j}ttt|d| j j ||f}| j   W d    n1 sw   Y  |d ||fW S  ty   tjdd	d
 Y dS w )N)NNr   z!size_limit must be positive; got z*size_limit_bytes must be nonnegative; got z--z--
contextuLangSmith tracing error: Failed to submit trace data.
This does not affect your application's runtime.
Error details:Texc_info)compressed_traceslockuncompressed_size_max_batch_size_bytes
ValueErrortrace_countcompressor_writerwriter   encodeclosebuffertellsetattrr
   r   _contextresetseek	Exceptionloggererror)rR   rS   rT   pre_compressed_sizerO   filled_buffercompressed_traces_infor&   r&   r'   '_tracing_thread_drain_compressed_buffer   sN   



%
rq   batch_to_processlist[tuple[str, dict]]r1   c           	      C  s  zvdd |D }dd |D }| j du rtdt|  |}t|t|kr5tdt| dt| dd |D }||krJtd	| d| t||D ]\\}}}|d
kr_| | qO|dkrh| | qO| jrt| j	  W dS W dS  t
y   tjddd Y dS w )z1Process a batch of run operations asynchronously.c                 S  s   g | ]\}}|qS r&   r&   ).0_run_datar&   r&   r'   
<listcomp>       z3_process_buffered_run_ops_batch.<locals>.<listcomp>c                 S     g | ]}| d qS idrK   rt   runr&   r&   r'   rw          NzAprocess_buffered_run_ops should not be None when processing batchzGprocess_buffered_run_ops must return the same number of runs. Expected z, got c                 S  ry   rz   r|   r}   r&   r&   r'   rw      r   zKprocess_buffered_run_ops must preserve run IDs in the same order. Expected postpatchzLangSmith buffered run ops processing error: Failed to process batch.
This does not affect your application's runtime.
Error details:TrY   )_process_buffered_run_opsRuntimeErrorr   rM   r_   zip_create_run_update_run_data_available_eventsetrk   rl   rm   )	rR   rr   	run_dictsoriginal_idsprocessed_runsprocessed_ids	operationru   processed_runr&   r&   r'   _process_buffered_run_ops_batch   sP   

r   use_multipartmark_task_doneopsJOptional[list[Union[SerializedRunOperation, SerializedFeedbackOperation]]]c                   s  zzZt |}| D ]O\\}}}	|stdd |	D }
ndd |	D   fdd|D }
|r7| j|
||d q
tdd |
D rLtd	 d
d |
D }
| jtt	t
 |
||d q
W n tyk   tjddd Y nw W |r|dur|D ].}z|  W qu ty } zdt|v rtd|  n W Y d}~qud}~ww dS dS dS |r|dur|D ],}z|  W q ty } zdt|v rtd|  n W Y d}~qd}~ww w w w )a  Handle a batch of tracing queue items by sending them to LangSmith.

    Args:
        client: The LangSmith client to use for sending data.
        tracing_queue: The queue containing tracing items (used for task_done calls).
        batch: List of tracing queue items to process.
        use_multipart: Whether to use multipart endpoint for sending data.
        mark_task_done: Whether to mark queue tasks as done after processing.
            Set to False when called from parallel execution to avoid double counting.
        ops: Pre-combined serialized operations to use instead of combining from batch.
            If None, operations will be combined from the batch items.
    c                 S     g | ]}|j qS r&   r$   rt   r$   r&   r&   r'   rw         z0_tracing_thread_handle_batch.<locals>.<listcomp>c                 S  s   h | ]}|j jqS r&   )r$   r{   r   r&   r&   r'   	<setcomp>  rx   z/_tracing_thread_handle_batch.<locals>.<setcomp>c                   s   g | ]	}|j  v r|qS r&   rz   rt   op	group_idsr&   r'   rw         )r    r!   c                 s  s    | ]}t |tV  qd S r2   r;   r   r   r&   r&   r'   	<genexpr>$  s    z/_tracing_thread_handle_batch.<locals>.<genexpr>z;Feedback operations are not supported in non-multipart modec                 S  s   g | ]	}t |ts|qS r&   r   r   r&   r&   r'   rw   (  s    rX   TrY   N!task_done() called too many times3Ignoring harmless task_done error during shutdown: )r(   itemsr   _multipart_ingest_opsanyrl   warning_batch_ingest_run_opsr
   r   r   rk   rm   	task_doner_   r*   debug)rR   rD   r   r   r   r   grouped_batchesr    r!   group_batch	group_opsru   er&   r   r'   _tracing_thread_handle_batch   s~   
r   c           	      C  sf  z|z1|du rt dd |D }dd |D }dd |D }|r1| jdur,| j|| ntd W n tyB   tjdd	d
 Y nw W |rw|dury|D ].}z|  W qL tyv } zdt|v rkt	d|  n W Y d}~qLd}~ww dS dS dS |r|dur|D ],}z|  W q ty } zdt|v rt	d|  n W Y d}~qd}~ww w w w )ad  Handle a batch of tracing queue items by exporting them to OTEL.

    Args:
        client: The LangSmith client containing the OTEL exporter.
        tracing_queue: The queue containing tracing items (used for task_done calls).
        batch: List of tracing queue items to process.
        mark_task_done: Whether to mark queue tasks as done after processing.
            Set to False when called from parallel execution to avoid double counting.
        ops: Pre-combined serialized operations to use instead of combining from batch.
            If None, operations will be combined from the batch items.
    Nc                 S  r   r&   r   r   r&   r&   r'   rw   `  r   z5_otel_tracing_thread_handle_batch.<locals>.<listcomp>c                 S  s   g | ]	}t |tr|qS r&   )r;   r   r   r&   r&   r'   rw   b  r   c                 S  s$   i | ]}t |jtr|jj|jqS r&   )r;   r$   r   r{   r/   r   r&   r&   r'   
<dictcomp>c  s    

z5_otel_tracing_thread_handle_batch.<locals>.<dictcomp>zLangSmith tracing error: Failed to submit OTEL trace data.
This does not affect your application's runtime.
Error details: client.otel_exporter is NonezpOTEL tracing error: Failed to submit trace data.
This does not affect your application's runtime.
Error details:TrY   r   r   )
r   otel_exporterexport_batchrl   rm   rk   r   r_   r*   r   )	rR   rD   r   r   r   run_opsotel_context_mapru   r   r&   r&   r'   !_otel_tracing_thread_handle_batchJ  sf   

r   c                 C  sd  t dd |D }t|}t|}z5tjdd%}|t| |||d|}	|t| ||d|}
|	  |
  W d   n1 sBw   Y  W n0 t	yx } z$dt
|v rmtd t| |||d| t| ||d| n W Y d}~nd}~ww |r|dur|D ].}z|  W q ty } zd	t
|v rtd
|  n W Y d}~qd}~ww dS dS dS )a  Handle a batch of tracing queue items by sending to both both LangSmith and OTEL.

    Args:
        client: The LangSmith client to use for sending data.
        tracing_queue: The queue containing tracing items (used for task_done calls).
        batch: List of tracing queue items to process.
        use_multipart: Whether to use multipart endpoint for LangSmith.
        mark_task_done: Whether to mark queue tasks as done after processing.
            Set to False primarily for testing when items weren't actually queued.
    c                 S  r   r&   r   r   r&   r&   r'   rw     r   z7_hybrid_tracing_thread_handle_batch.<locals>.<listcomp>   r   FNz6cannot schedule new futures after interpreter shutdownz@Interpreter shutting down, falling back to sequential processingr   r   )r   copydeepcopycfThreadPoolExecutorsubmitr   r   resultr   r*   rl   r   r   r_   )rR   rD   r   r   r   r   langsmith_opsotel_opsexecutorfuture_langsmithfuture_otelr   ru   r&   r&   r'   #_hybrid_tracing_thread_handle_batch  sp   

	


r   c              
   C  s   t | dr
| jdu rdS z3tdsW dS ddlm} | }t |dr=t |jdr=|jj	d	d}t
d
|  |W S W dS  ty[ } zt
d| d W Y d}~dS d}~ww )zCheck if client is using LangSmith's internal OTLP provider.

    Returns True if using LangSmith's internal provider, False if user
    provided their own.
    r   NFOTEL_ENABLEDr   )traceresource
attributeszlangsmith.internal_providerz;TracerProvider resource check: langsmith.internal_provider=z)Could not determine TracerProvider type: z, assuming user-provided)hasattrr   ls_utilsis_env_var_truishopentelemetryr   get_tracer_providerr   r   rK   rl   r   rk   )rR   r   tracer_provideris_internalr   r&   r&   r'    _is_using_internal_otlp_provider  s6   

r   Optional[int]c                  C  sH   t d} | d ur"zt| W S  ty!   td|  d Y d S w d S )NBATCH_INGEST_SIZE_LIMITz+Invalid value for BATCH_INGEST_SIZE_LIMIT: z, continuing with default)r   get_env_varrF   r_   rl   r   )size_limit_strr&   r&   r'   get_size_limit_from_env  s   

r   infols_schemas.LangSmithInfols_schemas.BatchIngestConfigc                 C  sb   t jdd dtttd}| s|S z| js|W S t }|d ur"|| jd< | jW S  ty0   | Y S w )NFrC   )use_multipart_endpointrT   rS   scale_up_nthreads_limitscale_up_qsize_triggerscale_down_nempty_triggerrS   )
ls_schemasBatchIngestConfigr   r   r   batch_ingest_configr   BaseException)r   default_configenv_size_limitr&   r&   r'   _ensure_ingest_config  s(   
r   tuple[bool, bool]c                  C  s.   t d} t d}| sdS | }|}||fS )an  Get the current tracing mode configuration.

    Returns:
        tuple[bool, bool]:
            - hybrid_otel_and_langsmith: True if both OTEL and LangSmith tracing
              are enabled, which is default behavior if OTEL_ENABLED is set to
              true and OTEL_ONLY is not set to true
            - is_otel_only: True if only OTEL tracing is enabled
    r   	OTEL_ONLY)FF)r   r   )otel_enabled	otel_onlyhybrid_otel_and_langsmithis_otel_onlyr&   r&   r'   get_tracing_mode(  s   


r   
client_refweakref.ref[Client]c                   s`  |    d u r	d S  j }|d usJ t j}|d }|d }|d }|dd}g dtdp6 jd u}|sj|rj jjp@i ddsKt	d	 nt
  _t  _t  _tjtt
 fd
  d7 d fdd}| rڈD ]}	|	 s|	 qxt|k r| |krtjtt
 |fd
}
|
 |
  t \}} jp|dpd}t|||d }r|rt ||| n|rt  || nt! ||| | svt \}} jp|dpd}t||d|d }r)|rt"d t ||| n|rt"d t  || nt"d t! ||| t||d|d }st"d d S )NrS   r   r   r   F   DISABLE_RUN_COMPRESSIONzstd_compression_enabledz~Run compression is not enabled. Please update to the latest version of LangSmith. Falling back to regular multipart ingestion.)targetargs   r   r6   c                    sr    r
t  dr jrtd dS t  std dS t tdr7t t	 k} | s5td | S dS )N_manual_cleanupz3Client is being cleaned up, stopping tracing threadFz,Main thread is dead, stopping tracing threadgetrefcountzeClient refs count indicates we're the only remaining reference to the client, stopping tracing threadT)
r   r   rl   r   	threadingmain_threadis_alivesysr   rM   should_keep_threadrR   num_known_refssub_threadsr&   r'   keep_thread_activeg  s(   


z7tracing_control_thread_func.<locals>.keep_thread_activerT   r   rE   rH   rE   rG   rH   zHybrid mode cleanupOTEL-only cleanupLangSmith-only cleanupz'Tracing control thread is shutting downr   r6   )#rD   r   r   rK   r   r   r   instance_flagsrl   r   weakrefWeakSet_futuresr   r[   r   Eventr   Thread-tracing_control_thread_func_compress_parallelrefstartr   removerM   qsize_tracing_sub_thread_funcr"   r   r^   rP   r   r   r   r   )r   rD   r   rS   r   r   r   disable_compressionr   thread
new_threadr   r   max_batch_sizerN   r&   r   r'   tracing_control_thread_func?  s   









'



r        ?flush_intervalfloatc                   s
  |    d u r	d S t d  jd u s jd u s jd u r$t d d S t j}|d } jp5|	dd}dd fd	d
}t
 }	  jjdd}| sPnp|r j  t ||\}}	|d urzt j||	}
 j|
 W n ty    ||	 Y nw t
 }n7t
 | |krt ddd\}}	|d urztt j||	g W n ty    ||	 Y nw t
 }qEz-t ddd\}}	|d urztt j||	g W n ty    ||	 Y nw W n ty   t jddd Y nw t d d S )Nz4Tracing control thread func compress parallel calledzLangSmith tracing error: Required compression attributes not initialized.
This may affect trace submission but does not impact your application's runtime.rS   rT   rQ      r   r6   c                    sj    r
t  dr jrtd dS t  std dS t tdr3t k} | s1td | S dS )Nr   z7Client is being cleaned up, stopping compression threadFz0Main thread is dead, stopping compression threadr   ziClient refs count indicates we're the only remaining reference to the client, stopping compression threadT)	r   r   rl   r   r   r   r   r   r   r   rR   r   r&   r'   r     s$   


zItracing_control_thread_func_compress_parallel.<locals>.keep_thread_activeTrJ   )rI   r   )rS   rT   zuLangSmith tracing error: Failed during final cleanup.
This does not affect your application's runtime.
Error details:rY   z1Compressed traces control thread is shutting downr   )rl   r   r[   r   r  rm   r   r   r^   rK   time	monotonicwaitclearrq   LANGSMITH_CLIENT_THREAD_POOLr   _send_compressed_multipart_reqaddr   r   rk   )r   r  r   rS   rT   r   last_flush_time	triggereddata_streamrp   futurefinal_data_streamr&   r  r'   r    s   







	7	
r  c              
   C  s  |  }|d u r	d S z|j sW d S W n ty* } ztd| W Y d }~d S d }~ww |j}|d us4J t|j }|dd}d}t 	 r||d kr|j
pV|dpVd}t|||d }	rd}t \}
}|
rqt|||	| n|rzt|||	 nt|||	| n|d7 }t 	 r||d ksMt \}
}|j
p|dpd}t||d	|d
 }	r|
rt|||	| n|rtd t|||	 ntd t|||	| t||d	|d
 }	std d S )Nz#Error in tracing control thread: %srS   rC   r   r   rT   r   r   Fr   r   r   z+Tracing control sub-thread is shutting down)r   r   rl   r   rD   r   rK   r   r   r   r^   rP   r   r   r   r   )r   r   rR   r   rD   r   rS   seen_successive_empty_queuesr  rN   r   r   r&   r&   r'   r  W  s   









r  )r   r   r   r   )rC   Tr   )
rD   r   rE   rF   rG   r6   rH   rF   r   r   )rC   rQ   )rR   r   rS   rF   rT   rU   r   rV   )rR   r   rr   rs   r   r1   )TN)rR   r   rD   r   r   r   r   r6   r   r6   r   r   r   r1   )rR   r   rD   r   r   r   r   r6   r   r   r   r1   )T)rR   r   rD   r   r   r   r   r6   r   r6   r   r1   )rR   r   r   r6   )r   r   )r   r   r   r   )r   r   )r   r   r   r1   )r  )r   r   r  r  r   r1   )r   r   r   r6   r   r1   )@
__future__r   concurrent.futuresfuturesr   r   	functoolsiologgingr   r   r  r  multiprocessingr   queuer   r   typingr   r   r   r	   r
   	langsmithr   r   r   r   &langsmith._internal._compressed_tracesr   langsmith._internal._constantsr   r   r   r   langsmith._internal._operationsr   r   r   opentelemetry.context.contextr   langsmith.clientr   	getLoggerrl   r   r  r(   total_orderingr)   rP   rq   r   r   r   r   r   r   r   r   r  r  r  r&   r&   r&   r'   <module>   sb    

-/
97RD
Q
'


 	 