fix: improve end_traces so it doesn't block the build loop (#3482)
* refactor: Simplify log configuration logic in Graph class * feat(tracing): Refactor _end_traces method to be async and use asyncio.to_thread. This is an attempt to avoid blocking the build loop
This commit is contained in:
parent
dde0fa4566
commit
40237d0631
2 changed files with 10 additions and 7 deletions
|
|
@ -60,9 +60,8 @@ class Graph:
|
|||
edges (List[Dict[str, str]]): A list of dictionaries representing the edges of the graph.
|
||||
flow_id (Optional[str], optional): The ID of the flow. Defaults to None.
|
||||
"""
|
||||
if not log_config:
|
||||
log_config = {"disable": False}
|
||||
configure(**log_config)
|
||||
if log_config:
|
||||
configure(**log_config)
|
||||
self._start = start
|
||||
self._state_model = None
|
||||
self._end = end
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ import asyncio
|
|||
import os
|
||||
from collections import defaultdict
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import TYPE_CHECKING, Any, Dict, Optional, List
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional
|
||||
from uuid import UUID
|
||||
|
||||
from loguru import logger
|
||||
|
|
@ -12,11 +12,12 @@ from langflow.services.tracing.base import BaseTracer
|
|||
from langflow.services.tracing.schema import Log
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from langchain.callbacks.base import BaseCallbackHandler
|
||||
|
||||
from langflow.custom.custom_component.component import Component
|
||||
from langflow.graph.vertex.base import Vertex
|
||||
from langflow.services.monitor.service import MonitorService
|
||||
from langflow.services.settings.service import SettingsService
|
||||
from langchain.callbacks.base import BaseCallbackHandler
|
||||
|
||||
|
||||
def _get_langsmith_tracer():
|
||||
|
|
@ -210,8 +211,11 @@ class TracingService(Service):
|
|||
self._end_traces(trace_id, trace_name, e)
|
||||
raise e
|
||||
finally:
|
||||
self._end_traces(trace_id, trace_name, None)
|
||||
self._reset_io()
|
||||
asyncio.create_task(asyncio.to_thread(self._end_and_reset, trace_id, trace_name, None))
|
||||
|
||||
async def _end_and_reset(self, trace_id: str, trace_name: str, error: Exception | None = None):
|
||||
self._end_traces(trace_id, trace_name, error)
|
||||
self._reset_io()
|
||||
|
||||
def set_outputs(
|
||||
self,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue