"""V2 Evaluation Interface."""

from __future__ import annotations

import asyncio
import concurrent.futures as cf
import io
import logging
import pathlib
import uuid
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Iterable, Sequence
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Literal,
    Optional,
    TypeVar,
    Union,
    cast,
)

import langsmith
from langsmith import run_helpers as rh
from langsmith import run_trees, schemas
from langsmith import run_trees as rt
from langsmith import utils as ls_utils
from langsmith._internal import _aiter as aitertools
from langsmith._internal._beta_decorator import _warn_once
from langsmith.evaluation._runner import (
    AEVALUATOR_T,
    DATA_T,
    EVALUATOR_T,
    ExperimentResultRow,
    _evaluators_include_attachments,
    _ExperimentManagerMixin,
    _extract_feedback_keys,
    _ForwardResults,
    _get_target_args,
    _is_langchain_runnable,
    _load_examples_map,
    _load_experiment,
    _load_tqdm,
    _load_traces,
    _resolve_data,
    _resolve_evaluators,
    _resolve_experiment,
    _target_include_attachments,
    _to_pandas,
    _wrap_summary_evaluators,
)
from langsmith.evaluation.evaluator import (
    SUMMARY_EVALUATOR_T,
    EvaluationResult,
    EvaluationResults,
    RunEvaluator,
)

if TYPE_CHECKING:
    import pandas as pd
    from langchain_core.runnables import Runnable

    DataFrame = pd.DataFrame
else:
    DataFrame = Any

logger = logging.getLogger(__name__)

ATARGET_T = Union[
    Callable[[dict], Awaitable[dict]], Callable[[dict, dict], Awaitable[dict]]
]


async def aevaluate(
    target: Union[
        ATARGET_T,
        AsyncIterable[dict],
        Runnable,
        str,
        uuid.UUID,
        schemas.TracerSession,
    ],
    data: Union[
        DATA_T, AsyncIterable[schemas.Example], Iterable[schemas.Example], None
    ] = None,
    evaluators: Optional[Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = 0,
    num_repetitions: int = 1,
    client: Optional[langsmith.Client] = None,
    blocking: bool = True,
    experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None,
    upload_results: bool = True,
    error_handling: Literal["log", "ignore"] = "log",
    **kwargs: Any,
) -> AsyncExperimentResults:
    """Evaluate an async target system on a given dataset.

    Args:
        target (AsyncCallable[[dict], dict] | AsyncIterable[dict] | Runnable | EXPERIMENT_T | Tuple[EXPERIMENT_T, EXPERIMENT_T]):
            The target system or experiment(s) to evaluate.

            Can be an async function that takes a `dict` and returns a `dict`, a
            langchain `Runnable`, an existing experiment ID, or a two-tuple of experiment IDs.
        data (Union[DATA_T, AsyncIterable[schemas.Example]]): The dataset to evaluate on.

            Can be a dataset name, a list of examples, an async generator of examples, or an async iterable of examples.
        evaluators (Optional[Sequence[EVALUATOR_T]]): A list of evaluators to run
            on each example.
        summary_evaluators (Optional[Sequence[SUMMARY_EVALUATOR_T]]): A list of summary
            evaluators to run on the entire dataset.
        metadata (Optional[dict]): Metadata to attach to the experiment.
        experiment_prefix (Optional[str]): A prefix to provide for your experiment name.
        description (Optional[str]): A description of the experiment.
        max_concurrency (int | None): The maximum number of concurrent
            evaluations to run.

            If `None` then no limit is set. If `0` then no concurrency.
        num_repetitions (int): The number of times to run the evaluation.
            Each item in the dataset will be run and evaluated this many times.
        client (Optional[langsmith.Client]): The LangSmith client to use.
        blocking (bool): Whether to block until the evaluation is complete.
        experiment (Optional[schemas.TracerSession]): An existing experiment to
            extend.

            If provided, `experiment_prefix` is ignored. For advanced usage only.
        error_handling (str, default="log"): How to handle individual run errors.

            `'log'` will trace the runs with the error message as part of the
            experiment, `'ignore'` will not count the run as part of the experiment at
            all.

    Returns:
        An async iterator over the experiment results.

    Environment:
        - `LANGSMITH_TEST_CACHE`: If set, API calls will be cached to disk to save time and
            cost during testing.

            Recommended to commit the cache files to your repository for faster CI/CD runs.

            Requires the `'langsmith[vcr]'` package to be installed.
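
            For example (the cache directory below is just an illustration):

            >>> import os
            >>> os.environ["LANGSMITH_TEST_CACHE"] = "tests/cassettes"  # doctest: +SKIP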

    Examples:
        >>> from typing import Sequence
        >>> from langsmith import Client, aevaluate
        >>> from langsmith.schemas import Example, Run
        >>> client = Client()
        >>> dataset = client.clone_public_dataset(
        ...     "https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d"
        ... )
        >>> dataset_name = "Evaluate Examples"

        Basic usage:

        >>> def accuracy(run: Run, example: Example):
        ...     # Row-level evaluator for accuracy.
        ...     pred = run.outputs["output"]
        ...     expected = example.outputs["answer"]
        ...     return {"score": expected.lower() == pred.lower()}

        >>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
        ...     # Experiment-level evaluator for precision.
        ...     # TP / (TP + FP)
        ...     predictions = [run.outputs["output"].lower() for run in runs]
        ...     expected = [example.outputs["answer"].lower() for example in examples]
        ...     # yes and no are the only possible answers
        ...     tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
        ...     fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
        ...     return {"score": tp / (tp + fp)}

        >>> import asyncio
        >>> async def apredict(inputs: dict) -> dict:
        ...     # This can be any async function or just an API call to your app.
        ...     await asyncio.sleep(0.1)
        ...     return {"output": "Yes"}
        >>> results = asyncio.run(
        ...     aevaluate(
        ...         apredict,
        ...         data=dataset_name,
        ...         evaluators=[accuracy],
        ...         summary_evaluators=[precision],
        ...         experiment_prefix="My Experiment",
        ...         description="Evaluate the accuracy of the model asynchronously.",
        ...         metadata={
        ...             "my-prompt-version": "abcd-1234",
        ...         },
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...

        Evaluating over only a subset of the examples using an async generator:

        >>> async def example_generator():
        ...     examples = client.list_examples(dataset_name=dataset_name, limit=5)
        ...     for example in examples:
        ...         yield example
        >>> results = asyncio.run(
        ...     aevaluate(
        ...         apredict,
        ...         data=example_generator(),
        ...         evaluators=[accuracy],
        ...         summary_evaluators=[precision],
        ...         experiment_prefix="My Subset Experiment",
        ...         description="Evaluate a subset of examples asynchronously.",
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...

        Streaming each prediction to debug more easily and eagerly:

        >>> results = asyncio.run(
        ...     aevaluate(
        ...         apredict,
        ...         data=dataset_name,
        ...         evaluators=[accuracy],
        ...         summary_evaluators=[precision],
        ...         experiment_prefix="My Streaming Experiment",
        ...         description="Streaming predictions for debugging.",
        ...         blocking=False,
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...

        >>> async def aenumerate(iterable):
        ...     async for elem in iterable:
        ...         print(elem)
        >>> asyncio.run(aenumerate(results))

        Running without concurrency:

        >>> results = asyncio.run(
        ...     aevaluate(
        ...         apredict,
        ...         data=dataset_name,
        ...         evaluators=[accuracy],
        ...         summary_evaluators=[precision],
        ...         experiment_prefix="My Experiment Without Concurrency",
        ...         description="This was run without concurrency.",
        ...         max_concurrency=0,
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...

        Using async evaluators:

        >>> async def helpfulness(run: Run, example: Example):
        ...     # Row-level evaluator for helpfulness.
        ...     await asyncio.sleep(5)  # Replace with your LLM API call
        ...     return {"score": run.outputs["output"] == "Yes"}

        >>> results = asyncio.run(
        ...     aevaluate(
        ...         apredict,
        ...         data=dataset_name,
        ...         evaluators=[helpfulness],
        ...         summary_evaluators=[precision],
        ...         experiment_prefix="My Helpful Experiment",
        ...         description="Applying async evaluators example.",
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...
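
        Ignoring failed runs instead of logging them (a sketch that reuses the same
        `apredict` target and dataset as the examples above):

        >>> results = asyncio.run(
        ...     aevaluate(
        ...         apredict,
        ...         data=dataset_name,
        ...         evaluators=[accuracy],
        ...         summary_evaluators=[precision],
        ...         experiment_prefix="My Ignore-Errors Experiment",
        ...         description="Drop failed runs from the experiment entirely.",
        ...         error_handling="ignore",
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...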


    !!! warning "Behavior changed in `langsmith` 0.2.0"

        'max_concurrency' default updated from None (no limit on concurrency)
        to 0 (no concurrency at all).
    """
    if isinstance(target, (str, uuid.UUID, schemas.TracerSession)):
        # Evaluating runs of an existing experiment.
        invalid_args = {
            "num_repetitions": num_repetitions > 1,
            "experiment": bool(experiment),
            "upload_results": not upload_results,
            "experiment_prefix": bool(experiment_prefix),
            "data": data is not None,
        }
        if any(invalid_args.values()):
            msg = (
                "Received invalid arguments. "
                f"{tuple(k for k, v in invalid_args.items() if v)} should not be "
                "specified when target is an existing experiment."
            )
            raise ValueError(msg)
        target_id = target if isinstance(target, (str, uuid.UUID)) else target.id
        logger.debug(f"Running evaluation over existing experiment {target_id}...")
        return await aevaluate_existing(
            target,
            evaluators=evaluators,
            summary_evaluators=summary_evaluators,
            metadata=metadata,
            max_concurrency=max_concurrency,
            client=client,
            blocking=blocking,
        )
    elif isinstance(target, (list, tuple)):
        msg = (
            "Running a comparison of two existing experiments asynchronously is not "
            "currently supported. Please use the `evaluate()` method instead and "
            "make sure that your evaluators are defined as synchronous functions."
        )
        raise ValueError(msg)
    elif kwargs:
        msg = (
            f"Received unsupported arguments {kwargs}. These arguments are not "
            "supported when creating a new experiment."
        )
        raise ValueError(msg)
    elif data is None:
        msg = "Must specify 'data' when running evaluations over a target function."
        raise ValueError(msg)
    elif experiment and experiment_prefix:
        msg = (
            "Expected at most one of 'experiment' or 'experiment_prefix', but both "
            f"were provided. Got: experiment={experiment}, "
            f"experiment_prefix={experiment_prefix}"
        )
        raise ValueError(msg)
    else:
        if not upload_results:
            _warn_once("'upload_results' parameter is in beta.")
        logger.debug(f"Running evaluation over target system {target}...")
        return await _aevaluate(
            target,
            data=data,
            evaluators=evaluators,
            summary_evaluators=summary_evaluators,
            metadata=metadata,
            experiment_prefix=experiment_prefix,
            description=description,
            max_concurrency=max_concurrency,
            num_repetitions=num_repetitions,
            client=client,
            blocking=blocking,
            experiment=experiment,
            upload_results=upload_results,
            error_handling=error_handling,
        )


async def aevaluate_existing(
    experiment: Union[str, uuid.UUID, schemas.TracerSession],
    evaluators: Optional[Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    max_concurrency: Optional[int] = 0,
    client: Optional[langsmith.Client] = None,
    load_nested: bool = False,
    blocking: bool = True,
) -> AsyncExperimentResults:
    """Evaluate existing experiment runs asynchronously.

    Args:
        experiment (Union[str, uuid.UUID]): The identifier of the experiment to evaluate.
        evaluators (Optional[Sequence[EVALUATOR_T]]): Optional sequence of evaluators to use for individual run evaluation.
        summary_evaluators (Optional[Sequence[SUMMARY_EVALUATOR_T]]): Optional sequence of evaluators
            to apply over the entire dataset.
        metadata (Optional[dict]): Optional metadata to include in the evaluation results.
        max_concurrency (int | None): The maximum number of concurrent
            evaluations to run.

            If `None` then no limit is set. If `0` then no concurrency.
        client (Optional[langsmith.Client]): Optional Langsmith client to use for evaluation.
        load_nested: Whether to load all child runs for the experiment.

            Default is to only load the top-level root runs.
        blocking (bool): Whether to block until evaluation is complete.

    Returns:
        An async iterator over the experiment results.

    Examples:
        Define your evaluators

        >>> from typing import Sequence
        >>> from langsmith.schemas import Example, Run
        >>> def accuracy(run: Run, example: Example):
        ...     # Row-level evaluator for accuracy.
        ...     pred = run.outputs["output"]
        ...     expected = example.outputs["answer"]
        ...     return {"score": expected.lower() == pred.lower()}
        >>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
        ...     # Experiment-level evaluator for precision.
        ...     # TP / (TP + FP)
        ...     predictions = [run.outputs["output"].lower() for run in runs]
        ...     expected = [example.outputs["answer"].lower() for example in examples]
        ...     # yes and no are the only possible answers
        ...     tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
        ...     fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
        ...     return {"score": tp / (tp + fp)}

        Load the experiment and run the evaluation.

        >>> import asyncio
        >>> import uuid
        >>> from langsmith import Client, aevaluate, aevaluate_existing
        >>> client = Client()
        >>> dataset_name = "__doctest_aevaluate_existing_" + uuid.uuid4().hex[:8]
        >>> dataset = client.create_dataset(dataset_name)
        >>> example = client.create_example(
        ...     inputs={"question": "What is 2+2?"},
        ...     outputs={"answer": "4"},
        ...     dataset_id=dataset.id,
        ... )
        >>> async def apredict(inputs: dict) -> dict:
        ...     await asyncio.sleep(0.001)
        ...     return {"output": "4"}
        >>> results = asyncio.run(
        ...     aevaluate(
        ...         apredict, data=dataset_name, experiment_prefix="doctest_experiment"
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...
        >>> experiment_id = results.experiment_name
        >>> # Consume all results to ensure evaluation is complete
        >>> async def consume_results():
        ...     result_list = [r async for r in results]
        ...     return len(result_list) > 0
        >>> asyncio.run(consume_results())
        True
        >>> import time
        >>> time.sleep(3)
        >>> results = asyncio.run(
        ...     aevaluate_existing(
        ...         experiment_id,
        ...         evaluators=[accuracy],
        ...         summary_evaluators=[precision],
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...
        >>> client.delete_dataset(dataset_id=dataset.id)


    """
    client = client or rt.get_cached_client()
    project = (
        experiment
        if isinstance(experiment, schemas.TracerSession)
        else await aitertools.aio_to_thread(_load_experiment, experiment, client)
    )
    runs = await aitertools.aio_to_thread(
        _load_traces, experiment, client, load_nested=load_nested
    )
    data_map = await aitertools.aio_to_thread(_load_examples_map, client, project)
    data = [data_map[run.reference_example_id] for run in runs]
    return await _aevaluate(
        runs,
        data=data,
        evaluators=evaluators,
        summary_evaluators=summary_evaluators,
        metadata=metadata,
        max_concurrency=max_concurrency,
        client=client,
        blocking=blocking,
        experiment=project,
    )


async def _aevaluate(
    target: Union[ATARGET_T, AsyncIterable[dict], Iterable[schemas.Run], Runnable],
    data: Union[DATA_T, AsyncIterable[schemas.Example]],
    evaluators: Optional[Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = 0,
    num_repetitions: int = 1,
    client: Optional[langsmith.Client] = None,
    blocking: bool = True,
    experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None,
    upload_results: bool = True,
    error_handling: Literal["log", "ignore"] = "log",
) -> AsyncExperimentResults:
    is_async_target = (
        asyncio.iscoroutinefunction(target)
        or hasattr(target, "__aiter__")
        or _is_langchain_runnable(target)
    )
    client = client or rt.get_cached_client()
    runs = None if is_async_target else cast(Iterable[schemas.Run], target)
    experiment_, runs = await aitertools.aio_to_thread(
        _resolve_experiment, experiment, runs, client
    )
    num_include_attachments = _evaluators_include_attachments(
        evaluators
    ) + _target_include_attachments(target)
    manager = await _AsyncExperimentManager(
        data,
        client=client,
        metadata=metadata,
        experiment=experiment_ or experiment_prefix,
        description=description,
        num_repetitions=num_repetitions,
        runs=runs,
        include_attachments=num_include_attachments > 0,
        reuse_attachments=num_repetitions * num_include_attachments > 1,
        upload_results=upload_results,
        error_handling=error_handling,
    ).astart()
    cache_dir = ls_utils.get_cache_dir(None)
    if cache_dir is not None:
        dsid = await manager.get_dataset_id()
        cache_path = pathlib.Path(cache_dir) / f"{dsid}.yaml"
    else:
        cache_path = None
    with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]):
        if is_async_target:
            if evaluators:
                results = await manager.awith_predictions_and_evaluators(
                    cast(ATARGET_T, target), evaluators, max_concurrency=max_concurrency
                )
            else:
                results = await manager.awith_predictions(
                    cast(ATARGET_T, target), max_concurrency=max_concurrency
                )
            if summary_evaluators:
                results = await results.awith_summary_evaluators(summary_evaluators)
        else:
            results = manager
            if evaluators:
                results = await results.awith_evaluators(
                    evaluators, max_concurrency=max_concurrency
                )
            if summary_evaluators:
                results = await results.awith_summary_evaluators(summary_evaluators)
        results = AsyncExperimentResults(results)
        if blocking:
            await results.wait()
        return results


class _AsyncExperimentManager(_ExperimentManagerMixin):
    """Manage the execution of experiments asynchronously.
    Supports lazily running predictions and evaluations in parallel to facilitate
    result streaming and early debugging.

    Args:
        data (DATA_T): The data used for the experiment. Can be a dataset name or ID OR
            a generator of examples.
        runs (Optional[Iterable[schemas.Run]]): The runs associated with the experiment
            predictions.
        experiment (Optional[schemas.TracerSession]): The tracer session
            associated with the experiment.
        experiment_prefix (Optional[str]): The prefix for the experiment name.
        description (Optional[str]): The description for the experiment.
        metadata (Optional[dict]): Additional metadata for the experiment.
        client (Optional[langsmith.Client]): The Langsmith client used for
             the experiment.
        evaluation_results (Optional[Iterable[EvaluationResults]]): The evaluation
            sresults for the experiment.
        summary_results (Optional[Iterable[EvaluationResults]]): The aggregate results
            for the experiment.
        num_repetitions (Optional[int], default=1): The number of repetitions for
            the experiment.
        include_attachments (Optional[bool], default=False): Whether to include
            attachments. This is used for when we pull the examples for the experiment.
        reuse_attachments (Optional[bool], default=False): Whether to reuse attachments
            from examples. This is True if we need to reuse attachments across multiple
            target/evaluator functions.
        upload_results (Optional[bool], default=True): Whether to upload results
            to Langsmith.
        attachment_raw_data_dict (Optional[dict]): A dictionary to store raw data
            for attachments. Only used if we reuse attachments across multiple
            target/evaluator functions.
        error_handling (str, default="log"): How to handle individual run errors.

            `'log'` will trace the runs with the error message as part of the
            experiment, `'ignore'` will not count the run as part of the experiment at
            all.
    Nr/   FTr0   rD   +Optional[Union[schemas.TracerSession, str]]r7   r8   rv   BOptional[Union[Iterable[schemas.Run], AsyncIterable[schemas.Run]]]r@   rA   evaluation_results*Optional[AsyncIterable[EvaluationResults]]summary_resultsr;   r:   r>   r?   rz   rC   r{   rF   attachment_raw_data_dictrG   rH   r1   rw   c                  sl   t  j||||d || _d | _|d urt|nd | _|| _|| _|	| _	|
| _
|| _|| _|| _|| _d S )N)rD   r7   r@   r;   )super__init___data	_examplesrs   ensure_async_iterator_runs_evaluation_results_summary_results_num_repetitions_include_attachments_reuse_attachments_upload_results_attachment_raw_data_dict_error_handling)selfr1   rD   r7   rv   r@   r   r   r;   r>   rz   r{   rF   r   rG   	__class__rO   rS   r   C  s$   
z _AsyncExperimentManager.__init__exampleschemas.ExamplerL   c                 C  s   t |dr|js
|S i }|j D ]/\}}| jdur<t|j| | jv r<|d t| jt|j|  |d d||< q|||< qtj	|j|j
|j|j|j|j|j|j||j|jdS )a  Reset attachment readers for an example.

        This is only in the case that an attachment is going to be used by more
        than 1 callable (target + evaluators). In that case we keep a single copy
        of the attachment data in self._attachment_raw_data_dict, and create
        readers from that data. This makes it so that we don't have to keep
        copies of the same data in memory, instead we can just create readers
        from the same data.
        attachmentsNpresigned_url	mime_typer   readerr   r`   
created_at
dataset_idinputsoutputsr7   modified_atsource_run_idr   	_host_url
_tenant_id)r   r   r^   r   rW   r`   ioBytesIOr   Exampler   r   r   r   r7   r   r   r   r   )r   r   new_attachmentsname
attachmentrO   rO   rS   _reset_example_attachmentsi  s4   


z2_AsyncExperimentManager._reset_example_attachmentsAsyncIterator[schemas.Example]c                   s   j d u rMtjjjd_ jr-jd u r-tj \}_ dd |2 I d H _j	dkrMdd j 2 I d H  t
 fddtj	D _ tjtj dt d	\_ }|S )
Nr@   rz   c                   sH   i | z3 d H W }|j pi  D ]\}}t|j| |d  qq6 S )Nr   )r   r^   rW   r`   read)rP   er   valuerO   rO   rS   
<dictcomp>  s    
z9_AsyncExperimentManager.aget_examples.<locals>.<dictcomp>r/   c                   s   g | z3 d H W }|q6 S rN   rO   rP   r   rO   rO   rS   rp     rU   z9_AsyncExperimentManager.aget_examples.<locals>.<listcomp>c                   s"   g | ]}t fd d D qS )c                   s   g | ]}  |qS rO   )r   r   r   rO   rS   rp     s    zD_AsyncExperimentManager.aget_examples.<locals>.<listcomp>.<listcomp>)async_iter_from_list)rP   _examples_listr   rO   rS   rp     s    
   lock)r   _aresolve_datar   r@   r   r   r   rs   ateer   async_chain_from_iterableranger   r~   Lock)r   examples_copyexamples_iterrO   r   rS   aget_examples  s.   

z%_AsyncExperimentManager.aget_examplesrW   c                   sZ   | j d u st| j dd s't|  I d H I d H }|d u r"tdt|jS t| j jS )Nreference_dataset_idz!No examples found in the dataset.)	_experimentgetattrrs   py_anextr   r_   rW   r   r   )r   r   rO   rO   rS   r     s   
z&_AsyncExperimentManager.get_dataset_idAsyncIterator[schemas.Run]c                 C sT   | j d u r
tdtjt| j dt d\| _ }|2 z	3 d H W }|V  q6 d S )NzRuns not loaded yet.r   r   )r   r_   rs   r   r   r~   r   )r   rv   rm   rO   rO   rS   	aget_runs  s   
z!_AsyncExperimentManager.aget_runs AsyncIterator[EvaluationResults]c                 C sx   | j d u r|  I d H 2 z3 d H W }dg iV  q6 d S tjt| j dt d\| _ }|2 z	3 d H W }|V  q/6 d S )Nr   r   r   )r   r   rs   r   r   r~   r   )r   r   r   resultrO   rO   rS   aget_evaluation_results  s   

z/_AsyncExperimentManager.aget_evaluation_resultsc                   s   zt |  I d H I d H }W n ty   tdw |s"td| jr*| |nd }| || | j| j	d< | j
|  I d H |dS )Nz\No examples found in the dataset. Please ensure the data provided to aevaluate is not empty.z[No examples found in the dataset.Please ensure the data provided to aevaluate is not empty.r>   )rD   )rs   r   r   StopAsyncIterationr_   r   _get_project_print_experiment_startr   	_metadata_copy)r   first_exampleru   rO   rO   rS   r     s&   z_AsyncExperimentManager.astartc                 C  s   i }|j pi  D ]1\}}| jd ur6t|j| | jv r6t| jt|j|  }|d ||d d||< q	|||< q	tj|j|j	|j
|j|j|j|j|j||j|jdS )Nr   r   r   r   )r   r^   r   rW   r`   r   r   r   r   r   r   r   r   r7   r   r   r   r   )r   r   r   r   r   r   rO   rO   rS   _get_example_with_readers  s2   

z1_AsyncExperimentManager._get_example_with_readersr<   r=   rI   r   r3   *Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]c          	        s   t   tdstjdd_td fddfdd	}tj|| d
d}tj|dt	
 d\}}}jdd |2 dd |2 dd |2 dS )zRun predictions and evaluations in a single pipeline.

        This allows evaluators to process results as soon as they're available from
        the target function, rather than waiting for all predictions to complete first.
        _evaluation_feedback_executor   max_workersr   r   c                   sh   t | jjjtjI d H }|d |d } }j || dg idjdI d H }|S )Nr   rm   r   rm   r   r   feedback_executor)		_aforwardr   experiment_namer   r@   r'   r   _arun_evaluatorsr   )r   predrm   r   )r3   r   rI   traceable_targetrO   rS   process_example  s(   
		zQ_AsyncExperimentManager.awith_predictions_and_evaluators.<locals>.process_examplec                   s<     I dH 2 z3 dH W }  | V  q6  I dH  dS )zCreate a single task per example.

            That task is to run the target function and all the evaluators
            sequentially.
            N)r   _aendr   )r   r   rO   rS   process_examples5  s
   zR_AsyncExperimentManager.awith_predictions_and_evaluators.<locals>.process_examplesMbP?_eager_consumption_timeout   r   c                 S "   | z3 d H W }|d V  q6 d S Nr   rO   rP   r   rO   rO   rS   rT   L       zK_AsyncExperimentManager.awith_predictions_and_evaluators.<locals>.<genexpr>c                 S r  Nrm   rO   r  rO   rO   rS   rT   M  r  c                 S r  Nr   rO   r  rO   rO   rS   rT   N  r  rv   r   N)r   r   )r%   r   cfThreadPoolExecutorr   _ensure_async_traceablers   aiter_with_concurrencyr   r~   r   r   )	r   rI   r3   r<   r   experiment_resultsr1r2r3rO   )r3   r   r   rI   r   rS   r     s$   
z8_AsyncExperimentManager.awith_predictions_and_evaluatorsc                  sP   | j ||t|d}tj|dt d\}}| jdd |2 dd |2 dS )N)r<   rz   r   r   c                 S r  r  rO   rP   r   rO   rO   rS   rT   ^  r  z<_AsyncExperimentManager.awith_predictions.<locals>.<genexpr>c                 S r  r	  rO   r  rO   rO   rS   rT   _  r  )rv   )	_apredictr'   rs   r   r~   r   r   )r   rI   r<   _experiment_resultsr  r  rO   rO   rS   r   Q  s   z)_AsyncExperimentManager.awith_predictionsr}   c                  s`   t |}| j||d}tj|dt d\}}}| jdd |2 dd |2 dd |2 dS )	Nr}   r  r   c                 S r  r  rO   r  rO   rO   rS   rT   l  r  z;_AsyncExperimentManager.awith_evaluators.<locals>.<genexpr>c                 S r  r	  rO   r  rO   rO   rS   rT   m  r  c                 S r  r
  rO   r  rO   rO   rS   rT   n  r  r  )r%   _ascorers   r   r~   r   r   )r   r3   r<   r  r  r  r  rO   rO   rS   r   b  s   z(_AsyncExperimentManager.awith_evaluatorsr5   Sequence[SUMMARY_EVALUATOR_T]c                   s2   t |}| |}| j|  I d H |  |dS )N)rv   r   )r)   _aapply_summary_evaluatorsr   r   r   )r   r5   wrapped_evaluatorsaggregate_feedback_genrO   rO   rS   r   q  s   
z0_AsyncExperimentManager.awith_summary_evaluators"AsyncIterator[ExperimentResultRow]c                 C sL   t |  |  I d H |  2 z3 d H W \}}}t|||dV  q6 d S )Nr   )rs   	async_zipr   r   r   r   )r   rm   r   r   rO   rO   rS   aget_results}  s   
z$_AsyncExperimentManager.aget_resultsdict[str, list[dict]]c                   s.   | j d u r
dg iS ddd | j 2 I d H iS )Nr   c                   s*   g | z3 d H W }|d D ]}|qq6 S )Nr   rO   )rP   r   resrO   rO   rS   rp     s    
z?_AsyncExperimentManager.aget_summary_scores.<locals>.<listcomp>)r   r   rO   rO   rS   aget_summary_scores  s   

z+_AsyncExperimentManager.aget_summary_scoresAsyncIterator[_ForwardResults]c                 sT   t |  fdd}tj|| dd2 z	3 d H W }|V  q6  I d H  d S )Nc               	    sH     I d H 2 z3 d H W } t | jjjjV  q6 d S rN   )r   r   r   r   r   r@   r   r   fnrz   r   rO   rS   predict_all  s   z6_AsyncExperimentManager._apredict.<locals>.predict_allr  r  )r  rs   r  r   )r   rI   r<   rz   r%  r   rO   r#  rS   r    s   z!_AsyncExperimentManager._apredictSequence[RunEvaluator]c                  sl   t jdd% fdd}tj|| dd2 z	3 d H W }|V  q6 W d    d S 1 s/w   Y  d S )Nr   r   c                   s0     2 z3 d H W } j | dV  q6 d S )Nr   )r  r   )current_resultsr3   r   r   rO   rS   	score_all  s   
z2_AsyncExperimentManager._ascore.<locals>.score_allr  r  )r  r  rs   r  )r   r3   r<   r)  r   rO   r(  rS   r    s   "z_AsyncExperimentManager._ascorer'  r   r   cf.ThreadPoolExecutorc              	     s   t  }i |d pi dji}t jdi i |d|js dndjdF |d |d  |d	 }d fdd}g }|D ]}	|||	I d H  qD|D ]}
|
d ur`|d |
 qSt |dW  d    S 1 srw   Y  d S )Nr7   rD   r3   localTproject_namer7   enabledr@   rm   r   r   	evaluatorr-   c                   s0  t  z#| jdI d H }j|}jr&jj|d |W S  ty   zcz,t	| }t
 fdd|D d}j|}jrTjj|d |W W  Y d   S  tyv } ztd|  W Y d }~nd }~ww tjdt|  dj d	t  d
d W Y d   d S d   ww )N)rm   r   evaluator_run_id)rm   	_executorc                   s$   g | ]}t |t d didqS )errorT)keyr   commentextra)r+   repr)rP   r3  r   r0  rO   rS   rp     s    z[_AsyncExperimentManager._arun_evaluators.<locals>._run_single_evaluator.<locals>.<listcomp>)r   zError parsing feedback keys: zError running evaluator z on run : Texc_info)rX   uuid4aevaluate_runr   r@   _select_eval_resultsr   _log_evaluation_feedback	Exceptionr   r,   ra   rb   r2  r6  r`   )r/  evaluator_responseselected_resultsfeedback_keyserror_responsee2r   r   rm   r   r7  rS   _run_single_evaluator  s^   zG_AsyncExperimentManager._arun_evaluators.<locals>._run_single_evaluatorr   r   rO   )r/  r-   )	rhget_tracing_contextr   tracing_contextr   r@   appendextendr   )r   r3   r'  r   current_contextr7   eval_resultsrF  all_resultsr/  r   rO   rE  rS   r     sD   
	1$z(_AsyncExperimentManager._arun_evaluatorsc                 C s  g g }}t |  I d H }t |  |2 z3 d H W \}}|| || q6 g }| jr6|  jnd }t	
 }	i |	d pBi | j|d}
t	jdi i |	d|
| jsXdnd| jdl |D ]a}z>|||}| jj||jd}|| | jr|D ]#}|jdhd	}|d
d }t j| jjfi |d ||dI d H  qW qd ty } ztjdt| d| dd W Y d }~qdd }~ww W d    n1 sw   Y  d|iV  d S )Nr7   )rD   experiment_idr3   r+  Tr,  )fn_nametarget_run_id)excludeevaluator_info)run_id
project_idsource_infoz Error running summary evaluator r8  r9  r   rO   )rs   r   r   r  r   rJ  r   _get_experimentr`   rG  rH  r   rI  r@   r=  __name__rK  dictpoprt   create_feedbackr?  ra   r2  r6  )r   r5   rv   examplesasync_examplesrm   r   aggregate_feedbackrU  rL  r7   r/  summary_eval_resultflattened_resultsr   feedbackrS  r   rO   rO   rS   r    sz   


	

!z2_AsyncExperimentManager._aapply_summary_evaluatorsc                   sV   g }|   I d H 2 z3 d H W }|jr||j q
6 |r!t|nd }|r)| S d S rN   )r   r   rJ  max	isoformat)r   r   r   max_modified_atrO   rO   rS   _get_dataset_versionQ  s   z,_AsyncExperimentManager._get_dataset_versionOptional[list[str]]c                   s   t  }|  I d H 2 z/3 d H W }|jr5|jdr5t|jd tr5|jd D ]}t|tr3|| q'q|d q6 t|S )Ndataset_splitbase)setr   r7   getrV   rd   rW   add)r   splitsr   splitrO   rO   rS   _get_dataset_splits\  s"   


z+_AsyncExperimentManager._get_dataset_splitsNonec                   sn   | j sd S | j}|d u rtd|  }|  I d H |d< |  I d H |d< | jj|ji |j	|d d S )NzExperiment not started yet.dataset_versiondataset_splits)r7   )
r   r   r_   _get_experiment_metadatare  rn  r@   update_projectr`   r7   )r   rD   project_metadatarO   rO   rS   r   l  s"   
z_AsyncExperimentManager._aendargsr	   rK   c                 O  sr   | j f}| j| j| j| j| j| j| j| j| j	| j
| jd}t|t|t|d   }i ||}| j|i |S )N)rD   r7   rv   r@   r   r   rz   r{   rF   r   rG   )r   r   r   r   r@   r   r   r   r   r   r   r   rd   lenr   )r   ru  rK   default_argsdefault_kwargs	full_argsfull_kwargsrO   rO   rS   r   ~  s    z_AsyncExperimentManager._copy)NNNNNNNr/   FFTNr0   )rD   r   r7   r8   rv   r   r@   rA   r   r   r   r   r;   r:   r>   r?   rz   rC   r{   rC   rF   rC   r   r8   rG   rH   r1   rw   )r   r   rL   r   )rL   r   rL   rW   )rL   r   )rL   r   )rL   r   rN   )r<   r=   rI   r   r3   r   rL   r   )r<   r=   rI   r   rL   r   )r3   r   r<   r=   rL   r   )r5   r  rL   r   rL   r  )rL   r  )NF)r<   r=   rz   rC   rI   r   rL   r"  )r3   r&  r<   r=   rL   r  )r3   r&  r'  r   r   r*  rL   r   )r5   r  rL   r   )rL   r:   )rL   rf  rL   ro  )ru  r	   rK   r	   rL   r   )rX  
__module____qualname____doc__r   r   r   r   r   r   r   r   r   r   r   r   r  r!  r  r  r   r  re  rn  r   r   __classcell__rO   rO   r   rS   r     sV    ,
&
,
!


	

%J



 

V
7

r   c                   @  sx   e Zd Zd$ddZed%ddZd&d
dZd'ddZd(ddZ	d)d*ddZ	d%ddZ
d+ddZd%d d!Zd,d"d#ZdS )-rM   experiment_managerr   c                 C  s4   || _ g | _t | _t| | j | _d| _d S )Nr   )	_manager_resultsr~   r   _lockcreate_task_process_data_task_processed_count)r   r  rO   rO   rS   r     s
   

zAsyncExperimentResults.__init__rL   rW   c                 C  s   | j jS rN   )r  r   r   rO   rO   rS   r        z&AsyncExperimentResults.experiment_namer  c                 C  s   | S rN   rO   r   rO   rO   rS   ry     s   z AsyncExperimentResults.__aiter__r   c              	     s   d
 fdd}	  j 4 I d H 3  jt jk r2 j j }  jd7  _|W  d   I d H  S  j r9tW d   I d H  n1 I d H sIw   Y  ttj	|t jd d	I d H  q	)Nindexr?   rL   ro  c                   s.    j | k rtdI d H   j | k sd S d S )Ng?)r  r~   sleep)r  r   rO   rS   _wait_until_index  s   
z;AsyncExperimentResults.__anext__.<locals>._wait_until_indexTr/   )timeout)r  r?   rL   ro  )
r  r  rv  r  r  doner   r~   shieldwait_for)r   r  r   rO   r   rS   	__anext__  s    
(
z AsyncExperimentResults.__anext__r   ro  c              
     s   t  }|| 2 z)3 d H W }| j4 I d H  | j| W d   I d H  n1 I d H s.w   Y  q
6 | I d H }| j4 I d H  || _W d   I d H  d S 1 I d H sXw   Y  d S rN   )r"   r  r  r  rJ  r!  r   )r   r   tqdmitemsummary_scoresrO   rO   rS   r    s   (.z$AsyncExperimentResults._process_datar   Nstartr=   end	DataFramec                 C  s   t | j||dS )N)r  r  )r(   r  )r   r  r  rO   rO   rS   	to_pandas  s   z AsyncExperimentResults.to_pandasc                 C  s6   dd l }| jr|jdr| dd}| S |  S )Nr   pandas   )importlib.utilr  util	find_specr  _repr_html___repr__)r   	importlibdfrO   rO   rS   r    s
   z"AsyncExperimentResults._repr_html_r?   c                 C  s
   t | jS rN   )rv  r  r   rO   rO   rS   __len__  s   
zAsyncExperimentResults.__len__c                 C  s   d| j  dS )Nz<AsyncExperimentResults >)r   r   rO   rO   rS   r    s   zAsyncExperimentResults.__repr__c                   s   | j I d H  d S rN   )r  r   rO   rO   rS   r     s   zAsyncExperimentResults.wait)r  r   r{  r|  )rL   r   )r   r   rL   ro  )r   N)r  r=   r  r=   rL   r  )rL   r?   r}  )rX  r~  r  r   propertyr   ry   r  r  r  r  r  r  r   rO   rO   rO   rS   rM     s    







	
r$  ,rh.SupportsLangsmithExtra[[dict], Awaitable]r   r   r   rW   rY  langsmith.Clientrz   r   c                   s2  d dfdd}d fdd	}t j||i |d
 jp j i|d}	|dkr0 j|	d< n|dkr9||	d< ntd|t jddI zt| }
 fdd|
D }| |d|	iI d H  W n t	y} } zt
jd| ddd W Y d }~nd }~ww tttj dW  d    S 1 sw   Y  d S )Nrrun_trees.RunTreerL   ro  c                   s   |  d S rN   rO   r  )rm   rO   rS   _get_run  r  z_aforward.<locals>._get_run
rt.RunTreec                   s    j | _d S rN   )r`   rl   r  r   rO   rS   _set_reference_example_id  s   z,_aforward.<locals>._set_reference_example_idexample_version)on_endr-  r7   r@   r0   rl   ignore_on_successz2Unrecognized error_handling value: error_handling=T)r.  c                   s   g | ]}t  |qS rO   )r   )rP   argnr   rO   rS   rp     rq   z_aforward.<locals>.<listcomp>langsmith_extrazError running target function: r/   )r:  
stacklevel)rm   r   )r  r  rL   ro  )r  r  rL   ro  )rG  LangSmithExtrar   r   rc  r`   r_   rI  r   r?  ra   r2  r   r   r   r   )r$  r   r   r7   r@   rz   rG   r  r  r  	arg_namesru  r   rO   )r   rm   rS   r     sF   		



def _ensure_async_traceable(
    target: ATARGET_T,
) -> rh.SupportsLangsmithExtra[[dict], Awaitable]:
    if not asyncio.iscoroutinefunction(target) and not _is_langchain_runnable(target):
        if callable(target):
            raise ValueError(
                "Target must be an async function. For sync functions, use evaluate."
                " Example usage:\n\n"
                "async def predict(inputs: dict) -> dict:\n"
                "    # do work, like chain.invoke(inputs)\n"
                "    return {...}\n"
                "await aevaluate(predict, ...)"
            )
        else:
            raise ValueError(
                "Target must be a callable async function. Received a non-callable"
                " object. Example usage:\n\n"
                "async def predict(inputs: dict) -> dict:\n"
                "    # do work, like chain.invoke(inputs)\n"
                "    return {...}\n"
                "await aevaluate(predict, ...)"
            )
    if rh.is_traceable_function(target):
        return target  # type: ignore[return-value]
    if _is_langchain_runnable(target):
        target = target.ainvoke  # type: ignore[union-attr]
    return rh.traceable(name="AsyncTarget")(target)
r  )rz   r   c                C  s(   t | tr
t| S tt| ||dS )z*Return the examples for the given dataset.r   )rV   r   rs   r   r$   )r1   r@   rz   rO   rO   rS   r   &  s
   

r   TiterableIterable[AsyncIterable[T]]AsyncIterator[T]c                 C s*   | D ]}|2 z	3 dH W }|V  q6 qdS )zChain multiple async iterables.NrO   )r  sub_iterabler  rO   rO   rS   r   7  s   r   r\  list[schemas.Example]AsyncIterable[schemas.Example]c                 C s   | D ]}|V  qdS )z0Convert a list of examples to an async iterable.NrO   )r\  r   rO   rO   rS   r   @  s   r   )NNNNNNr   r/   NTNTr0   ) r1   r2   r3   r4   r5   r6   r7   r8   r9   r:   r;   r:   r<   r=   r>   r?   r@   rA   rB   rC   rD   rE   rF   rC   rG   rH   rI   rJ   rK   r	   rL   rM   )NNNr   NFT)r3   r4   r5   r6   r7   r8   r<   r=   r@   rA   rj   rC   rB   rC   rD   rk   rL   rM   )NNNNNNr/   NTNTr0   )r1   rw   r3   r4   r5   r6   r7   r8   r9   r:   r;   r:   r<   r=   r>   r?   r@   rA   rB   rC   rD   rE   rF   rC   rG   rH   rI   rx   rL   rM   )Fr0   )r$  r  r   r   r   rW   r7   rY  r@   r  rz   rC   rG   rH   rL   r   )rI   r   rL   r  )r1   rw   r@   r  rz   rC   rL   r   )r  r  rL   r  )r\  r  rL   r  )Vr  
__future__r   r~   concurrent.futuresfuturesr  r   loggingr   rX   collections.abcr   r   r   r   r   typingr   r	   r
   r   r   r   r   r   	langsmithr   rG  r   r   r   r   r   langsmith._internalr   rs   #langsmith._internal._beta_decoratorr   langsmith.evaluation._runnerr   r   r   r   r   r   r   r   r   r   r    r!   r"   r#   r$   r%   r&   r'   r(   r)   langsmith.evaluation.evaluatorr*   r+   r,   r-   r  pdlangchain_core.runnablesr.   r  	getLoggerrX  ra   rY  r   ri   rc   re   r   rM   r   r  r   r  r   r   rO   rO   rO   rS   <module>   s    (X
$  {N    |K
1"
	