refactor: move tracers to respective files (#2734)

* refactor: move Tracers to respective files

* feat(pyproject.toml): update langwatch version to 0.1.9 and add langsmith dependency at version 0.1.86
Gabriel Luiz Freitas Almeida 2024-07-16 16:02:16 -03:00 committed by GitHub
commit 50dc1e87d1
7 changed files with 341 additions and 2748 deletions

poetry.lock (generated)

@@ -4941,7 +4941,6 @@ asyncer = "^0.0.5"
 bcrypt = "4.0.1"
 cachetools = "^5.3.1"
 chardet = "^5.2.0"
-crewai = "^0.36.0"
 cryptography = "^42.0.5"
 docstring-parser = "^0.16"
 duckdb = "^1.0.0"
@@ -4954,7 +4953,6 @@ jq = {version = "^1.7.0", markers = "sys_platform != \"win32\""}
 langchain = "~0.2.0"
 langchain-experimental = "^0.0.61"
 langchainhub = "~0.1.15"
-langwatch = "^0.1.3"
 loguru = "^0.7.1"
 multiprocess = "^0.70.14"
 nest-asyncio = "^1.6.0"
@@ -5038,13 +5036,13 @@ requests = ">=2,<3"
 [[package]]
 name = "langwatch"
-version = "0.1.8"
+version = "0.1.9"
 description = "Python SDK for LangWatch for monitoring your LLMs"
 optional = false
 python-versions = "<4.0,>=3.9"
 files = [
-    {file = "langwatch-0.1.8-py3-none-any.whl", hash = "sha256:e6e7ffe2f3cd61c477e8ff6de4ad3d1e06f63b0c8f02880d68293fc126ec2bf7"},
-    {file = "langwatch-0.1.8.tar.gz", hash = "sha256:2ccc3a741ef9bf493946264ab8fff5cb33845e51d4426136218d62b1a4cbd26d"},
+    {file = "langwatch-0.1.9-py3-none-any.whl", hash = "sha256:d082f70bb7f6aa67623281e3e624065932bc6e5729322637d8957cc63942a007"},
+    {file = "langwatch-0.1.9.tar.gz", hash = "sha256:3f044b668239b0e4cc244635f8ceeb7e10948c736f6b60bcda7c26852308b2f8"},
 ]

 [package.dependencies]
@@ -11737,4 +11735,4 @@ local = ["ctransformers", "llama-cpp-python", "sentence-transformers"]
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.10,<3.13"
-content-hash = "afe38792a9e202ab55558389026b64beb95d1437b2fd4bccfbb300cbc7ad0677"
+content-hash = "53efe77ea0fe985d9319884d4e0ade4d1949f6348ecf75d6afa7297483dd314c"

pyproject.toml

@@ -99,7 +99,8 @@ langchain-nvidia-ai-endpoints = "^0.1.2"
 langchain-google-calendar-tools = "^0.0.1"
 langchain-milvus = "^0.1.1"
 crewai = {extras = ["tools"], version = "^0.36.0"}
-langwatch = "^0.1.7"
+langwatch = "^0.1.9"
+langsmith = "^0.1.86"

 [tool.poetry.group.dev.dependencies]

langflow/services/tracing/langsmith.py (new file)

@@ -0,0 +1,151 @@
import os
import traceback
import types
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Dict, Optional
from uuid import UUID

from loguru import logger

from langflow.schema.data import Data
from langflow.services.tracing.base import BaseTracer
from langflow.services.tracing.schema import Log

if TYPE_CHECKING:
    from langflow.graph.vertex.base import Vertex


class LangSmithTracer(BaseTracer):
    def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID):
        from langsmith.run_trees import RunTree

        self.trace_name = trace_name
        self.trace_type = trace_type
        self.project_name = project_name
        self.trace_id = trace_id
        try:
            self._run_tree = RunTree(
                project_name=self.project_name,
                name=self.trace_name,
                run_type=self.trace_type,
                id=self.trace_id,
            )
            self._run_tree.add_event({"name": "Start", "time": datetime.now(timezone.utc).isoformat()})
            self._children: dict[str, RunTree] = {}
            self._ready = self.setup_langsmith()
        except Exception as e:
            logger.debug(f"Error setting up LangSmith tracer: {e}")
            self._ready = False

    @property
    def ready(self):
        return self._ready

    def setup_langsmith(self):
        try:
            from langsmith import Client

            self._client = Client()
        except ImportError:
            logger.error("Could not import langsmith. Please install it with `pip install langsmith`.")
            return False
        os.environ["LANGCHAIN_TRACING_V2"] = "true"
        return True

    def add_trace(
        self,
        trace_id: str,
        trace_name: str,
        trace_type: str,
        inputs: Dict[str, Any],
        metadata: Dict[str, Any] | None = None,
        vertex: Optional["Vertex"] = None,
    ):
        if not self._ready:
            return
        processed_inputs = {}
        if inputs:
            processed_inputs = self._convert_to_langchain_types(inputs)
        child = self._run_tree.create_child(
            name=trace_name,
            run_type=trace_type,  # type: ignore[arg-type]
            inputs=processed_inputs,
        )
        if metadata:
            child.add_metadata(self._convert_to_langchain_types(metadata))
        self._children[trace_name] = child
        self._child_link: dict[str, str] = {}

    def _convert_to_langchain_types(self, io_dict: Dict[str, Any]):
        converted = {}
        for key, value in io_dict.items():
            converted[key] = self._convert_to_langchain_type(value)
        return converted

    def _convert_to_langchain_type(self, value):
        from langflow.schema.message import Message

        if isinstance(value, dict):
            for key, _value in value.copy().items():
                _value = self._convert_to_langchain_type(_value)
                value[key] = _value
        elif isinstance(value, list):
            value = [self._convert_to_langchain_type(v) for v in value]
        elif isinstance(value, Message):
            if "prompt" in value:
                value = value.load_lc_prompt()
            elif value.sender:
                value = value.to_lc_message()
            else:
                value = value.to_lc_document()
        elif isinstance(value, Data):
            value = value.to_lc_document()
        elif isinstance(value, types.GeneratorType):
            # generator is not serializable, also we can't consume it
            value = str(value)
        return value

    def end_trace(
        self,
        trace_id: str,
        trace_name: str,
        outputs: Dict[str, Any] | None = None,
        error: Exception | None = None,
        logs: list[Log | dict] = [],
    ):
        child = self._children[trace_name]
        raw_outputs = {}
        processed_outputs = {}
        if outputs:
            raw_outputs = outputs
            processed_outputs = self._convert_to_langchain_types(outputs)
        if logs:
            child.add_metadata(self._convert_to_langchain_types({"logs": {log.get("name"): log for log in logs}}))
        child.add_metadata(self._convert_to_langchain_types({"outputs": raw_outputs}))
        child.end(outputs=processed_outputs, error=self._error_to_string(error))
        if error:
            child.patch()
        else:
            child.post()
        self._child_link[trace_name] = child.get_url()

    def _error_to_string(self, error: Optional[Exception]):
        error_message = None
        if error:
            string_stacktrace = traceback.format_exception(error)
            error_message = f"{error.__class__.__name__}: {error}\n\n{string_stacktrace}"
        return error_message

    def end(
        self,
        inputs: dict[str, Any],
        outputs: Dict[str, Any],
        error: Exception | None = None,
        metadata: dict[str, Any] | None = None,
    ):
        self._run_tree.add_metadata({"inputs": inputs})
        if metadata:
            self._run_tree.add_metadata(metadata)
        self._run_tree.end(outputs=outputs, error=self._error_to_string(error))
        self._run_tree.post()
        self._run_link = self._run_tree.get_url()
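
For orientation, a minimal usage sketch of the relocated tracer (not part of the commit): it assumes the langsmith package is installed and a LangSmith API key is configured in the environment; the flow name, component names, and payloads are illustrative.

from uuid import uuid4

from langflow.services.tracing.langsmith import LangSmithTracer

# One tracer per flow run; setup failures flip `ready` to False instead of raising.
tracer = LangSmithTracer(
    trace_name="Basic Prompting - a1b2c3",  # illustrative "<flow name> - <id>" pattern
    trace_type="chain",
    project_name="langflow",
    trace_id=uuid4(),
)
if tracer.ready:
    # One child run per component execution, keyed by trace name.
    tracer.add_trace("vertex-1", "OpenAI", "llm", inputs={"input_value": "Hello"})
    tracer.end_trace("vertex-1", "OpenAI", outputs={"text_output": "Hi there!"})
    # Close the root run and post it to LangSmith.
    tracer.end(inputs={"input_value": "Hello"}, outputs={"result": "Hi there!"})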

langflow/services/tracing/langwatch.py (new file)

@@ -0,0 +1,178 @@
from typing import TYPE_CHECKING, Any, Dict, Optional, cast
from uuid import UUID

import nanoid  # type: ignore
from langwatch.tracer import ContextSpan
from loguru import logger

from langflow.schema.data import Data
from langflow.services.tracing.base import BaseTracer
from langflow.services.tracing.schema import Log

if TYPE_CHECKING:
    from langwatch.tracer import ContextSpan
    from langwatch.types import SpanTypes

    from langflow.graph.vertex.base import Vertex


class LangWatchTracer(BaseTracer):
    flow_id: str

    def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID):
        self.trace_name = trace_name
        self.trace_type = trace_type
        self.project_name = project_name
        self.trace_id = trace_id
        self.flow_id = trace_name.split(" - ")[-1]
        try:
            self._ready = self.setup_langwatch()
            # import after setting up langwatch so we are sure to be available
            import nanoid  # type: ignore

            self.trace = self._client.trace(
                trace_id=str(self.trace_id),
            )
            self.spans: dict[str, "ContextSpan"] = {}

            name_without_id = " - ".join(trace_name.split(" - ")[0:-1])
            self.trace.root_span.update(
                span_id=f"{self.flow_id}-{nanoid.generate(size=6)}",  # nanoid to make the span_id globally unique, which is required for LangWatch for now
                name=name_without_id,
                type=self._convert_trace_type(trace_type),
            )
        except Exception as e:
            logger.debug(f"Error setting up LangWatch tracer: {e}")
            self._ready = False

    @property
    def ready(self):
        return self._ready

    def setup_langwatch(self):
        try:
            import langwatch

            self._client = langwatch
        except ImportError:
            logger.error("Could not import langwatch. Please install it with `pip install langwatch`.")
            return False
        return True

    def _convert_trace_type(self, trace_type: str):
        trace_type_: "SpanTypes" = (
            cast("SpanTypes", trace_type)
            if trace_type in ["span", "llm", "chain", "tool", "agent", "guardrail", "rag"]
            else "span"
        )
        return trace_type_

    def add_trace(
        self,
        trace_id: str,
        trace_name: str,
        trace_type: str,
        inputs: Dict[str, Any],
        metadata: Dict[str, Any] | None = None,
        vertex: Optional["Vertex"] = None,
    ):
        # If user is not using session_id, then it becomes the same as flow_id, but
        # we don't want to have an infinite thread with all the flow messages
        if "session_id" in inputs and inputs["session_id"] != self.flow_id:
            self.trace.update(metadata=(self.trace.metadata or {}) | {"thread_id": inputs["session_id"]})

        name_without_id = " (".join(trace_name.split(" (")[0:-1])
        trace_type_ = self._convert_trace_type(trace_type)

        self.spans[trace_id] = self.trace.span(
            span_id=f"{trace_id}-{nanoid.generate(size=6)}",  # Add a nanoid to make the span_id globally unique, which is required for LangWatch for now
            name=name_without_id,
            type=trace_type_,
            parent=(
                [span for key, span in self.spans.items() for edge in vertex.incoming_edges if key == edge.source_id][
                    -1
                ]
                if vertex and len(vertex.incoming_edges) > 0
                else self.trace.root_span
            ),
            input=self._convert_to_langwatch_types(inputs),
        )
        if trace_type_ == "llm" and "model_name" in inputs:
            self.spans[trace_id].update(model=inputs["model_name"])

    def end_trace(
        self,
        trace_id: str,
        trace_name: str,
        outputs: Dict[str, Any] | None = None,
        error: Exception | None = None,
        logs: list[Log | dict] = [],
    ):
        if self.spans.get(trace_id):
            # Workaround for when model is used just as a component not actually called as an LLM,
            # to prevent LangWatch from calculating the cost based on it when it was in fact never called
            if (
                self.spans[trace_id].type == "llm"
                and outputs
                and "model_output" in outputs
                and "text_output" not in outputs
            ):
                self.spans[trace_id].update(metrics={"prompt_tokens": 0, "completion_tokens": 0})
            self.spans[trace_id].end(output=self._convert_to_langwatch_types(outputs), error=error)

    def end(
        self,
        inputs: dict[str, Any],
        outputs: Dict[str, Any],
        error: Exception | None = None,
        metadata: dict[str, Any] | None = None,
    ):
        self.trace.root_span.end(
            input=self._convert_to_langwatch_types(inputs),
            output=self._convert_to_langwatch_types(outputs),
            error=error,
        )
        if metadata and "flow_name" in metadata:
            self.trace.update(metadata=(self.trace.metadata or {}) | {"labels": [f"Flow: {metadata['flow_name']}"]})
        self.trace.deferred_send_spans()

    def _convert_to_langwatch_types(self, io_dict: Optional[Dict[str, Any]]):
        from langwatch.utils import autoconvert_typed_values

        if io_dict is None:
            return None
        converted = {}
        for key, value in io_dict.items():
            converted[key] = self._convert_to_langwatch_type(value)
        return autoconvert_typed_values(converted)

    def _convert_to_langwatch_type(self, value):
        from langwatch.langchain import langchain_message_to_chat_message, langchain_messages_to_chat_messages

        from langflow.schema.message import BaseMessage, Message

        if isinstance(value, dict):
            for key, _value in value.copy().items():
                _value = self._convert_to_langwatch_type(_value)
                value[key] = _value
        elif isinstance(value, list):
            value = [self._convert_to_langwatch_type(v) for v in value]
        elif isinstance(value, Message):
            if "prompt" in value:
                prompt = value.load_lc_prompt()
                if len(prompt.input_variables) == 0 and all(isinstance(m, BaseMessage) for m in prompt.messages):
                    value = langchain_messages_to_chat_messages([cast(list[BaseMessage], prompt.messages)])
                else:
                    value = cast(dict, value.load_lc_prompt())
            elif value.sender:
                value = langchain_message_to_chat_message(value.to_lc_message())
            else:
                value = cast(dict, value.to_lc_document())
        elif isinstance(value, Data):
            value = cast(dict, value.to_lc_document())
        return value
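
Two LangWatch-specific rules in the file above are easy to miss: trace types outside the known SpanTypes values are coerced to "span", and every span id is suffixed with a nanoid because LangWatch currently requires globally unique span ids. A small sketch of both rules (standalone helpers written for illustration, not part of the commit; nanoid is the same package the tracer imports):

import nanoid  # type: ignore

# Mirrors LangWatchTracer._convert_trace_type: anything outside the known
# SpanTypes values falls back to the generic "span" type.
VALID_SPAN_TYPES = ["span", "llm", "chain", "tool", "agent", "guardrail", "rag"]

def convert_trace_type(trace_type: str) -> str:
    return trace_type if trace_type in VALID_SPAN_TYPES else "span"

# Mirrors the span_id construction: "<trace_id>-<6-char nanoid>".
def unique_span_id(trace_id: str) -> str:
    return f"{trace_id}-{nanoid.generate(size=6)}"

assert convert_trace_type("llm") == "llm"
assert convert_trace_type("vectorstore") == "span"  # not a known span type
print(unique_span_id("ChatOutput-x2f5k"))  # e.g. "ChatOutput-x2f5k-aB3dEf"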

langflow/services/tracing/service.py

@@ -1,27 +1,23 @@
import asyncio
import os
import traceback
import types
from collections import defaultdict
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Dict, Optional, cast
from typing import TYPE_CHECKING, Any, Dict, Optional
from uuid import UUID

from loguru import logger

from langflow.schema.data import Data
from langflow.services.base import Service
from langflow.services.tracing.base import BaseTracer
from langflow.services.tracing.langsmith import LangSmithTracer
from langflow.services.tracing.langwatch import LangWatchTracer
from langflow.services.tracing.schema import Log

if TYPE_CHECKING:
    from langflow.services.monitor.service import MonitorService
    from langflow.services.settings.service import SettingsService
    from langflow.custom.custom_component.component import Component
    from langflow.graph.vertex.base import Vertex
    from langwatch.tracer import ContextSpan
    from langwatch.types import SpanTypes
    from langflow.services.monitor.service import MonitorService
    from langflow.services.settings.service import SettingsService
class TracingService(Service):
@@ -221,305 +217,3 @@ class TracingService(Service):
if "api_key" in key:
inputs[key] = "*****" # avoid logging api_keys for security reasons
return inputs

class LangSmithTracer(BaseTracer):
    def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID):
        from langsmith.run_trees import RunTree

        self.trace_name = trace_name
        self.trace_type = trace_type
        self.project_name = project_name
        self.trace_id = trace_id
        try:
            self._run_tree = RunTree(
                project_name=self.project_name,
                name=self.trace_name,
                run_type=self.trace_type,
                id=self.trace_id,
            )
            self._run_tree.add_event({"name": "Start", "time": datetime.now(timezone.utc).isoformat()})
            self._children: dict[str, RunTree] = {}
            self._ready = self.setup_langsmith()
        except Exception as e:
            logger.debug(f"Error setting up LangSmith tracer: {e}")
            self._ready = False

    @property
    def ready(self):
        return self._ready

    def setup_langsmith(self):
        try:
            from langsmith import Client

            self._client = Client()
        except ImportError:
            logger.error("Could not import langsmith. Please install it with `pip install langsmith`.")
            return False
        os.environ["LANGCHAIN_TRACING_V2"] = "true"
        return True

    def add_trace(
        self,
        trace_id: str,
        trace_name: str,
        trace_type: str,
        inputs: Dict[str, Any],
        metadata: Dict[str, Any] | None = None,
        vertex: Optional["Vertex"] = None,
    ):
        if not self._ready:
            return
        processed_inputs = {}
        if inputs:
            processed_inputs = self._convert_to_langchain_types(inputs)
        child = self._run_tree.create_child(
            name=trace_name,
            run_type=trace_type,  # type: ignore[arg-type]
            inputs=processed_inputs,
        )
        if metadata:
            child.add_metadata(self._convert_to_langchain_types(metadata))
        self._children[trace_name] = child
        self._child_link: dict[str, str] = {}

    def _convert_to_langchain_types(self, io_dict: Dict[str, Any]):
        converted = {}
        for key, value in io_dict.items():
            converted[key] = self._convert_to_langchain_type(value)
        return converted

    def _convert_to_langchain_type(self, value):
        from langflow.schema.message import Message

        if isinstance(value, dict):
            for key, _value in value.copy().items():
                _value = self._convert_to_langchain_type(_value)
                value[key] = _value
        elif isinstance(value, list):
            value = [self._convert_to_langchain_type(v) for v in value]
        elif isinstance(value, Message):
            if "prompt" in value:
                value = value.load_lc_prompt()
            elif value.sender:
                value = value.to_lc_message()
            else:
                value = value.to_lc_document()
        elif isinstance(value, Data):
            value = value.to_lc_document()
        elif isinstance(value, types.GeneratorType):
            # generator is not serializable, also we can't consume it
            value = str(value)
        return value

    def end_trace(
        self,
        trace_id: str,
        trace_name: str,
        outputs: Dict[str, Any] | None = None,
        error: Exception | None = None,
        logs: list[Log | dict] = [],
    ):
        child = self._children[trace_name]
        raw_outputs = {}
        processed_outputs = {}
        if outputs:
            raw_outputs = outputs
            processed_outputs = self._convert_to_langchain_types(outputs)
        if logs:
            child.add_metadata(self._convert_to_langchain_types({"logs": {log.get("name"): log for log in logs}}))
        child.add_metadata(self._convert_to_langchain_types({"outputs": raw_outputs}))
        child.end(outputs=processed_outputs, error=self._error_to_string(error))
        if error:
            child.patch()
        else:
            child.post()
        self._child_link[trace_name] = child.get_url()

    def _error_to_string(self, error: Optional[Exception]):
        error_message = None
        if error:
            string_stacktrace = traceback.format_exception(error)
            error_message = f"{error.__class__.__name__}: {error}\n\n{string_stacktrace}"
        return error_message

    def end(
        self,
        inputs: dict[str, Any],
        outputs: Dict[str, Any],
        error: Exception | None = None,
        metadata: dict[str, Any] | None = None,
    ):
        self._run_tree.add_metadata({"inputs": inputs})
        if metadata:
            self._run_tree.add_metadata(metadata)
        self._run_tree.end(outputs=outputs, error=self._error_to_string(error))
        self._run_tree.post()
        self._run_link = self._run_tree.get_url()


class LangWatchTracer(BaseTracer):
    flow_id: str

    def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID):
        self.trace_name = trace_name
        self.trace_type = trace_type
        self.project_name = project_name
        self.trace_id = trace_id
        self.flow_id = trace_name.split(" - ")[-1]
        try:
            self._ready = self.setup_langwatch()
            # import after setting up langwatch so we are sure to be available
            import nanoid  # type: ignore

            self.trace = self._client.trace(
                trace_id=str(self.trace_id),
            )
            self.spans: dict[str, "ContextSpan"] = {}

            name_without_id = " - ".join(trace_name.split(" - ")[0:-1])
            self.trace.root_span.update(
                span_id=f"{self.flow_id}-{nanoid.generate(size=6)}",  # nanoid to make the span_id globally unique, which is required for LangWatch for now
                name=name_without_id,
                type=self._convert_trace_type(trace_type),
            )
        except Exception as e:
            logger.debug(f"Error setting up LangWatch tracer: {e}")
            self._ready = False

    @property
    def ready(self):
        return self._ready

    def setup_langwatch(self):
        try:
            import langwatch

            self._client = langwatch
        except ImportError:
            logger.error("Could not import langwatch. Please install it with `pip install langwatch`.")
            return False
        return True

    def _convert_trace_type(self, trace_type: str):
        trace_type_: "SpanTypes" = (
            cast("SpanTypes", trace_type)
            if trace_type in ["span", "llm", "chain", "tool", "agent", "guardrail", "rag"]
            else "span"
        )
        return trace_type_

    def add_trace(
        self,
        trace_id: str,
        trace_name: str,
        trace_type: str,
        inputs: Dict[str, Any],
        metadata: Dict[str, Any] | None = None,
        vertex: Optional["Vertex"] = None,
    ):
        import nanoid

        # If user is not using session_id, then it becomes the same as flow_id, but
        # we don't want to have an infinite thread with all the flow messages
        if "session_id" in inputs and inputs["session_id"] != self.flow_id:
            self.trace.update(metadata=(self.trace.metadata or {}) | {"thread_id": inputs["session_id"]})

        name_without_id = " (".join(trace_name.split(" (")[0:-1])
        trace_type_ = self._convert_trace_type(trace_type)

        self.spans[trace_id] = self.trace.span(
            span_id=f"{trace_id}-{nanoid.generate(size=6)}",  # Add a nanoid to make the span_id globally unique, which is required for LangWatch for now
            name=name_without_id,
            type=trace_type_,
            parent=(
                [span for key, span in self.spans.items() for edge in vertex.incoming_edges if key == edge.source_id][
                    -1
                ]
                if vertex and len(vertex.incoming_edges) > 0
                else self.trace.root_span
            ),
            input=self._convert_to_langwatch_types(inputs),
        )
        if trace_type_ == "llm" and "model_name" in inputs:
            self.spans[trace_id].update(model=inputs["model_name"])

    def end_trace(
        self,
        trace_id: str,
        trace_name: str,
        outputs: Dict[str, Any] | None = None,
        error: Exception | None = None,
        logs: list[Log | dict] = [],
    ):
        if self.spans.get(trace_id):
            # Workaround for when model is used just as a component not actually called as an LLM,
            # to prevent LangWatch from calculating the cost based on it when it was in fact never called
            if (
                self.spans[trace_id].type == "llm"
                and outputs
                and "model_output" in outputs
                and "text_output" not in outputs
            ):
                self.spans[trace_id].update(metrics={"prompt_tokens": 0, "completion_tokens": 0})
            self.spans[trace_id].end(output=self._convert_to_langwatch_types(outputs), error=error)

    def end(
        self,
        inputs: dict[str, Any],
        outputs: Dict[str, Any],
        error: Exception | None = None,
        metadata: dict[str, Any] | None = None,
    ):
        self.trace.root_span.end(
            input=self._convert_to_langwatch_types(inputs),
            output=self._convert_to_langwatch_types(outputs),
            error=error,
        )
        if metadata and "flow_name" in metadata:
            self.trace.update(metadata=(self.trace.metadata or {}) | {"labels": [f"Flow: {metadata['flow_name']}"]})
        self.trace.deferred_send_spans()

    def _convert_to_langwatch_types(self, io_dict: Optional[Dict[str, Any]]):
        from langwatch.utils import autoconvert_typed_values

        if io_dict is None:
            return None
        converted = {}
        for key, value in io_dict.items():
            converted[key] = self._convert_to_langwatch_type(value)
        return autoconvert_typed_values(converted)

    def _convert_to_langwatch_type(self, value):
        from langflow.schema.message import Message, BaseMessage
        from langwatch.langchain import (
            langchain_messages_to_chat_messages,
            langchain_message_to_chat_message,
        )

        if isinstance(value, dict):
            for key, _value in value.copy().items():
                _value = self._convert_to_langwatch_type(_value)
                value[key] = _value
        elif isinstance(value, list):
            value = [self._convert_to_langwatch_type(v) for v in value]
        elif isinstance(value, Message):
            if "prompt" in value:
                prompt = value.load_lc_prompt()
                if len(prompt.input_variables) == 0 and all(isinstance(m, BaseMessage) for m in prompt.messages):
                    value = langchain_messages_to_chat_messages([cast(list[BaseMessage], prompt.messages)])
                else:
                    value = cast(dict, value.load_lc_prompt())
            elif value.sender:
                value = langchain_message_to_chat_message(value.to_lc_message())
            else:
                value = cast(dict, value.to_lc_document())
        elif isinstance(value, Data):
            value = cast(dict, value.to_lc_document())
        return value
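
With the deletion above, the tracer classes now live only in their own modules, and callers import them from there, as the new imports in service.py show. A hypothetical wiring sketch (illustrative, not the actual TracingService code) of how both tracers can be constructed uniformly:

from uuid import uuid4

from langflow.services.tracing.langsmith import LangSmithTracer
from langflow.services.tracing.langwatch import LangWatchTracer

def make_tracers(trace_name: str, trace_id):
    # Both tracers share the same constructor shape, so they can be set up
    # uniformly; each one degrades gracefully when its SDK is not installed.
    tracers = []
    for tracer_cls in (LangSmithTracer, LangWatchTracer):
        tracer = tracer_cls(
            trace_name=trace_name,
            trace_type="chain",
            project_name="langflow",
            trace_id=trace_id,
        )
        if tracer.ready:
            tracers.append(tracer)
    return tracers

active_tracers = make_tracers("Basic Prompting - a1b2c3", uuid4())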

File diff suppressed because it is too large.

pyproject.toml

@@ -71,9 +71,7 @@ opentelemetry-sdk = "^1.25.0"
 opentelemetry-exporter-prometheus = "^0.46b0"
 prometheus-client = "^0.20.0"
 aiofiles = "^24.1.0"
-crewai = "^0.36.0"
 setuptools = ">=70"
-langwatch = "^0.1.3"

 [tool.poetry.extras]
 deploy = ["celery", "redis", "flower"]