From 82080ebadef5b8606638cf4841c37eee6162a7f9 Mon Sep 17 00:00:00 2001 From: Ali Saleh Date: Wed, 18 Dec 2024 21:32:58 +0500 Subject: [PATCH] feat: ArizePhoenixTracer v2 - Enhanced Session Tracking and Flow Organization (#5336) Updated ArizePhoenixTracer --- .../services/tracing/arize_phoenix.py | 106 ++++++++++++------ 1 file changed, 73 insertions(+), 33 deletions(-) diff --git a/src/backend/base/langflow/services/tracing/arize_phoenix.py b/src/backend/base/langflow/services/tracing/arize_phoenix.py index b4eb386c3..4dcb30f67 100644 --- a/src/backend/base/langflow/services/tracing/arize_phoenix.py +++ b/src/backend/base/langflow/services/tracing/arize_phoenix.py @@ -13,7 +13,7 @@ from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage from loguru import logger from openinference.semconv.trace import OpenInferenceMimeTypeValues, SpanAttributes from opentelemetry.semconv.trace import SpanAttributes as OTELSpanAttributes -from opentelemetry.trace import Span, Status, StatusCode +from opentelemetry.trace import Span, Status, StatusCode, use_span from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from typing_extensions import override @@ -34,7 +34,10 @@ if TYPE_CHECKING: class ArizePhoenixTracer(BaseTracer): + flow_name: str flow_id: str + chat_input_value: str + chat_output_value: str def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID): """Initializes the ArizePhoenixTracer instance and sets up a root span.""" @@ -42,7 +45,10 @@ class ArizePhoenixTracer(BaseTracer): self.trace_type = trace_type self.project_name = project_name self.trace_id = trace_id + self.flow_name = trace_name.split(" - ")[0] self.flow_id = trace_name.split(" - ")[-1] + self.chat_input_value = "" + self.chat_output_value = "" try: self._ready = self.setup_arize_phoenix() @@ -53,12 +59,17 @@ class ArizePhoenixTracer(BaseTracer): self.propagator = TraceContextTextMapPropagator() self.carrier: dict[Any, CarrierT] = {} - with self.tracer.start_as_current_span( + self.root_span = self.tracer.start_span( name=self.flow_id, start_time=self._get_current_timestamp(), - ) as root_span: - root_span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, self.trace_type) - root_span.set_status(Status(StatusCode.OK)) + ) + self.root_span.set_attribute(SpanAttributes.SESSION_ID, self.flow_id) + self.root_span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, self.trace_type) + self.root_span.set_attribute("langflow.project.name", self.project_name) + self.root_span.set_attribute("langflow.flow.name", self.flow_name) + self.root_span.set_attribute("langflow.flow.id", self.flow_id) + + with use_span(self.root_span, end_on_exit=False): self.propagator.inject(carrier=self.carrier) self.child_spans: dict[str, Span] = {} @@ -118,7 +129,8 @@ class ArizePhoenixTracer(BaseTracer): TracerProvider, ) - project_name = self.project_name or self.flow_id + name_without_space = self.flow_name.replace(" ", "-") + project_name = self.project_name if name_without_space == "None" else name_without_space attributes = {PROJECT_NAME: project_name, "model_id": project_name} resource = Resource.create(attributes=attributes) tracer_provider = TracerProvider(resource=resource, verbose=False) @@ -187,11 +199,6 @@ class ArizePhoenixTracer(BaseTracer): else: child_span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, trace_type) - if "session_id" in inputs and len(inputs["session_id"]) > 0 and inputs["session_id"] != self.flow_id: - child_span.set_attribute(SpanAttributes.SESSION_ID, inputs["session_id"]) - else: - child_span.set_attribute(SpanAttributes.SESSION_ID, self.flow_id) - processed_inputs = self._convert_to_arize_phoenix_types(inputs) if inputs else {} if processed_inputs: child_span.set_attribute(SpanAttributes.INPUT_VALUE, self._safe_json_dumps(processed_inputs)) @@ -202,6 +209,12 @@ class ArizePhoenixTracer(BaseTracer): for key, value in processed_metadata.items(): child_span.set_attribute(f"{SpanAttributes.METADATA}.{key}", value) + component_name = trace_id.split("-")[0] + if component_name == "ChatInput": + self.chat_input_value = processed_inputs["input_value"] + elif component_name == "ChatOutput": + self.chat_output_value = processed_inputs["input_value"] + self.child_spans[trace_id] = child_span @override @@ -232,28 +245,7 @@ class ArizePhoenixTracer(BaseTracer): for key, value in processed_logs.items(): child_span.set_attribute(f"logs.{key}", value) - if error: - error_string = self._error_to_string(error) - child_span.set_status(Status(StatusCode.ERROR, error_string)) - child_span.set_attribute("error.message", error_string) - - if isinstance(error, Exception): - child_span.record_exception(error) - else: - exception_type = error.__class__.__name__ - exception_message = str(error) - if not exception_message: - exception_message = repr(error) - attributes: dict[str, AttributeValue] = { - OTELSpanAttributes.EXCEPTION_TYPE: exception_type, - OTELSpanAttributes.EXCEPTION_MESSAGE: exception_message, - OTELSpanAttributes.EXCEPTION_ESCAPED: False, - OTELSpanAttributes.EXCEPTION_STACKTRACE: error_string, - } - child_span.add_event(name="exception", attributes=attributes) - else: - child_span.set_status(Status(StatusCode.OK)) - + self._set_span_status(child_span, error) child_span.end(end_time=self._get_current_timestamp()) self.child_spans.pop(trace_id) @@ -269,6 +261,30 @@ class ArizePhoenixTracer(BaseTracer): if not self._ready: return + if self.root_span: + self.root_span.set_attribute(SpanAttributes.INPUT_VALUE, self.chat_input_value) + self.root_span.set_attribute(SpanAttributes.INPUT_MIME_TYPE, OpenInferenceMimeTypeValues.TEXT.value) + self.root_span.set_attribute(SpanAttributes.OUTPUT_VALUE, self.chat_output_value) + self.root_span.set_attribute(SpanAttributes.OUTPUT_MIME_TYPE, OpenInferenceMimeTypeValues.TEXT.value) + + processed_metadata = self._convert_to_arize_phoenix_types(metadata) if metadata else {} + if processed_metadata: + for key, value in processed_metadata.items(): + self.root_span.set_attribute(f"{SpanAttributes.METADATA}.{key}", value) + + self._set_span_status(self.root_span, error) + self.root_span.end() + + try: + from openinference.instrumentation.langchain import LangChainInstrumentor + + LangChainInstrumentor().uninstrument(tracer_provider=self.tracer_provider, skip_dep_check=True) + except ImportError: + logger.exception( + "Could not import LangChainInstrumentor." + "Please install it with `pip install openinference-instrumentation-langchain`." + ) + def _convert_to_arize_phoenix_types(self, io_dict: dict[str | Any, Any]) -> dict[str, Any]: """Converts data types to Arize/Phoenix compatible formats.""" return { @@ -319,6 +335,30 @@ class ArizePhoenixTracer(BaseTracer): """A convenience wrapper around `json.dumps` that ensures that any object can be safely encoded.""" return json.dumps(obj, default=str, ensure_ascii=False, **kwargs) + def _set_span_status(self, current_span: Span, error: Exception | None = None): + """Sets the status and attributes of the current span based on the presence of an error.""" + if error: + error_string = self._error_to_string(error) + current_span.set_status(Status(StatusCode.ERROR, error_string)) + current_span.set_attribute("error.message", error_string) + + if isinstance(error, Exception): + current_span.record_exception(error) + else: + exception_type = error.__class__.__name__ + exception_message = str(error) + if not exception_message: + exception_message = repr(error) + attributes: dict[str, AttributeValue] = { + OTELSpanAttributes.EXCEPTION_TYPE: exception_type, + OTELSpanAttributes.EXCEPTION_MESSAGE: exception_message, + OTELSpanAttributes.EXCEPTION_ESCAPED: False, + OTELSpanAttributes.EXCEPTION_STACKTRACE: error_string, + } + current_span.add_event(name="exception", attributes=attributes) + else: + current_span.set_status(Status(StatusCode.OK)) + def get_langchain_callback(self) -> BaseCallbackHandler | None: """Returns the LangChain callback handler if applicable.""" return None