fix: langfuse trace get the right parent span (#6844)
* fix: langfuse trace get the right parent span * [autofix.ci] apply automated fixes * fix: langfuse callback from parent span * [autofix.ci] apply automated fixes --------- Co-authored-by: tianzhipeng-jk <tianzhipeng-jk@360shuke.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Ítalo Johnny <italojohnnydosanjos@gmail.com>
This commit is contained in:
parent
5323a71084
commit
037ffb620a
2 changed files with 15 additions and 18 deletions
File diff suppressed because one or more lines are too long
|
|
@ -1,6 +1,7 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from collections import OrderedDict
|
||||
from datetime import datetime, timezone
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
|
|
@ -14,7 +15,6 @@ if TYPE_CHECKING:
|
|||
from uuid import UUID
|
||||
|
||||
from langchain.callbacks.base import BaseCallbackHandler
|
||||
from langfuse.client import StatefulSpanClient
|
||||
|
||||
from langflow.graph.vertex.base import Vertex
|
||||
from langflow.services.tracing.schema import Log
|
||||
|
|
@ -29,8 +29,7 @@ class LangFuseTracer(BaseTracer):
|
|||
self.trace_type = trace_type
|
||||
self.trace_id = trace_id
|
||||
self.flow_id = trace_name.split(" - ")[-1]
|
||||
self.last_span: StatefulSpanClient | None = None
|
||||
self.spans: dict = {}
|
||||
self.spans: dict = OrderedDict() # spans that are not ended
|
||||
|
||||
config = self._get_config()
|
||||
self._ready: bool = self.setup_langfuse(config) if config else False
|
||||
|
|
@ -42,18 +41,10 @@ class LangFuseTracer(BaseTracer):
|
|||
def setup_langfuse(self, config) -> bool:
|
||||
try:
|
||||
from langfuse import Langfuse
|
||||
from langfuse.callback.langchain import LangchainCallbackHandler
|
||||
|
||||
self._client = Langfuse(**config)
|
||||
self.trace = self._client.trace(id=str(self.trace_id), name=self.flow_id)
|
||||
|
||||
config |= {
|
||||
"trace_name": self.flow_id,
|
||||
"stateful_client": self.trace,
|
||||
"update_stateful_client": True,
|
||||
}
|
||||
self._callback = LangchainCallbackHandler(**config)
|
||||
|
||||
except ImportError:
|
||||
logger.exception("Could not import langfuse. Please install it with `pip install langfuse`.")
|
||||
return False
|
||||
|
|
@ -67,7 +58,7 @@ class LangFuseTracer(BaseTracer):
|
|||
@override
|
||||
def add_trace(
|
||||
self,
|
||||
trace_id: str,
|
||||
trace_id: str, # actualy component id
|
||||
trace_name: str,
|
||||
trace_type: str,
|
||||
inputs: dict[str, Any],
|
||||
|
|
@ -90,9 +81,12 @@ class LangFuseTracer(BaseTracer):
|
|||
"start_time": start_time,
|
||||
}
|
||||
|
||||
span = self.last_span.span(**content_span) if self.last_span else self.trace.span(**content_span)
|
||||
if len(self.spans) > 0:
|
||||
last_span = next(reversed(self.spans))
|
||||
span = self.spans[last_span].span(**content_span)
|
||||
else:
|
||||
span = self.trace.span(**content_span)
|
||||
|
||||
self.last_span = span
|
||||
self.spans[trace_id] = span
|
||||
|
||||
@override
|
||||
|
|
@ -108,7 +102,7 @@ class LangFuseTracer(BaseTracer):
|
|||
if not self._ready:
|
||||
return
|
||||
|
||||
span = self.spans.get(trace_id, None)
|
||||
span = self.spans.pop(trace_id, None)
|
||||
if span:
|
||||
output: dict = {}
|
||||
output |= outputs or {}
|
||||
|
|
@ -133,7 +127,10 @@ class LangFuseTracer(BaseTracer):
|
|||
def get_langchain_callback(self) -> BaseCallbackHandler | None:
|
||||
if not self._ready:
|
||||
return None
|
||||
return self._callback
|
||||
|
||||
# get callback from parent span
|
||||
stateful_client = self.spans[next(reversed(self.spans))] if len(self.spans) > 0 else self.trace
|
||||
return stateful_client.get_langchain_handler()
|
||||
|
||||
@staticmethod
|
||||
def _get_config() -> dict:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue