diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index d226df9bd..c7f344406 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -19,7 +19,7 @@ from langflow.api.v1.schemas import ( BuildStatus, BuiltResponse, InitResponse, - ResultDict, + ResultData, StreamData, VertexBuildResponse, VerticesOrderResponse, @@ -318,6 +318,7 @@ async def build_vertex( current_user=Depends(get_current_active_user), ): """Build a vertex instead of the entire graph.""" + start_time = time.perf_counter() try: start_time = time.perf_counter() cache = chat_service.get_cache(flow_id) @@ -327,7 +328,7 @@ async def build_vertex( f"No cache found for {flow_id}. Building graph starting at {vertex_id}" ) graph = build_and_cache_graph( - flow_id=flow_id, session=get_session(), chat_service=chat_service + flow_id=flow_id, session=next(get_session()), chat_service=chat_service ) else: graph = cache.get("result") @@ -345,13 +346,9 @@ async def build_vertex( # to the frontend vertex.set_artifacts() artifacts = vertex.artifacts - timedelta = time.perf_counter() - start_time - duration = format_elapsed_time(timedelta) - result_dict = ResultDict( + result_dict = ResultData( results=result_dict, artifacts=artifacts, - duration=duration, - timedelta=timedelta, ) vertex.set_result(result_dict) elif vertex.result is not None: @@ -364,7 +361,7 @@ async def build_vertex( except Exception as exc: params = repr(exc) valid = False - result_dict = ResultDict(results={}) + result_dict = ResultData(results={}) artifacts = {} # If there's an error building the vertex # we need to clear the cache @@ -377,6 +374,12 @@ async def build_vertex( data=result_dict, artifacts=artifacts, ) + + timedelta = time.perf_counter() - start_time + duration = format_elapsed_time(timedelta) + result_dict.duration = duration + result_dict.timedelta = timedelta + return VertexBuildResponse( valid=valid, params=params, diff --git a/src/backend/langflow/api/v1/schemas.py b/src/backend/langflow/api/v1/schemas.py index e84e65b51..228ee264f 100644 --- a/src/backend/langflow/api/v1/schemas.py +++ b/src/backend/langflow/api/v1/schemas.py @@ -222,7 +222,7 @@ class VerticesOrderResponse(BaseModel): ids: List[List[str]] -class ResultDict(BaseModel): +class ResultData(BaseModel): results: Optional[Any] = Field(default_factory=dict) artifacts: Optional[Any] = Field(default_factory=dict) timedelta: Optional[float] = None @@ -240,7 +240,7 @@ class VertexBuildResponse(BaseModel): valid: bool params: Optional[str] """JSON string of the params.""" - data: ResultDict + data: ResultData """Mapping of vertex ids to result dict containing the param name and result value.""" timestamp: Optional[datetime] = Field(default_factory=datetime.utcnow) """Timestamp of the build.""" diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index 2c1d7eaf3..c52991632 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -417,6 +417,7 @@ class Graph: if edge not in edges: edges.append(edge) vertices = self.layered_topological_sort(vertices, edges) + vertices = self.sort_interface_components_first(vertices) return self.sort_chat_inputs_first(vertices) def layered_topological_sort( @@ -522,11 +523,23 @@ class Graph: # and sort the layers accordingly # InterfaceComponentTypes is an enum # check all values of the enum and sort the layers - for layer in vertices: - layer.sort( - key=lambda x: any( - comp_type.value in x - for comp_type in InterfaceComponentTypes.__members__.values() - ) - ) + vertices = self.sort_interface_components_first(vertices) return self.sort_chat_inputs_first(vertices) + + def sort_interface_components_first(self, vertices: List[Vertex]) -> List[Vertex]: + """Sorts the vertices in the graph so that vertices containing ChatInput or ChatOutput come first.""" + + def contains_interface_component(vertex): + return any( + component.value in vertex for component in InterfaceComponentTypes + ) + + # Sort each inner list so that vertices containing ChatInput or ChatOutput come first + sorted_vertices = [ + sorted( + inner_list, + key=lambda vertex: not contains_interface_component(vertex), + ) + for inner_list in vertices + ] + return sorted_vertices diff --git a/src/backend/langflow/graph/vertex/base.py b/src/backend/langflow/graph/vertex/base.py index 01d182142..c43b8fd0d 100644 --- a/src/backend/langflow/graph/vertex/base.py +++ b/src/backend/langflow/graph/vertex/base.py @@ -14,7 +14,7 @@ from langflow.utils.util import sync_to_async from loguru import logger if TYPE_CHECKING: - from langflow.api.v1.schemas import ResultDict + from langflow.api.v1.schemas import ResultData from langflow.graph.edge.base import ContractEdge from langflow.graph.graph.base import Graph @@ -50,7 +50,7 @@ class Vertex: self.parent_is_top_level = False self.layer = None self.should_run = True - self.result: Optional["ResultDict"] = None + self.result: Optional["ResultData"] = None try: self.is_interface_component = InterfaceComponentTypes(self.vertex_type) except ValueError: @@ -81,7 +81,7 @@ class Vertex: ) return edge_results - def set_result(self, result: "ResultDict") -> None: + def set_result(self, result: "ResultData") -> None: self.result = result def get_built_result(self): @@ -306,7 +306,7 @@ class Vertex: params.pop(key, None) # Add _type to params self.params = params - self._raw_params = params + self._raw_params = params.copy() async def _build(self, user_id=None): """ diff --git a/src/backend/langflow/services/monitor/utils.py b/src/backend/langflow/services/monitor/utils.py index 5324700a7..6ebba74eb 100644 --- a/src/backend/langflow/services/monitor/utils.py +++ b/src/backend/langflow/services/monitor/utils.py @@ -1,13 +1,12 @@ from typing import TYPE_CHECKING, Any, Dict, Optional, Type import duckdb +from langflow.services.deps import get_monitor_service from loguru import logger from pydantic import BaseModel -from langflow.services.deps import get_monitor_service - if TYPE_CHECKING: - from langflow.api.v1.schemas import ResultDict + from langflow.api.v1.schemas import ResultData INDEX_KEY = "index" @@ -44,7 +43,9 @@ def model_to_sql_column_definitions(model: Type[BaseModel]) -> dict: return columns -def drop_and_create_table_if_schema_mismatch(db_path: str, table_name: str, model: Type[BaseModel]): +def drop_and_create_table_if_schema_mismatch( + db_path: str, table_name: str, model: Type[BaseModel] +): with duckdb.connect(db_path) as conn: # Get the current schema from the database try: @@ -64,8 +65,12 @@ def drop_and_create_table_if_schema_mismatch(db_path: str, table_name: str, mode conn.execute(f"CREATE SEQUENCE seq_{table_name} START 1;") except duckdb.CatalogException: pass - desired_schema[INDEX_KEY] = f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')" - columns_sql = ", ".join(f"{name} {data_type}" for name, data_type in desired_schema.items()) + desired_schema[INDEX_KEY] = ( + f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')" + ) + columns_sql = ", ".join( + f"{name} {data_type}" for name, data_type in desired_schema.items() + ) create_table_sql = f"CREATE TABLE {table_name} ({columns_sql})" conn.execute(create_table_sql) @@ -133,7 +138,7 @@ async def log_vertex_build( vertex_id: str, valid: bool, params: Any, - data: "ResultDict", + data: "ResultData", artifacts: Optional[dict] = None, ): try: diff --git a/src/backend/langflow/services/socket/utils.py b/src/backend/langflow/services/socket/utils.py index 03b609919..38f1cf82b 100644 --- a/src/backend/langflow/services/socket/utils.py +++ b/src/backend/langflow/services/socket/utils.py @@ -3,7 +3,7 @@ from typing import Callable import socketio from langflow.api.utils import format_elapsed_time -from langflow.api.v1.schemas import ResultDict, VertexBuildResponse +from langflow.api.v1.schemas import ResultData, VertexBuildResponse from langflow.graph.graph.base import Graph from langflow.graph.vertex.base import StatelessVertex from langflow.services.database.models.flow.model import Flow @@ -73,11 +73,16 @@ async def build_vertex( artifacts = vertex.artifacts timedelta = time.perf_counter() - start_time duration = format_elapsed_time(timedelta) - result_dict = ResultDict(results=result_dict, artifacts=artifacts, duration=duration, timedelta=timedelta) + result_dict = ResultData( + results=result_dict, + artifacts=artifacts, + duration=duration, + timedelta=timedelta, + ) except Exception as exc: params = str(exc) valid = False - result_dict = ResultDict(results={}) + result_dict = ResultData(results={}) artifacts = {} set_cache(flow_id, graph) await log_vertex_build( @@ -90,7 +95,9 @@ async def build_vertex( ) # Emit the vertex build response - response = VertexBuildResponse(valid=valid, params=params, id=vertex.id, data=result_dict) + response = VertexBuildResponse( + valid=valid, params=params, id=vertex.id, data=result_dict + ) await sio.emit("vertex_build", data=response.model_dump(), to=sid) except Exception as exc: diff --git a/src/frontend/src/controllers/API/api.tsx b/src/frontend/src/controllers/API/api.tsx index c6786cc1c..b659333a1 100644 --- a/src/frontend/src/controllers/API/api.tsx +++ b/src/frontend/src/controllers/API/api.tsx @@ -3,8 +3,10 @@ import { useContext, useEffect } from "react"; import { Cookies } from "react-cookie"; import { useNavigate } from "react-router-dom"; import { renewAccessToken } from "."; +import { BuildStatus } from "../../constants/enums"; import { AuthContext } from "../../contexts/authContext"; import useAlertStore from "../../stores/alertStore"; +import useFlowStore from "../../stores/flowStore"; // Create a new Axios instance const api: AxiosInstance = axios.create({ @@ -24,45 +26,20 @@ function ApiInterceptor() { async (error: AxiosError) => { if (error.response?.status === 401) { const accessToken = cookies.get("access_token_lf"); + if (accessToken && !autoLogin) { - authenticationErrorCount = authenticationErrorCount + 1; - if (authenticationErrorCount > 3) { - authenticationErrorCount = 0; - logout(); - } - try { - const res = await renewAccessToken(); - if (res?.data?.access_token && res?.data?.refresh_token) { - login(res?.data?.access_token); - } - if (error?.config?.headers) { - delete error.config.headers["Authorization"]; - error.config.headers["Authorization"] = `Bearer ${cookies.get( - "access_token_lf" - )}`; - const response = await axios.request(error.config); - return response; - } - } catch (error) { - if (axios.isAxiosError(error) && error.response?.status === 401) { - logout(); - } else { - console.error(error); - logout(); - } - } + checkErrorCount(); + await tryToRenewAccessToken(error); } if (!accessToken && error?.config?.url?.includes("login")) { return Promise.reject(error); - } else { - logout(); } - } else { - // if (URL_EXCLUDED_FROM_ERROR_RETRIES.includes(error.config?.url)) { - return Promise.reject(error); - // } + + return logout(); } + await clearBuildVerticesState(error); + return Promise.reject(error); } ); @@ -115,6 +92,41 @@ function ApiInterceptor() { }; }, [accessToken, setErrorData]); + function checkErrorCount() { + authenticationErrorCount = authenticationErrorCount + 1; + + if (authenticationErrorCount > 3) { + authenticationErrorCount = 0; + logout(); + } + } + + async function tryToRenewAccessToken(error: AxiosError) { + try { + const res = await renewAccessToken(); + if (res?.data?.access_token && res?.data?.refresh_token) { + login(res?.data?.access_token); + } + if (error?.config?.headers) { + delete error.config.headers["Authorization"]; + error.config.headers["Authorization"] = `Bearer ${cookies.get( + "access_token_lf" + )}`; + const response = await axios.request(error.config); + return response; + } + } catch (error) { + logout(); + } + } + + async function clearBuildVerticesState(error) { + if (error?.response?.status === 500) { + const vertices = useFlowStore.getState().verticesBuild; + useFlowStore.getState().updateBuildStatus(vertices, BuildStatus.BUILT); + } + } + return null; } diff --git a/src/frontend/src/stores/flowStore.ts b/src/frontend/src/stores/flowStore.ts index ea424d6fb..398b2b3f3 100644 --- a/src/frontend/src/stores/flowStore.ts +++ b/src/frontend/src/stores/flowStore.ts @@ -431,6 +431,10 @@ const useFlowStore = create((set, get) => ({ } }); }, + updateVerticesBuild: (vertices: string[]) => { + set({ verticesBuild: vertices }); + }, + verticesBuild: [], })); export default useFlowStore; diff --git a/src/frontend/src/types/zustand/flow/index.ts b/src/frontend/src/types/zustand/flow/index.ts index 429379155..f0bad1578 100644 --- a/src/frontend/src/types/zustand/flow/index.ts +++ b/src/frontend/src/types/zustand/flow/index.ts @@ -87,4 +87,6 @@ export type FlowStoreType = { buildFlow: (nodeId?: string) => Promise; getFlow: () => { nodes: Node[]; edges: Edge[]; viewport: Viewport }; updateBuildStatus: (nodeId: string[], status: BuildStatus) => void; + updateVerticesBuild: (vertices: string[]) => void; + verticesBuild: string[]; }; diff --git a/src/frontend/src/utils/buildUtils.ts b/src/frontend/src/utils/buildUtils.ts index 854a86f24..638d35886 100644 --- a/src/frontend/src/utils/buildUtils.ts +++ b/src/frontend/src/utils/buildUtils.ts @@ -47,50 +47,59 @@ export async function buildVertices({ const verticesIds = vertices.flatMap((v) => v); useFlowStore.getState().updateBuildStatus(verticesIds, BuildStatus.TO_BUILD); + useFlowStore.getState().updateVerticesBuild(verticesIds); // Set each vertex state to building const buildResults: Array = []; for (let i = 0; i < vertices.length; i += 1) { if (onBuildStart) onBuildStart(vertices[i]); - await Promise.all( - vertices[i].map(async (id) => { - try { - // Set vertex state to building - const buildRes = await postBuildVertex(flowId, id); - const buildData: VertexBuildTypeAPI = buildRes.data; - if (onBuildUpdate) { - let data = {}; - if (!buildData.valid) { - if (onBuildError) { - onBuildError( - "Error Building Component", - [buildData.params], - verticesIds - ); - } - } - data[buildData.id] = buildData; - onBuildUpdate({ data, id: buildData.id }); - } - buildResults.push(buildData.valid); - } catch (error) { - if (onBuildError) { - console.log(error); - onBuildError( - "Error Building Component", - [ - (error as AxiosError).response?.data?.detail ?? - "Unknown Error", - ], - verticesIds - ); - } - } - }) - ); + for (const id of vertices[i]) { + buildVertex( + flowId, + id, + onBuildUpdate, + onBuildError, + verticesIds, + buildResults + ); + } } + if (onBuildComplete) { const allNodesValid = buildResults.every((result) => result); onBuildComplete(allNodesValid); } } + +async function buildVertex( + flowId, + id, + onBuildUpdate, + onBuildError, + verticesIds, + buildResults +) { + try { + const buildRes = await postBuildVertex(flowId, id); + const buildData: VertexBuildTypeAPI = buildRes.data; + if (onBuildUpdate) { + let data = {}; + if (!buildData.valid) { + onBuildError!( + "Error Building Component", + [buildData.params], + verticesIds + ); + } + data[buildData.id] = buildData; + onBuildUpdate({ data, id: buildData.id }); + } + buildResults.push(buildData.valid); + } catch (error) { + onBuildError!( + "Error Building Component", + [(error as AxiosError).response?.data?.detail ?? "Unknown Error"], + verticesIds + ); + } +}