Bugfix: add build status on backend shutdown (#1451)

This pull request updates the build flow status of each vertex when the backend shuts down: if a request fails with a 500 error, the frontend marks the vertices it was tracking as built instead of leaving them in an in-progress state. It also includes a minor refactor of the Axios interceptor to improve code readability. A short sketch of the frontend behavior follows the change summary below.
Gabriel Luiz Freitas Almeida 2024-02-20 20:38:40 -03:00 committed by GitHub
commit 4cfb09a69a
10 changed files with 155 additions and 100 deletions
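For orientation, here is a minimal sketch of the frontend side of this change, assuming the flow-store API added in this pull request (verticesBuild and updateBuildStatus on useFlowStore); the actual wiring into the Axios response interceptor appears in the interceptor diff below.

import { AxiosError } from "axios";
import { BuildStatus } from "../../constants/enums";
import useFlowStore from "../../stores/flowStore";

// When the backend answers with a 500 (for example because it is shutting
// down), mark every vertex that was queued for building as built so the UI
// is not left showing an in-progress build.
async function clearBuildVerticesState(error: AxiosError) {
  if (error?.response?.status === 500) {
    const vertices = useFlowStore.getState().verticesBuild;
    useFlowStore.getState().updateBuildStatus(vertices, BuildStatus.BUILT);
  }
}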

View file

@ -19,7 +19,7 @@ from langflow.api.v1.schemas import (
BuildStatus,
BuiltResponse,
InitResponse,
ResultDict,
ResultData,
StreamData,
VertexBuildResponse,
VerticesOrderResponse,
@ -318,6 +318,7 @@ async def build_vertex(
current_user=Depends(get_current_active_user),
):
"""Build a vertex instead of the entire graph."""
start_time = time.perf_counter()
try:
start_time = time.perf_counter()
cache = chat_service.get_cache(flow_id)
@ -327,7 +328,7 @@ async def build_vertex(
f"No cache found for {flow_id}. Building graph starting at {vertex_id}"
)
graph = build_and_cache_graph(
flow_id=flow_id, session=get_session(), chat_service=chat_service
flow_id=flow_id, session=next(get_session()), chat_service=chat_service
)
else:
graph = cache.get("result")
@ -345,13 +346,9 @@ async def build_vertex(
# to the frontend
vertex.set_artifacts()
artifacts = vertex.artifacts
timedelta = time.perf_counter() - start_time
duration = format_elapsed_time(timedelta)
result_dict = ResultDict(
result_dict = ResultData(
results=result_dict,
artifacts=artifacts,
duration=duration,
timedelta=timedelta,
)
vertex.set_result(result_dict)
elif vertex.result is not None:
@ -364,7 +361,7 @@ async def build_vertex(
except Exception as exc:
params = repr(exc)
valid = False
result_dict = ResultDict(results={})
result_dict = ResultData(results={})
artifacts = {}
# If there's an error building the vertex
# we need to clear the cache
@ -377,6 +374,12 @@ async def build_vertex(
data=result_dict,
artifacts=artifacts,
)
timedelta = time.perf_counter() - start_time
duration = format_elapsed_time(timedelta)
result_dict.duration = duration
result_dict.timedelta = timedelta
return VertexBuildResponse(
valid=valid,
params=params,

View file

@ -222,7 +222,7 @@ class VerticesOrderResponse(BaseModel):
ids: List[List[str]]
class ResultDict(BaseModel):
class ResultData(BaseModel):
results: Optional[Any] = Field(default_factory=dict)
artifacts: Optional[Any] = Field(default_factory=dict)
timedelta: Optional[float] = None
@ -240,7 +240,7 @@ class VertexBuildResponse(BaseModel):
valid: bool
params: Optional[str]
"""JSON string of the params."""
data: ResultDict
data: ResultData
"""Mapping of vertex ids to result dict containing the param name and result value."""
timestamp: Optional[datetime] = Field(default_factory=datetime.utcnow)
"""Timestamp of the build."""

View file

@ -417,6 +417,7 @@ class Graph:
if edge not in edges:
edges.append(edge)
vertices = self.layered_topological_sort(vertices, edges)
vertices = self.sort_interface_components_first(vertices)
return self.sort_chat_inputs_first(vertices)
def layered_topological_sort(
@ -522,11 +523,23 @@ class Graph:
# and sort the layers accordingly
# InterfaceComponentTypes is an enum
# check all values of the enum and sort the layers
for layer in vertices:
layer.sort(
key=lambda x: any(
comp_type.value in x
for comp_type in InterfaceComponentTypes.__members__.values()
)
)
vertices = self.sort_interface_components_first(vertices)
return self.sort_chat_inputs_first(vertices)
def sort_interface_components_first(self, vertices: List[Vertex]) -> List[Vertex]:
"""Sorts the vertices in the graph so that vertices containing ChatInput or ChatOutput come first."""
def contains_interface_component(vertex):
return any(
component.value in vertex for component in InterfaceComponentTypes
)
# Sort each inner list so that vertices containing ChatInput or ChatOutput come first
sorted_vertices = [
sorted(
inner_list,
key=lambda vertex: not contains_interface_component(vertex),
)
for inner_list in vertices
]
return sorted_vertices

View file

@ -14,7 +14,7 @@ from langflow.utils.util import sync_to_async
from loguru import logger
if TYPE_CHECKING:
from langflow.api.v1.schemas import ResultDict
from langflow.api.v1.schemas import ResultData
from langflow.graph.edge.base import ContractEdge
from langflow.graph.graph.base import Graph
@ -50,7 +50,7 @@ class Vertex:
self.parent_is_top_level = False
self.layer = None
self.should_run = True
self.result: Optional["ResultDict"] = None
self.result: Optional["ResultData"] = None
try:
self.is_interface_component = InterfaceComponentTypes(self.vertex_type)
except ValueError:
@ -81,7 +81,7 @@ class Vertex:
)
return edge_results
def set_result(self, result: "ResultDict") -> None:
def set_result(self, result: "ResultData") -> None:
self.result = result
def get_built_result(self):
@ -306,7 +306,7 @@ class Vertex:
params.pop(key, None)
# Add _type to params
self.params = params
self._raw_params = params
self._raw_params = params.copy()
async def _build(self, user_id=None):
"""

View file

@ -1,13 +1,12 @@
from typing import TYPE_CHECKING, Any, Dict, Optional, Type
import duckdb
from langflow.services.deps import get_monitor_service
from loguru import logger
from pydantic import BaseModel
from langflow.services.deps import get_monitor_service
if TYPE_CHECKING:
from langflow.api.v1.schemas import ResultDict
from langflow.api.v1.schemas import ResultData
INDEX_KEY = "index"
@ -44,7 +43,9 @@ def model_to_sql_column_definitions(model: Type[BaseModel]) -> dict:
return columns
def drop_and_create_table_if_schema_mismatch(db_path: str, table_name: str, model: Type[BaseModel]):
def drop_and_create_table_if_schema_mismatch(
db_path: str, table_name: str, model: Type[BaseModel]
):
with duckdb.connect(db_path) as conn:
# Get the current schema from the database
try:
@ -64,8 +65,12 @@ def drop_and_create_table_if_schema_mismatch(db_path: str, table_name: str, mode
conn.execute(f"CREATE SEQUENCE seq_{table_name} START 1;")
except duckdb.CatalogException:
pass
desired_schema[INDEX_KEY] = f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')"
columns_sql = ", ".join(f"{name} {data_type}" for name, data_type in desired_schema.items())
desired_schema[INDEX_KEY] = (
f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')"
)
columns_sql = ", ".join(
f"{name} {data_type}" for name, data_type in desired_schema.items()
)
create_table_sql = f"CREATE TABLE {table_name} ({columns_sql})"
conn.execute(create_table_sql)
@ -133,7 +138,7 @@ async def log_vertex_build(
vertex_id: str,
valid: bool,
params: Any,
data: "ResultDict",
data: "ResultData",
artifacts: Optional[dict] = None,
):
try:

View file

@ -3,7 +3,7 @@ from typing import Callable
import socketio
from langflow.api.utils import format_elapsed_time
from langflow.api.v1.schemas import ResultDict, VertexBuildResponse
from langflow.api.v1.schemas import ResultData, VertexBuildResponse
from langflow.graph.graph.base import Graph
from langflow.graph.vertex.base import StatelessVertex
from langflow.services.database.models.flow.model import Flow
@ -73,11 +73,16 @@ async def build_vertex(
artifacts = vertex.artifacts
timedelta = time.perf_counter() - start_time
duration = format_elapsed_time(timedelta)
result_dict = ResultDict(results=result_dict, artifacts=artifacts, duration=duration, timedelta=timedelta)
result_dict = ResultData(
results=result_dict,
artifacts=artifacts,
duration=duration,
timedelta=timedelta,
)
except Exception as exc:
params = str(exc)
valid = False
result_dict = ResultDict(results={})
result_dict = ResultData(results={})
artifacts = {}
set_cache(flow_id, graph)
await log_vertex_build(
@ -90,7 +95,9 @@ async def build_vertex(
)
# Emit the vertex build response
response = VertexBuildResponse(valid=valid, params=params, id=vertex.id, data=result_dict)
response = VertexBuildResponse(
valid=valid, params=params, id=vertex.id, data=result_dict
)
await sio.emit("vertex_build", data=response.model_dump(), to=sid)
except Exception as exc:

View file

@ -3,8 +3,10 @@ import { useContext, useEffect } from "react";
import { Cookies } from "react-cookie";
import { useNavigate } from "react-router-dom";
import { renewAccessToken } from ".";
import { BuildStatus } from "../../constants/enums";
import { AuthContext } from "../../contexts/authContext";
import useAlertStore from "../../stores/alertStore";
import useFlowStore from "../../stores/flowStore";
// Create a new Axios instance
const api: AxiosInstance = axios.create({
@ -24,45 +26,20 @@ function ApiInterceptor() {
async (error: AxiosError) => {
if (error.response?.status === 401) {
const accessToken = cookies.get("access_token_lf");
if (accessToken && !autoLogin) {
authenticationErrorCount = authenticationErrorCount + 1;
if (authenticationErrorCount > 3) {
authenticationErrorCount = 0;
logout();
}
try {
const res = await renewAccessToken();
if (res?.data?.access_token && res?.data?.refresh_token) {
login(res?.data?.access_token);
}
if (error?.config?.headers) {
delete error.config.headers["Authorization"];
error.config.headers["Authorization"] = `Bearer ${cookies.get(
"access_token_lf"
)}`;
const response = await axios.request(error.config);
return response;
}
} catch (error) {
if (axios.isAxiosError(error) && error.response?.status === 401) {
logout();
} else {
console.error(error);
logout();
}
}
checkErrorCount();
await tryToRenewAccessToken(error);
}
if (!accessToken && error?.config?.url?.includes("login")) {
return Promise.reject(error);
} else {
logout();
}
} else {
// if (URL_EXCLUDED_FROM_ERROR_RETRIES.includes(error.config?.url)) {
return Promise.reject(error);
// }
return logout();
}
await clearBuildVerticesState(error);
return Promise.reject(error);
}
);
@ -115,6 +92,41 @@ function ApiInterceptor() {
};
}, [accessToken, setErrorData]);
function checkErrorCount() {
authenticationErrorCount = authenticationErrorCount + 1;
if (authenticationErrorCount > 3) {
authenticationErrorCount = 0;
logout();
}
}
async function tryToRenewAccessToken(error: AxiosError) {
try {
const res = await renewAccessToken();
if (res?.data?.access_token && res?.data?.refresh_token) {
login(res?.data?.access_token);
}
if (error?.config?.headers) {
delete error.config.headers["Authorization"];
error.config.headers["Authorization"] = `Bearer ${cookies.get(
"access_token_lf"
)}`;
const response = await axios.request(error.config);
return response;
}
} catch (error) {
logout();
}
}
async function clearBuildVerticesState(error) {
if (error?.response?.status === 500) {
const vertices = useFlowStore.getState().verticesBuild;
useFlowStore.getState().updateBuildStatus(vertices, BuildStatus.BUILT);
}
}
return null;
}

View file

@ -431,6 +431,10 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
}
});
},
updateVerticesBuild: (vertices: string[]) => {
set({ verticesBuild: vertices });
},
verticesBuild: [],
}));
export default useFlowStore;

View file

@ -87,4 +87,6 @@ export type FlowStoreType = {
buildFlow: (nodeId?: string) => Promise<void>;
getFlow: () => { nodes: Node[]; edges: Edge[]; viewport: Viewport };
updateBuildStatus: (nodeId: string[], status: BuildStatus) => void;
updateVerticesBuild: (vertices: string[]) => void;
verticesBuild: string[];
};

View file

@ -47,50 +47,59 @@ export async function buildVertices({
const verticesIds = vertices.flatMap((v) => v);
useFlowStore.getState().updateBuildStatus(verticesIds, BuildStatus.TO_BUILD);
useFlowStore.getState().updateVerticesBuild(verticesIds);
// Set each vertex state to building
const buildResults: Array<boolean> = [];
for (let i = 0; i < vertices.length; i += 1) {
if (onBuildStart) onBuildStart(vertices[i]);
await Promise.all(
vertices[i].map(async (id) => {
try {
// Set vertex state to building
const buildRes = await postBuildVertex(flowId, id);
const buildData: VertexBuildTypeAPI = buildRes.data;
if (onBuildUpdate) {
let data = {};
if (!buildData.valid) {
if (onBuildError) {
onBuildError(
"Error Building Component",
[buildData.params],
verticesIds
);
}
}
data[buildData.id] = buildData;
onBuildUpdate({ data, id: buildData.id });
}
buildResults.push(buildData.valid);
} catch (error) {
if (onBuildError) {
console.log(error);
onBuildError(
"Error Building Component",
[
(error as AxiosError<any>).response?.data?.detail ??
"Unknown Error",
],
verticesIds
);
}
}
})
);
for (const id of vertices[i]) {
buildVertex(
flowId,
id,
onBuildUpdate,
onBuildError,
verticesIds,
buildResults
);
}
}
if (onBuildComplete) {
const allNodesValid = buildResults.every((result) => result);
onBuildComplete(allNodesValid);
}
}
async function buildVertex(
flowId,
id,
onBuildUpdate,
onBuildError,
verticesIds,
buildResults
) {
try {
const buildRes = await postBuildVertex(flowId, id);
const buildData: VertexBuildTypeAPI = buildRes.data;
if (onBuildUpdate) {
let data = {};
if (!buildData.valid) {
onBuildError!(
"Error Building Component",
[buildData.params],
verticesIds
);
}
data[buildData.id] = buildData;
onBuildUpdate({ data, id: buildData.id });
}
buildResults.push(buildData.valid);
} catch (error) {
onBuildError!(
"Error Building Component",
[(error as AxiosError<any>).response?.data?.detail ?? "Unknown Error"],
verticesIds
);
}
}