diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index e2738c0f1..b700109b4 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -20,7 +20,7 @@ from langflow.api.v1.schemas import ( BuildStatus, BuiltResponse, InitResponse, - ResultDict, + ResultData, StreamData, VertexBuildResponse, VerticesOrderResponse, @@ -322,6 +322,7 @@ async def build_vertex( inputs: dict = Body(None), ): """Build a vertex instead of the entire graph.""" + start_time = time.perf_counter() try: cache = chat_service.get_cache(flow_id) if not cache: @@ -336,7 +337,6 @@ async def build_vertex( graph = cache.get("result") result_dict = {} duration = "" - start_time = time.perf_counter() if tweaks: graph = process_tweaks_on_graph(graph, tweaks) if not (vertex := graph.get_vertex(vertex_id)): @@ -351,13 +351,9 @@ async def build_vertex( # to the frontend vertex.set_artifacts() artifacts = vertex.artifacts - timedelta = time.perf_counter() - start_time - duration = format_elapsed_time(timedelta) - result_dict = ResultDict( + result_dict = ResultData( results=result_dict, artifacts=artifacts, - duration=duration, - timedelta=timedelta, ) vertex.set_result(result_dict) elif vertex.result is not None: @@ -370,7 +366,7 @@ async def build_vertex( except Exception as exc: params = str(exc) valid = False - result_dict = ResultDict(results={}) + result_dict = ResultData(results={}) artifacts = {} # If there's an error building the vertex # we need to clear the cache @@ -383,6 +379,12 @@ async def build_vertex( data=result_dict, artifacts=artifacts, ) + + timedelta = time.perf_counter() - start_time + duration = format_elapsed_time(timedelta) + result_dict.duration = duration + result_dict.timedelta = timedelta + return VertexBuildResponse( valid=valid, params=params, diff --git a/src/backend/langflow/api/v1/schemas.py b/src/backend/langflow/api/v1/schemas.py index e84e65b51..228ee264f 100644 --- a/src/backend/langflow/api/v1/schemas.py +++ b/src/backend/langflow/api/v1/schemas.py @@ -222,7 +222,7 @@ class VerticesOrderResponse(BaseModel): ids: List[List[str]] -class ResultDict(BaseModel): +class ResultData(BaseModel): results: Optional[Any] = Field(default_factory=dict) artifacts: Optional[Any] = Field(default_factory=dict) timedelta: Optional[float] = None @@ -240,7 +240,7 @@ class VertexBuildResponse(BaseModel): valid: bool params: Optional[str] """JSON string of the params.""" - data: ResultDict + data: ResultData """Mapping of vertex ids to result dict containing the param name and result value.""" timestamp: Optional[datetime] = Field(default_factory=datetime.utcnow) """Timestamp of the build.""" diff --git a/src/backend/langflow/graph/vertex/base.py b/src/backend/langflow/graph/vertex/base.py index a5378b95e..c43b8fd0d 100644 --- a/src/backend/langflow/graph/vertex/base.py +++ b/src/backend/langflow/graph/vertex/base.py @@ -14,7 +14,7 @@ from langflow.utils.util import sync_to_async from loguru import logger if TYPE_CHECKING: - from langflow.api.v1.schemas import ResultDict + from langflow.api.v1.schemas import ResultData from langflow.graph.edge.base import ContractEdge from langflow.graph.graph.base import Graph @@ -50,7 +50,7 @@ class Vertex: self.parent_is_top_level = False self.layer = None self.should_run = True - self.result: Optional["ResultDict"] = None + self.result: Optional["ResultData"] = None try: self.is_interface_component = InterfaceComponentTypes(self.vertex_type) except ValueError: @@ -81,7 +81,7 @@ class Vertex: ) return edge_results - def set_result(self, result: "ResultDict") -> None: + def set_result(self, result: "ResultData") -> None: self.result = result def get_built_result(self): diff --git a/src/backend/langflow/services/monitor/utils.py b/src/backend/langflow/services/monitor/utils.py index 5324700a7..6ebba74eb 100644 --- a/src/backend/langflow/services/monitor/utils.py +++ b/src/backend/langflow/services/monitor/utils.py @@ -1,13 +1,12 @@ from typing import TYPE_CHECKING, Any, Dict, Optional, Type import duckdb +from langflow.services.deps import get_monitor_service from loguru import logger from pydantic import BaseModel -from langflow.services.deps import get_monitor_service - if TYPE_CHECKING: - from langflow.api.v1.schemas import ResultDict + from langflow.api.v1.schemas import ResultData INDEX_KEY = "index" @@ -44,7 +43,9 @@ def model_to_sql_column_definitions(model: Type[BaseModel]) -> dict: return columns -def drop_and_create_table_if_schema_mismatch(db_path: str, table_name: str, model: Type[BaseModel]): +def drop_and_create_table_if_schema_mismatch( + db_path: str, table_name: str, model: Type[BaseModel] +): with duckdb.connect(db_path) as conn: # Get the current schema from the database try: @@ -64,8 +65,12 @@ def drop_and_create_table_if_schema_mismatch(db_path: str, table_name: str, mode conn.execute(f"CREATE SEQUENCE seq_{table_name} START 1;") except duckdb.CatalogException: pass - desired_schema[INDEX_KEY] = f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')" - columns_sql = ", ".join(f"{name} {data_type}" for name, data_type in desired_schema.items()) + desired_schema[INDEX_KEY] = ( + f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')" + ) + columns_sql = ", ".join( + f"{name} {data_type}" for name, data_type in desired_schema.items() + ) create_table_sql = f"CREATE TABLE {table_name} ({columns_sql})" conn.execute(create_table_sql) @@ -133,7 +138,7 @@ async def log_vertex_build( vertex_id: str, valid: bool, params: Any, - data: "ResultDict", + data: "ResultData", artifacts: Optional[dict] = None, ): try: diff --git a/src/backend/langflow/services/socket/utils.py b/src/backend/langflow/services/socket/utils.py index 03b609919..38f1cf82b 100644 --- a/src/backend/langflow/services/socket/utils.py +++ b/src/backend/langflow/services/socket/utils.py @@ -3,7 +3,7 @@ from typing import Callable import socketio from langflow.api.utils import format_elapsed_time -from langflow.api.v1.schemas import ResultDict, VertexBuildResponse +from langflow.api.v1.schemas import ResultData, VertexBuildResponse from langflow.graph.graph.base import Graph from langflow.graph.vertex.base import StatelessVertex from langflow.services.database.models.flow.model import Flow @@ -73,11 +73,16 @@ async def build_vertex( artifacts = vertex.artifacts timedelta = time.perf_counter() - start_time duration = format_elapsed_time(timedelta) - result_dict = ResultDict(results=result_dict, artifacts=artifacts, duration=duration, timedelta=timedelta) + result_dict = ResultData( + results=result_dict, + artifacts=artifacts, + duration=duration, + timedelta=timedelta, + ) except Exception as exc: params = str(exc) valid = False - result_dict = ResultDict(results={}) + result_dict = ResultData(results={}) artifacts = {} set_cache(flow_id, graph) await log_vertex_build( @@ -90,7 +95,9 @@ async def build_vertex( ) # Emit the vertex build response - response = VertexBuildResponse(valid=valid, params=params, id=vertex.id, data=result_dict) + response = VertexBuildResponse( + valid=valid, params=params, id=vertex.id, data=result_dict + ) await sio.emit("vertex_build", data=response.model_dump(), to=sid) except Exception as exc: