diff --git a/src/backend/base/langflow/api/utils.py b/src/backend/base/langflow/api/utils.py index 099df66c5..9aa145aa0 100644 --- a/src/backend/base/langflow/api/utils.py +++ b/src/backend/base/langflow/api/utils.py @@ -1,3 +1,4 @@ +import uuid import warnings from pathlib import Path from typing import TYPE_CHECKING, Optional @@ -210,13 +211,17 @@ async def build_graph_from_db(flow_id: str, session: Session, chat_service: "Cha flow: Optional[Flow] = session.get(Flow, flow_id) if not flow or not flow.data: raise ValueError("Invalid flow ID") - graph = Graph.from_payload(flow.data, flow_id) + graph = Graph.from_payload(flow.data, flow_id, flow_name=flow.name, user_id=flow.user_id) for vertex_id in graph._has_session_id_vertices: vertex = graph.get_vertex(vertex_id) if vertex is None: raise ValueError(f"Vertex {vertex_id} not found") if not vertex._raw_params.get("session_id"): vertex.update_raw_params({"session_id": flow_id}, overwrite=True) + + run_id = uuid.uuid4() + graph.set_run_id(run_id) + graph.set_run_name() await chat_service.set_cache(flow_id, graph) return graph diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index 9dd911ca4..3f92b8a9a 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -108,11 +108,9 @@ async def retrieve_vertices_order( # Now vertices is a list of lists # We need to get the id of each vertex # and return the same structure but only with the ids - run_id = uuid.uuid4() - graph.set_run_id(run_id) vertices_to_run = list(graph.vertices_to_run) + get_top_level_vertices(graph, graph.vertices_to_run) await chat_service.set_cache(str(flow_id), graph) - return VerticesOrderResponse(ids=first_layer, run_id=run_id, vertices_to_run=vertices_to_run) + return VerticesOrderResponse(ids=first_layer, run_id=graph._run_id, vertices_to_run=vertices_to_run) except Exception as exc: if "stream or streaming set to True" in str(exc): @@ -204,6 +202,7 @@ async def build_vertex( logs = {output_label: Log(message=message, type="error")} result_data_response = ResultDataResponse(results={}, logs=logs) artifacts = {} + graph.end_all_traces(outputs=message) # If there's an error building the vertex # we need to clear the cache await chat_service.clear_cache(flow_id_str) @@ -240,6 +239,9 @@ async def build_vertex( if graph.stop_vertex and graph.stop_vertex in next_runnable_vertices: next_runnable_vertices = [graph.stop_vertex] + if not next_runnable_vertices: + graph.end_all_traces() + build_response = VertexBuildResponse( inactivated_vertices=inactivated_vertices, next_vertices_ids=next_runnable_vertices, diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index a0e4d9a24..b0fbc0ab2 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -1,9 +1,10 @@ import asyncio import uuid from collections import defaultdict, deque +from datetime import datetime, timezone from functools import partial from itertools import chain -from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Tuple, Type, Union +from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Tuple, Type, Union from loguru import logger @@ -20,7 +21,7 @@ from langflow.schema import Data from langflow.schema.schema import INPUT_FIELD_NAME, InputType from langflow.services.cache.utils import CacheMiss from langflow.services.chat.service import ChatService -from langflow.services.deps import get_chat_service +from langflow.services.deps import get_chat_service, get_tracing_service from langflow.services.monitor.utils import log_transaction if TYPE_CHECKING: @@ -35,6 +36,7 @@ class Graph: nodes: List[Dict], edges: List[Dict[str, str]], flow_id: Optional[str] = None, + flow_name: Optional[str] = None, user_id: Optional[str] = None, ) -> None: """ @@ -51,6 +53,7 @@ class Graph: self._runs = 0 self._updates = 0 self.flow_id = flow_id + self.flow_name = flow_name self.user_id = user_id self._is_input_vertices: List[str] = [] self._is_output_vertices: List[str] = [] @@ -58,6 +61,7 @@ class Graph: self._has_session_id_vertices: List[str] = [] self._sorted_vertices_layers: List[List[str]] = [] self._run_id = "" + self._start_time = datetime.now(timezone.utc) self.top_level_vertices = [] for vertex in self._vertices: @@ -81,6 +85,11 @@ class Graph: self.build_graph_maps(self.edges) self.define_vertices_lists() self.state_manager = GraphStateManager() + try: + self.tracing_service = get_tracing_service() + except Exception as exc: + logger.error(f"Error getting tracing service: {exc}") + self.tracing_service = None def get_state(self, name: str) -> Optional[Data]: """ @@ -214,6 +223,24 @@ class Graph: for vertex in self.vertices: self.state_manager.subscribe(run_id, vertex.update_graph_state) self._run_id = run_id + self.tracing_service.set_run_id(run_id) + + def set_run_name(self): + # Given a flow name, flow_id + if not self.tracing_service: + return + name = f"{self.flow_name} - {self.flow_id}" + self.tracing_service.set_run_name(name) + self.tracing_service.initialize_tracers() + + def end_all_traces(self, outputs: dict[str, Any] | None = None): + if not self.tracing_service: + return + self._end_time = datetime.now(timezone.utc) + if outputs is None: + outputs = {} + outputs |= self.metadata + self.tracing_service.end(outputs) @property def sorted_vertices_layers(self) -> List[List[str]]: @@ -442,10 +469,13 @@ class Graph: Returns: dict: The metadata of the graph. """ + time_format = "%Y-%m-%d %H:%M:%S" return { - "runs": self._runs, - "updates": self._updates, - "inactivated_vertices": self.inactivated_vertices, + "start_time": self._start_time.strftime(time_format), + "end_time": self._end_time.strftime(time_format), + "time_elapsed": f"{(self._end_time - self._start_time).total_seconds()} seconds", + "flow_id": self.flow_id, + "flow_name": self.flow_name, } def build_graph_maps(self, edges: Optional[List[ContractEdge]] = None, vertices: Optional[List[Vertex]] = None): @@ -553,9 +583,18 @@ class Graph: state["run_manager"] = RunnableVerticesManager.from_dict(run_manager) self.__dict__.update(state) self.state_manager = GraphStateManager() + self.tracing_service = get_tracing_service() + self.set_run_id(self._run_id) + self.set_run_name() @classmethod - def from_payload(cls, payload: Dict, flow_id: Optional[str] = None, user_id: Optional[str] = None) -> "Graph": + def from_payload( + cls, + payload: Dict, + flow_id: Optional[str] = None, + flow_name: Optional[str] = None, + user_id: Optional[str] = None, + ) -> "Graph": """ Creates a graph from a payload. @@ -570,7 +609,7 @@ class Graph: try: vertices = payload["nodes"] edges = payload["edges"] - return cls(vertices, edges, flow_id, user_id) + return cls(vertices, edges, flow_id, flow_name, user_id) except KeyError as exc: logger.exception(exc) if "nodes" not in payload and "edges" not in payload: @@ -880,6 +919,7 @@ class Graph: chat_service = get_chat_service() run_id = uuid.uuid4() self.set_run_id(run_id) + self.set_run_name() lock = chat_service._cache_locks[self.run_id] while to_process: current_batch = list(to_process) # Copy current deque items to a list