feat: ArizePhoenixTracer v2 - Enhanced Session Tracking and Flow Organization (#5336)

Updated ArizePhoenixTracer
This commit is contained in:
Ali Saleh 2024-12-18 21:32:58 +05:00 committed by GitHub
commit 82080ebade
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -13,7 +13,7 @@ from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from loguru import logger
from openinference.semconv.trace import OpenInferenceMimeTypeValues, SpanAttributes
from opentelemetry.semconv.trace import SpanAttributes as OTELSpanAttributes
from opentelemetry.trace import Span, Status, StatusCode
from opentelemetry.trace import Span, Status, StatusCode, use_span
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from typing_extensions import override
@ -34,7 +34,10 @@ if TYPE_CHECKING:
class ArizePhoenixTracer(BaseTracer):
flow_name: str
flow_id: str
chat_input_value: str
chat_output_value: str
def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID):
"""Initializes the ArizePhoenixTracer instance and sets up a root span."""
@ -42,7 +45,10 @@ class ArizePhoenixTracer(BaseTracer):
self.trace_type = trace_type
self.project_name = project_name
self.trace_id = trace_id
self.flow_name = trace_name.split(" - ")[0]
self.flow_id = trace_name.split(" - ")[-1]
self.chat_input_value = ""
self.chat_output_value = ""
try:
self._ready = self.setup_arize_phoenix()
@ -53,12 +59,17 @@ class ArizePhoenixTracer(BaseTracer):
self.propagator = TraceContextTextMapPropagator()
self.carrier: dict[Any, CarrierT] = {}
with self.tracer.start_as_current_span(
self.root_span = self.tracer.start_span(
name=self.flow_id,
start_time=self._get_current_timestamp(),
) as root_span:
root_span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, self.trace_type)
root_span.set_status(Status(StatusCode.OK))
)
self.root_span.set_attribute(SpanAttributes.SESSION_ID, self.flow_id)
self.root_span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, self.trace_type)
self.root_span.set_attribute("langflow.project.name", self.project_name)
self.root_span.set_attribute("langflow.flow.name", self.flow_name)
self.root_span.set_attribute("langflow.flow.id", self.flow_id)
with use_span(self.root_span, end_on_exit=False):
self.propagator.inject(carrier=self.carrier)
self.child_spans: dict[str, Span] = {}
@ -118,7 +129,8 @@ class ArizePhoenixTracer(BaseTracer):
TracerProvider,
)
project_name = self.project_name or self.flow_id
name_without_space = self.flow_name.replace(" ", "-")
project_name = self.project_name if name_without_space == "None" else name_without_space
attributes = {PROJECT_NAME: project_name, "model_id": project_name}
resource = Resource.create(attributes=attributes)
tracer_provider = TracerProvider(resource=resource, verbose=False)
@ -187,11 +199,6 @@ class ArizePhoenixTracer(BaseTracer):
else:
child_span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, trace_type)
if "session_id" in inputs and len(inputs["session_id"]) > 0 and inputs["session_id"] != self.flow_id:
child_span.set_attribute(SpanAttributes.SESSION_ID, inputs["session_id"])
else:
child_span.set_attribute(SpanAttributes.SESSION_ID, self.flow_id)
processed_inputs = self._convert_to_arize_phoenix_types(inputs) if inputs else {}
if processed_inputs:
child_span.set_attribute(SpanAttributes.INPUT_VALUE, self._safe_json_dumps(processed_inputs))
@ -202,6 +209,12 @@ class ArizePhoenixTracer(BaseTracer):
for key, value in processed_metadata.items():
child_span.set_attribute(f"{SpanAttributes.METADATA}.{key}", value)
component_name = trace_id.split("-")[0]
if component_name == "ChatInput":
self.chat_input_value = processed_inputs["input_value"]
elif component_name == "ChatOutput":
self.chat_output_value = processed_inputs["input_value"]
self.child_spans[trace_id] = child_span
@override
@ -232,28 +245,7 @@ class ArizePhoenixTracer(BaseTracer):
for key, value in processed_logs.items():
child_span.set_attribute(f"logs.{key}", value)
if error:
error_string = self._error_to_string(error)
child_span.set_status(Status(StatusCode.ERROR, error_string))
child_span.set_attribute("error.message", error_string)
if isinstance(error, Exception):
child_span.record_exception(error)
else:
exception_type = error.__class__.__name__
exception_message = str(error)
if not exception_message:
exception_message = repr(error)
attributes: dict[str, AttributeValue] = {
OTELSpanAttributes.EXCEPTION_TYPE: exception_type,
OTELSpanAttributes.EXCEPTION_MESSAGE: exception_message,
OTELSpanAttributes.EXCEPTION_ESCAPED: False,
OTELSpanAttributes.EXCEPTION_STACKTRACE: error_string,
}
child_span.add_event(name="exception", attributes=attributes)
else:
child_span.set_status(Status(StatusCode.OK))
self._set_span_status(child_span, error)
child_span.end(end_time=self._get_current_timestamp())
self.child_spans.pop(trace_id)
@ -269,6 +261,30 @@ class ArizePhoenixTracer(BaseTracer):
if not self._ready:
return
if self.root_span:
self.root_span.set_attribute(SpanAttributes.INPUT_VALUE, self.chat_input_value)
self.root_span.set_attribute(SpanAttributes.INPUT_MIME_TYPE, OpenInferenceMimeTypeValues.TEXT.value)
self.root_span.set_attribute(SpanAttributes.OUTPUT_VALUE, self.chat_output_value)
self.root_span.set_attribute(SpanAttributes.OUTPUT_MIME_TYPE, OpenInferenceMimeTypeValues.TEXT.value)
processed_metadata = self._convert_to_arize_phoenix_types(metadata) if metadata else {}
if processed_metadata:
for key, value in processed_metadata.items():
self.root_span.set_attribute(f"{SpanAttributes.METADATA}.{key}", value)
self._set_span_status(self.root_span, error)
self.root_span.end()
try:
from openinference.instrumentation.langchain import LangChainInstrumentor
LangChainInstrumentor().uninstrument(tracer_provider=self.tracer_provider, skip_dep_check=True)
except ImportError:
logger.exception(
"Could not import LangChainInstrumentor."
"Please install it with `pip install openinference-instrumentation-langchain`."
)
def _convert_to_arize_phoenix_types(self, io_dict: dict[str | Any, Any]) -> dict[str, Any]:
"""Converts data types to Arize/Phoenix compatible formats."""
return {
@ -319,6 +335,30 @@ class ArizePhoenixTracer(BaseTracer):
"""A convenience wrapper around `json.dumps` that ensures that any object can be safely encoded."""
return json.dumps(obj, default=str, ensure_ascii=False, **kwargs)
def _set_span_status(self, current_span: Span, error: Exception | None = None):
    """Mark ``current_span`` as OK or ERROR depending on ``error``.

    A falsy ``error`` sets the span status to OK. Otherwise the span status
    is set to ERROR with the string produced by ``self._error_to_string``,
    and the same string is stored under the ``error.message`` attribute.
    ``Exception`` instances are then recorded through
    ``Span.record_exception``; any other truthy value is reported as a
    hand-built ``exception`` event.

    Args:
        current_span: The span whose status and attributes are updated.
        error: The error raised while the span was active, if any.
    """
    # Success path first: nothing to report beyond an OK status.
    if not error:
        current_span.set_status(Status(StatusCode.OK))
        return

    details = self._error_to_string(error)
    current_span.set_status(Status(StatusCode.ERROR, details))
    current_span.set_attribute("error.message", details)

    if isinstance(error, Exception):
        current_span.record_exception(error)
        return

    # Not an Exception instance: build the "exception" event by hand,
    # falling back to repr() when str() yields an empty message.
    event_attributes: dict[str, AttributeValue] = {
        OTELSpanAttributes.EXCEPTION_TYPE: error.__class__.__name__,
        OTELSpanAttributes.EXCEPTION_MESSAGE: str(error) or repr(error),
        OTELSpanAttributes.EXCEPTION_ESCAPED: False,
        OTELSpanAttributes.EXCEPTION_STACKTRACE: details,
    }
    current_span.add_event(name="exception", attributes=event_attributes)
def get_langchain_callback(self) -> BaseCallbackHandler | None:
    """Return the LangChain callback handler for this tracer.

    Returns:
        Always ``None``; this implementation supplies no callback handler.
    """
    return None