(utils.py): Add functionality to trace runs

🔧 (chat.py): Remove redundant setting of run_id in retrieve_vertices_order function
🔧 (chat.py): Remove setting of run_id in build_vertex function and add end_all_traces method call when no next_runnable_vertices
🔧 (base.py): Add logic to set run_name based on flow name and flow id, and end all traces with metadata in Graph class
This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-06-19 19:00:35 -03:00
commit a6e9972f4b
3 changed files with 58 additions and 11 deletions

View file

@@ -1,3 +1,4 @@
import uuid
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Optional
@ -210,13 +211,17 @@ async def build_graph_from_db(flow_id: str, session: Session, chat_service: "Cha
flow: Optional[Flow] = session.get(Flow, flow_id)
if not flow or not flow.data:
raise ValueError("Invalid flow ID")
graph = Graph.from_payload(flow.data, flow_id)
graph = Graph.from_payload(flow.data, flow_id, flow_name=flow.name, user_id=flow.user_id)
for vertex_id in graph._has_session_id_vertices:
vertex = graph.get_vertex(vertex_id)
if vertex is None:
raise ValueError(f"Vertex {vertex_id} not found")
if not vertex._raw_params.get("session_id"):
vertex.update_raw_params({"session_id": flow_id}, overwrite=True)
run_id = uuid.uuid4()
graph.set_run_id(run_id)
graph.set_run_name()
await chat_service.set_cache(flow_id, graph)
return graph

View file

@@ -108,11 +108,9 @@ async def retrieve_vertices_order(
# Now vertices is a list of lists
# We need to get the id of each vertex
# and return the same structure but only with the ids
run_id = uuid.uuid4()
graph.set_run_id(run_id)
vertices_to_run = list(graph.vertices_to_run) + get_top_level_vertices(graph, graph.vertices_to_run)
await chat_service.set_cache(str(flow_id), graph)
return VerticesOrderResponse(ids=first_layer, run_id=run_id, vertices_to_run=vertices_to_run)
return VerticesOrderResponse(ids=first_layer, run_id=graph._run_id, vertices_to_run=vertices_to_run)
except Exception as exc:
if "stream or streaming set to True" in str(exc):
@ -204,6 +202,7 @@ async def build_vertex(
logs = {output_label: Log(message=message, type="error")}
result_data_response = ResultDataResponse(results={}, logs=logs)
artifacts = {}
graph.end_all_traces(outputs=message)
# If there's an error building the vertex
# we need to clear the cache
await chat_service.clear_cache(flow_id_str)
@ -240,6 +239,9 @@ async def build_vertex(
if graph.stop_vertex and graph.stop_vertex in next_runnable_vertices:
next_runnable_vertices = [graph.stop_vertex]
if not next_runnable_vertices:
graph.end_all_traces()
build_response = VertexBuildResponse(
inactivated_vertices=inactivated_vertices,
next_vertices_ids=next_runnable_vertices,

View file

@ -1,9 +1,10 @@
import asyncio
import uuid
from collections import defaultdict, deque
from datetime import datetime, timezone
from functools import partial
from itertools import chain
from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Tuple, Type, Union
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Tuple, Type, Union
from loguru import logger
@ -20,7 +21,7 @@ from langflow.schema import Data
from langflow.schema.schema import INPUT_FIELD_NAME, InputType
from langflow.services.cache.utils import CacheMiss
from langflow.services.chat.service import ChatService
from langflow.services.deps import get_chat_service
from langflow.services.deps import get_chat_service, get_tracing_service
from langflow.services.monitor.utils import log_transaction
if TYPE_CHECKING:
@ -35,6 +36,7 @@ class Graph:
nodes: List[Dict],
edges: List[Dict[str, str]],
flow_id: Optional[str] = None,
flow_name: Optional[str] = None,
user_id: Optional[str] = None,
) -> None:
"""
@ -51,6 +53,7 @@ class Graph:
self._runs = 0
self._updates = 0
self.flow_id = flow_id
self.flow_name = flow_name
self.user_id = user_id
self._is_input_vertices: List[str] = []
self._is_output_vertices: List[str] = []
@ -58,6 +61,7 @@ class Graph:
self._has_session_id_vertices: List[str] = []
self._sorted_vertices_layers: List[List[str]] = []
self._run_id = ""
self._start_time = datetime.now(timezone.utc)
self.top_level_vertices = []
for vertex in self._vertices:
@ -81,6 +85,11 @@ class Graph:
self.build_graph_maps(self.edges)
self.define_vertices_lists()
self.state_manager = GraphStateManager()
try:
self.tracing_service = get_tracing_service()
except Exception as exc:
logger.error(f"Error getting tracing service: {exc}")
self.tracing_service = None
def get_state(self, name: str) -> Optional[Data]:
"""
@ -214,6 +223,24 @@ class Graph:
for vertex in self.vertices:
self.state_manager.subscribe(run_id, vertex.update_graph_state)
self._run_id = run_id
self.tracing_service.set_run_id(run_id)
def set_run_name(self):
    """Name the current tracing run after this graph's flow and start the tracers.

    Builds a run name of the form ``"<flow_name> - <flow_id>"``, hands it to the
    tracing service, and initializes the tracers. Does nothing when no tracing
    service is available.
    """
    service = self.tracing_service
    if not service:
        # Tracing is disabled (service failed to load or was never set).
        return
    run_name = f"{self.flow_name} - {self.flow_id}"
    service.set_run_name(run_name)
    service.initialize_tracers()
def end_all_traces(self, outputs: dict[str, Any] | None = None):
    """Finish all active traces for this graph's run.

    Records the run's end time, then reports the given outputs merged with
    the graph's metadata to the tracing service. No-op when tracing is
    disabled.

    Args:
        outputs: Optional mapping of output values to attach to the traces.
            The caller's dict is left unmodified.
    """
    if not self.tracing_service:
        return
    self._end_time = datetime.now(timezone.utc)
    # Merge into a fresh dict: the previous `outputs |= self.metadata`
    # mutated the caller's dict in place as a hidden side effect.
    merged_outputs = {**(outputs or {}), **self.metadata}
    self.tracing_service.end(merged_outputs)
@property
def sorted_vertices_layers(self) -> List[List[str]]:
@ -442,10 +469,13 @@ class Graph:
Returns:
dict: The metadata of the graph.
"""
time_format = "%Y-%m-%d %H:%M:%S"
return {
"runs": self._runs,
"updates": self._updates,
"inactivated_vertices": self.inactivated_vertices,
"start_time": self._start_time.strftime(time_format),
"end_time": self._end_time.strftime(time_format),
"time_elapsed": f"{(self._end_time - self._start_time).total_seconds()} seconds",
"flow_id": self.flow_id,
"flow_name": self.flow_name,
}
def build_graph_maps(self, edges: Optional[List[ContractEdge]] = None, vertices: Optional[List[Vertex]] = None):
@ -553,9 +583,18 @@ class Graph:
state["run_manager"] = RunnableVerticesManager.from_dict(run_manager)
self.__dict__.update(state)
self.state_manager = GraphStateManager()
self.tracing_service = get_tracing_service()
self.set_run_id(self._run_id)
self.set_run_name()
@classmethod
def from_payload(cls, payload: Dict, flow_id: Optional[str] = None, user_id: Optional[str] = None) -> "Graph":
def from_payload(
cls,
payload: Dict,
flow_id: Optional[str] = None,
flow_name: Optional[str] = None,
user_id: Optional[str] = None,
) -> "Graph":
"""
Creates a graph from a payload.
@ -570,7 +609,7 @@ class Graph:
try:
vertices = payload["nodes"]
edges = payload["edges"]
return cls(vertices, edges, flow_id, user_id)
return cls(vertices, edges, flow_id, flow_name, user_id)
except KeyError as exc:
logger.exception(exc)
if "nodes" not in payload and "edges" not in payload:
@ -880,6 +919,7 @@ class Graph:
chat_service = get_chat_service()
run_id = uuid.uuid4()
self.set_run_id(run_id)
self.set_run_name()
lock = chat_service._cache_locks[self.run_id]
while to_process:
current_batch = list(to_process) # Copy current deque items to a list