Merge branch 'cz/promise-each' of github.com:logspace-ai/langflow into cz/promise-each
This commit is contained in:
commit
11092c5955
5 changed files with 38 additions and 24 deletions
|
|
@ -20,7 +20,7 @@ from langflow.api.v1.schemas import (
|
|||
BuildStatus,
|
||||
BuiltResponse,
|
||||
InitResponse,
|
||||
ResultDict,
|
||||
ResultData,
|
||||
StreamData,
|
||||
VertexBuildResponse,
|
||||
VerticesOrderResponse,
|
||||
|
|
@ -322,6 +322,7 @@ async def build_vertex(
|
|||
inputs: dict = Body(None),
|
||||
):
|
||||
"""Build a vertex instead of the entire graph."""
|
||||
start_time = time.perf_counter()
|
||||
try:
|
||||
cache = chat_service.get_cache(flow_id)
|
||||
if not cache:
|
||||
|
|
@ -336,7 +337,6 @@ async def build_vertex(
|
|||
graph = cache.get("result")
|
||||
result_dict = {}
|
||||
duration = ""
|
||||
start_time = time.perf_counter()
|
||||
if tweaks:
|
||||
graph = process_tweaks_on_graph(graph, tweaks)
|
||||
if not (vertex := graph.get_vertex(vertex_id)):
|
||||
|
|
@ -351,13 +351,9 @@ async def build_vertex(
|
|||
# to the frontend
|
||||
vertex.set_artifacts()
|
||||
artifacts = vertex.artifacts
|
||||
timedelta = time.perf_counter() - start_time
|
||||
duration = format_elapsed_time(timedelta)
|
||||
result_dict = ResultDict(
|
||||
result_dict = ResultData(
|
||||
results=result_dict,
|
||||
artifacts=artifacts,
|
||||
duration=duration,
|
||||
timedelta=timedelta,
|
||||
)
|
||||
vertex.set_result(result_dict)
|
||||
elif vertex.result is not None:
|
||||
|
|
@ -370,7 +366,7 @@ async def build_vertex(
|
|||
except Exception as exc:
|
||||
params = str(exc)
|
||||
valid = False
|
||||
result_dict = ResultDict(results={})
|
||||
result_dict = ResultData(results={})
|
||||
artifacts = {}
|
||||
# If there's an error building the vertex
|
||||
# we need to clear the cache
|
||||
|
|
@ -383,6 +379,12 @@ async def build_vertex(
|
|||
data=result_dict,
|
||||
artifacts=artifacts,
|
||||
)
|
||||
|
||||
timedelta = time.perf_counter() - start_time
|
||||
duration = format_elapsed_time(timedelta)
|
||||
result_dict.duration = duration
|
||||
result_dict.timedelta = timedelta
|
||||
|
||||
return VertexBuildResponse(
|
||||
valid=valid,
|
||||
params=params,
|
||||
|
|
|
|||
|
|
@ -222,7 +222,7 @@ class VerticesOrderResponse(BaseModel):
|
|||
ids: List[List[str]]
|
||||
|
||||
|
||||
class ResultDict(BaseModel):
|
||||
class ResultData(BaseModel):
|
||||
results: Optional[Any] = Field(default_factory=dict)
|
||||
artifacts: Optional[Any] = Field(default_factory=dict)
|
||||
timedelta: Optional[float] = None
|
||||
|
|
@ -240,7 +240,7 @@ class VertexBuildResponse(BaseModel):
|
|||
valid: bool
|
||||
params: Optional[str]
|
||||
"""JSON string of the params."""
|
||||
data: ResultDict
|
||||
data: ResultData
|
||||
"""Mapping of vertex ids to result dict containing the param name and result value."""
|
||||
timestamp: Optional[datetime] = Field(default_factory=datetime.utcnow)
|
||||
"""Timestamp of the build."""
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ from langflow.utils.util import sync_to_async
|
|||
from loguru import logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from langflow.api.v1.schemas import ResultDict
|
||||
from langflow.api.v1.schemas import ResultData
|
||||
from langflow.graph.edge.base import ContractEdge
|
||||
from langflow.graph.graph.base import Graph
|
||||
|
||||
|
|
@ -50,7 +50,7 @@ class Vertex:
|
|||
self.parent_is_top_level = False
|
||||
self.layer = None
|
||||
self.should_run = True
|
||||
self.result: Optional["ResultDict"] = None
|
||||
self.result: Optional["ResultData"] = None
|
||||
try:
|
||||
self.is_interface_component = InterfaceComponentTypes(self.vertex_type)
|
||||
except ValueError:
|
||||
|
|
@ -81,7 +81,7 @@ class Vertex:
|
|||
)
|
||||
return edge_results
|
||||
|
||||
def set_result(self, result: "ResultDict") -> None:
|
||||
def set_result(self, result: "ResultData") -> None:
|
||||
self.result = result
|
||||
|
||||
def get_built_result(self):
|
||||
|
|
|
|||
|
|
@ -1,13 +1,12 @@
|
|||
from typing import TYPE_CHECKING, Any, Dict, Optional, Type
|
||||
|
||||
import duckdb
|
||||
from langflow.services.deps import get_monitor_service
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from langflow.services.deps import get_monitor_service
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from langflow.api.v1.schemas import ResultDict
|
||||
from langflow.api.v1.schemas import ResultData
|
||||
|
||||
|
||||
INDEX_KEY = "index"
|
||||
|
|
@ -44,7 +43,9 @@ def model_to_sql_column_definitions(model: Type[BaseModel]) -> dict:
|
|||
return columns
|
||||
|
||||
|
||||
def drop_and_create_table_if_schema_mismatch(db_path: str, table_name: str, model: Type[BaseModel]):
|
||||
def drop_and_create_table_if_schema_mismatch(
|
||||
db_path: str, table_name: str, model: Type[BaseModel]
|
||||
):
|
||||
with duckdb.connect(db_path) as conn:
|
||||
# Get the current schema from the database
|
||||
try:
|
||||
|
|
@ -64,8 +65,12 @@ def drop_and_create_table_if_schema_mismatch(db_path: str, table_name: str, mode
|
|||
conn.execute(f"CREATE SEQUENCE seq_{table_name} START 1;")
|
||||
except duckdb.CatalogException:
|
||||
pass
|
||||
desired_schema[INDEX_KEY] = f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')"
|
||||
columns_sql = ", ".join(f"{name} {data_type}" for name, data_type in desired_schema.items())
|
||||
desired_schema[INDEX_KEY] = (
|
||||
f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')"
|
||||
)
|
||||
columns_sql = ", ".join(
|
||||
f"{name} {data_type}" for name, data_type in desired_schema.items()
|
||||
)
|
||||
create_table_sql = f"CREATE TABLE {table_name} ({columns_sql})"
|
||||
conn.execute(create_table_sql)
|
||||
|
||||
|
|
@ -133,7 +138,7 @@ async def log_vertex_build(
|
|||
vertex_id: str,
|
||||
valid: bool,
|
||||
params: Any,
|
||||
data: "ResultDict",
|
||||
data: "ResultData",
|
||||
artifacts: Optional[dict] = None,
|
||||
):
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ from typing import Callable
|
|||
|
||||
import socketio
|
||||
from langflow.api.utils import format_elapsed_time
|
||||
from langflow.api.v1.schemas import ResultDict, VertexBuildResponse
|
||||
from langflow.api.v1.schemas import ResultData, VertexBuildResponse
|
||||
from langflow.graph.graph.base import Graph
|
||||
from langflow.graph.vertex.base import StatelessVertex
|
||||
from langflow.services.database.models.flow.model import Flow
|
||||
|
|
@ -73,11 +73,16 @@ async def build_vertex(
|
|||
artifacts = vertex.artifacts
|
||||
timedelta = time.perf_counter() - start_time
|
||||
duration = format_elapsed_time(timedelta)
|
||||
result_dict = ResultDict(results=result_dict, artifacts=artifacts, duration=duration, timedelta=timedelta)
|
||||
result_dict = ResultData(
|
||||
results=result_dict,
|
||||
artifacts=artifacts,
|
||||
duration=duration,
|
||||
timedelta=timedelta,
|
||||
)
|
||||
except Exception as exc:
|
||||
params = str(exc)
|
||||
valid = False
|
||||
result_dict = ResultDict(results={})
|
||||
result_dict = ResultData(results={})
|
||||
artifacts = {}
|
||||
set_cache(flow_id, graph)
|
||||
await log_vertex_build(
|
||||
|
|
@ -90,7 +95,9 @@ async def build_vertex(
|
|||
)
|
||||
|
||||
# Emit the vertex build response
|
||||
response = VertexBuildResponse(valid=valid, params=params, id=vertex.id, data=result_dict)
|
||||
response = VertexBuildResponse(
|
||||
valid=valid, params=params, id=vertex.id, data=result_dict
|
||||
)
|
||||
await sio.emit("vertex_build", data=response.model_dump(), to=sid)
|
||||
|
||||
except Exception as exc:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue