diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index 98dd0dfef..b72504da6 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -159,10 +159,10 @@ async def build_vertex( result_data_response.duration = duration result_data_response.timedelta = timedelta vertex.add_build_time(timedelta) - inactive_vertices = None - if graph.inactive_vertices: - inactive_vertices = list(graph.inactive_vertices) - graph.reset_inactive_vertices() + inactivated_vertices = None + inactivated_vertices = list(graph.inactivated_vertices) + graph.reset_inactivated_vertices() + graph.reset_activated_vertices() chat_service.set_cache(flow_id, graph) # graph.stop_vertex tells us if the user asked @@ -173,8 +173,8 @@ async def build_vertex( next_vertices_ids = [graph.stop_vertex] build_response = VertexBuildResponse( + inactivated_vertices=inactivated_vertices, next_vertices_ids=next_vertices_ids, - inactive_vertices=inactive_vertices, valid=valid, params=params, id=vertex.id, diff --git a/src/backend/langflow/api/v1/endpoints.py b/src/backend/langflow/api/v1/endpoints.py index bd07d38b2..c28d06bcd 100644 --- a/src/backend/langflow/api/v1/endpoints.py +++ b/src/backend/langflow/api/v1/endpoints.py @@ -13,6 +13,7 @@ from langflow.api.v1.schemas import ( ProcessResponse, RunResponse, TaskStatusResponse, + Tweaks, UploadFileResponse, ) from langflow.interface.custom.custom_component import CustomComponent @@ -56,19 +57,60 @@ def get_all( async def run_flow_with_caching( session: Annotated[Session, Depends(get_session)], flow_id: str, - inputs: Optional[InputValueRequest] = None, - tweaks: Optional[dict] = None, + inputs: Optional[List[InputValueRequest]] = None, + outputs: Optional[List[str]] = None, + tweaks: Annotated[Optional[Tweaks], Body(embed=True)] = None, # noqa: F821 stream: Annotated[bool, Body(embed=True)] = False, # noqa: F821 session_id: Annotated[Union[None, str], Body(embed=True)] = None, # noqa: F821 api_key_user: User = Depends(api_key_security), session_service: SessionService = Depends(get_session_service), ): + """ + Executes a specified flow by ID with optional input values, output selection, tweaks, and streaming capability. + This endpoint supports running flows with caching to enhance performance and efficiency. + + ### Parameters: + - `flow_id` (str): The unique identifier of the flow to be executed. + - `inputs` (List[InputValueRequest], optional): A list of inputs specifying the input values and components for the flow. Each input can target specific components and provide custom values. + - `outputs` (List[str], optional): A list of output names to retrieve from the executed flow. If not provided, all outputs are returned. + - `tweaks` (Optional[Tweaks], optional): A dictionary of tweaks to customize the flow execution. The tweaks can be used to modify the flow's parameters and components. Tweaks can be overridden by the input values. + - `stream` (bool, optional): Specifies whether the results should be streamed. Defaults to False. + - `session_id` (Union[None, str], optional): An optional session ID to utilize existing session data for the flow execution. + - `api_key_user` (User): The user associated with the current API key. Automatically resolved from the API key. + - `session_service` (SessionService): The session service object for managing flow sessions. + + ### Returns: + A `RunResponse` object containing the selected outputs (or all if not specified) of the executed flow and the session ID. The structure of the response accommodates multiple inputs, providing a nested list of outputs for each input. + + ### Raises: + HTTPException: Indicates issues with finding the specified flow, invalid input formats, or internal errors during flow execution. + + ### Example usage: + ```json + POST /run/{flow_id} + Payload: + { + "inputs": [ + {"components": ["component1"], "input_value": "value1"}, + {"components": ["component3"], "input_value": "value2"} + ], + "outputs": ["Component Name", "component_id"], + "tweaks": {"parameter_name": "value", "Component Name": {"parameter_name": "value"}, "component_id": {"parameter_name": "value"}} + "stream": false + } + ``` + + This endpoint facilitates complex flow executions with customized inputs, outputs, and configurations, catering to diverse application requirements. + """ try: if inputs is not None: input_values_dict: dict[str, Union[str, list[str]]] = inputs.model_dump() else: input_values_dict = {} + if outputs is None: + outputs = [] + if session_id: session_data = await session_service.load_session( session_id, flow_id=flow_id @@ -82,6 +124,7 @@ async def run_flow_with_caching( flow_id=flow_id, session_id=session_id, inputs=input_values_dict, + outputs=outputs, artifacts=artifacts, session_service=session_service, stream=stream, @@ -107,6 +150,7 @@ async def run_flow_with_caching( flow_id=flow_id, session_id=session_id, inputs=input_values_dict, + outputs=outputs, artifacts={}, session_service=session_service, stream=stream, diff --git a/src/backend/langflow/api/v1/schemas.py b/src/backend/langflow/api/v1/schemas.py index 7a91473e6..c47082540 100644 --- a/src/backend/langflow/api/v1/schemas.py +++ b/src/backend/langflow/api/v1/schemas.py @@ -4,7 +4,7 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Union from uuid import UUID -from pydantic import BaseModel, Field, field_validator, model_serializer +from pydantic import BaseModel, Field, RootModel, field_validator, model_serializer from langflow.services.database.models.api_key.model import ApiKeyRead from langflow.services.database.models.base import orjson_dumps @@ -229,8 +229,8 @@ class ResultDataResponse(BaseModel): class VertexBuildResponse(BaseModel): id: Optional[str] = None + inactivated_vertices: Optional[List[str]] = None next_vertices_ids: Optional[List[str]] = None - inactive_vertices: Optional[List[str]] = None valid: bool params: Optional[Any] = Field(default_factory=dict) """JSON string of the params.""" @@ -245,4 +245,49 @@ class VerticesBuiltResponse(BaseModel): class InputValueRequest(BaseModel): - input_value: str + components: Optional[List[str]] = None + input_value: Optional[str] = None + + # add an example + model_config = { + "json_schema_extra": { + "examples": [ + { + "components": ["components_id", "Component Name"], + "input_value": "input_value", + }, + {"components": ["Component Name"], "input_value": "input_value"}, + {"input_value": "input_value"}, + ] + } + } + + +class Tweaks(RootModel): + root: dict[str, Union[str, dict[str, str]]] = Field( + description="A dictionary of tweaks to adjust the flow's execution. Allows customizing flow behavior dynamically. All tweaks are overridden by the input values.", + ) + model_config = { + "json_schema_extra": { + "examples": [ + { + "parameter_name": "value", + "Component Name": {"parameter_name": "value"}, + "component_id": {"parameter_name": "value"}, + } + ] + } + } + + # This should behave like a dict + def __getitem__(self, key): + return self.root[key] + + def __setitem__(self, key, value): + self.root[key] = value + + def __delitem__(self, key): + del self.root[key] + + def items(self): + return self.root.items() diff --git a/src/backend/langflow/components/data/UrlLoader.py b/src/backend/langflow/components/data/UrlLoader.py deleted file mode 100644 index c1346f142..000000000 --- a/src/backend/langflow/components/data/UrlLoader.py +++ /dev/null @@ -1,49 +0,0 @@ -from typing import List - -from langchain import document_loaders -from langchain_core.documents import Document - -from langflow import CustomComponent - - -class UrlLoaderComponent(CustomComponent): - display_name: str = "Url Loader" - description: str = "Generic Url Loader Component" - - def build_config(self): - return { - "web_path": { - "display_name": "Url", - "required": True, - }, - "loader": { - "display_name": "Loader", - "is_list": True, - "required": True, - "options": [ - "AZLyricsLoader", - "CollegeConfidentialLoader", - "GitbookLoader", - "HNLoader", - "IFixitLoader", - "IMSDbLoader", - "WebBaseLoader", - ], - "value": "WebBaseLoader", - }, - "code": {"show": False}, - } - - def build(self, web_path: str, loader: str) -> List[Document]: - try: - loader_instance = getattr(document_loaders, loader)(web_path=web_path) - except Exception as e: - raise ValueError(f"No loader found for: {web_path}") from e - docs = loader_instance.load() - avg_length = sum( - len(doc.page_content) for doc in docs if hasattr(doc, "page_content") - ) / len(docs) - self.status = f"""{len(docs)} documents) - \nAvg. Document Length (characters): {int(avg_length)} - Documents: {docs[:3]}...""" - return docs diff --git a/src/backend/langflow/components/langchain_utilities/DocumentToRecord.py b/src/backend/langflow/components/utilities/DocumentToRecord.py similarity index 100% rename from src/backend/langflow/components/langchain_utilities/DocumentToRecord.py rename to src/backend/langflow/components/utilities/DocumentToRecord.py diff --git a/src/backend/langflow/components/utilities/GetNotified.py b/src/backend/langflow/components/utilities/GetNotified.py new file mode 100644 index 000000000..d97c30df2 --- /dev/null +++ b/src/backend/langflow/components/utilities/GetNotified.py @@ -0,0 +1,20 @@ +from langflow import CustomComponent +from langflow.schema import Record + + +class GetNotifiedComponent(CustomComponent): + display_name = "Get Notified" + description = "A component to get notified by Notify component." + + def build_config(self): + return { + "name": { + "display_name": "Name", + "info": "The name of the notification to listen for.", + }, + } + + def build(self, name: str) -> Record: + state = self.get_state(name) + self.status = state + return state diff --git a/src/backend/langflow/components/utilities/Notify.py b/src/backend/langflow/components/utilities/Notify.py new file mode 100644 index 000000000..2155b079e --- /dev/null +++ b/src/backend/langflow/components/utilities/Notify.py @@ -0,0 +1,41 @@ +from typing import Optional + +from langflow import CustomComponent +from langflow.schema import Record + + +class NotifyComponent(CustomComponent): + display_name = "Notify" + description = "A component to generate a notification to Get Notified component." + + def build_config(self): + return { + "name": {"display_name": "Name", "info": "The name of the notification."}, + "record": {"display_name": "Record", "info": "The record to store."}, + "append": { + "display_name": "Append", + "info": "If True, the record will be appended to the notification.", + }, + } + + def build( + self, name: str, record: Optional[Record] = None, append: bool = False + ) -> Record: + if record and not isinstance(record, Record): + if isinstance(record, str): + record = Record(text=record) + elif isinstance(record, dict): + record = Record(data=record) + else: + record = Record(text=str(record)) + elif not record: + record = Record(text="") + if record: + if append: + self.append_state(name, record) + else: + self.update_state(name, record) + else: + self.status = "No record provided." + self.status = record + return record diff --git a/src/backend/langflow/components/utilities/RecordsAsText.py b/src/backend/langflow/components/utilities/RecordsAsText.py index debf3eed2..18bf8be8c 100644 --- a/src/backend/langflow/components/utilities/RecordsAsText.py +++ b/src/backend/langflow/components/utilities/RecordsAsText.py @@ -25,6 +25,8 @@ class RecordsAsTextComponent(CustomComponent): records: list[Record], template: str = "Text: {text}\nData: {data}", ) -> Text: + if not records: + return "" if isinstance(records, Record): records = [records] diff --git a/src/backend/langflow/components/utilities/SharedState.py b/src/backend/langflow/components/utilities/SharedState.py index 7d29da9bb..95de17774 100644 --- a/src/backend/langflow/components/utilities/SharedState.py +++ b/src/backend/langflow/components/utilities/SharedState.py @@ -1,7 +1,6 @@ -from typing import Union +from typing import Optional from langflow import CustomComponent -from langflow.field_typing import Text from langflow.schema import Record @@ -20,19 +19,23 @@ class SharedState(CustomComponent): } def build( - self, name: str, record: Union[Text, Record], append: bool = False + self, name: str, record: Optional[Record] = None, append: bool = False ) -> Record: - if append: - self.append_state(name, record) - else: - self.update_state(name, record) + if record: + if append: + self.append_state(name, record) + else: + self.update_state(name, record) state = self.get_state(name) - if not isinstance(state, Record): + if state and not isinstance(state, Record): if isinstance(state, str): state = Record(text=state) elif isinstance(state, dict): state = Record(data=state) else: state = Record(text=str(state)) + elif not state: + state = Record(text="") + self.status = state return state diff --git a/src/backend/langflow/components/utilities/TextToRecord.py b/src/backend/langflow/components/utilities/TextToRecord.py new file mode 100644 index 000000000..277434da6 --- /dev/null +++ b/src/backend/langflow/components/utilities/TextToRecord.py @@ -0,0 +1,31 @@ +from typing import Optional + +from langflow import CustomComponent +from langflow.field_typing import Text +from langflow.schema import Record + + +class TextToRecordComponent(CustomComponent): + display_name = "Text to Record" + description = "Converts text to a Record." + + def build_config(self): + return { + "text": { + "display_name": "Text", + "info": "The text to convert to a record.", + }, + "data": { + "display_name": "Data", + "info": "The optional data to include in the record.", + }, + } + + def build( + self, + text: Text, + data: Optional[dict] = {}, + ) -> Record: + record = Record(text=text, data=data) + self.status = record + return record diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index b7ed754d4..32c1369d8 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -17,9 +17,11 @@ from langflow.graph.vertex.types import ( FileToolVertex, LLMVertex, RoutingVertex, + StateVertex, ToolkitVertex, ) from langflow.interface.tools.constants import FILE_TOOLS +from langflow.schema import Record from langflow.utils import payload if TYPE_CHECKING: @@ -43,9 +45,10 @@ class Graph: self.flow_id = flow_id self._is_input_vertices: List[str] = [] self._is_output_vertices: List[str] = [] + self._is_state_vertices: List[str] = [] self._has_session_id_vertices: List[str] = [] self._sorted_vertices_layers: List[List[str]] = [] - self.run_id = None + self._run_id = None self.top_level_vertices = [] for vertex in self._vertices: @@ -55,6 +58,8 @@ class Graph: self._vertices = self._graph_data["nodes"] self._edges = self._graph_data["edges"] + self.inactivated_vertices: set = set() + self.activated_vertices: List[str] = [] self.vertices_layers = [] self.vertices_to_run = set() self.stop_vertex = None @@ -67,13 +72,67 @@ class Graph: self.define_vertices_lists() self.state_manager = GraphStateManager() + def get_state(self, name: str) -> Optional[Record]: + """Returns the state of the graph.""" + return self.state_manager.get_state(name, run_id=self._run_id) + + def update_state( + self, name: str, record: Union[str, Record], caller: Optional[str] = None + ) -> None: + """Updates the state of the graph.""" + if caller: + # If there is a caller which is a vertex_id, I want to activate + # all StateVertex in self.vertices that are not the caller + # essentially notifying all the other vertices that the state has changed + # This also has to activate their successors + self.activate_state_vertices(name, caller) + + self.state_manager.update_state(name, record, run_id=self._run_id) + + def activate_state_vertices(self, name: str, caller: str): + vertices_ids = [] + for vertex_id in self._is_state_vertices: + if vertex_id == caller: + continue + vertex = self.get_vertex(vertex_id) + if ( + isinstance(vertex._raw_params["name"], str) + and name in vertex._raw_params["name"] + and vertex_id != caller + and isinstance(vertex, StateVertex) + ): + vertices_ids.append(vertex_id) + successors = self.get_all_successors(vertex, flat=True) + self.vertices_to_run.update(list(map(lambda x: x.id, successors))) + self.activated_vertices = vertices_ids + self.vertices_to_run.update(vertices_ids) + + def reset_activated_vertices(self): + self.activated_vertices = [] + + def append_state( + self, name: str, record: Union[str, Record], caller: Optional[str] = None + ) -> None: + """Appends the state of the graph.""" + if caller: + + self.activate_state_vertices(name, caller) + + self.state_manager.append_state(name, record, run_id=self._run_id) + + @property + def run_id(self): + if not self._run_id: + raise ValueError("Run ID not set") + return self._run_id + def set_run_id(self, run_id: str): for vertex in self.vertices: self.state_manager.subscribe(run_id, vertex.update_graph_state) - self.run_id = run_id + self._run_id = run_id def add_state(self, state: str): - self.state_manager.append_state(self.run_id, state) + self.state_manager.append_state(self._run_id, state) @property def sorted_vertices_layers(self) -> List[List[str]]: @@ -85,28 +144,44 @@ class Graph: """ Defines the lists of vertices that are inputs, outputs, and have session_id. """ - attributes = ["is_input", "is_output", "has_session_id"] + attributes = ["is_input", "is_output", "has_session_id", "is_state"] for vertex in self.vertices: for attribute in attributes: if getattr(vertex, attribute): getattr(self, f"_{attribute}_vertices").append(vertex.id) async def _run( - self, inputs: Dict[str, str], stream: bool + self, + inputs: Dict[str, str], + input_components: list[str], + outputs: list[str], + stream: bool, + session_id: str, ) -> List[Optional["ResultData"]]: """Runs the graph with the given inputs.""" for vertex_id in self._is_input_vertices: vertex = self.get_vertex(vertex_id) + if input_components and ( + vertex_id not in input_components + or vertex.display_name not in input_components + ): + continue if vertex is None: raise ValueError(f"Vertex {vertex_id} not found") - vertex.update_raw_params(inputs) + vertex.update_raw_params(inputs, overwrite=True) + # Update all the vertices with the session_id + for vertex_id in self._has_session_id_vertices: + vertex = self.get_vertex(vertex_id) + if vertex is None: + raise ValueError(f"Vertex {vertex_id} not found") + vertex.update_raw_params({"session_id": session_id}) try: await self.process() self.increment_run_count() except Exception as exc: logger.exception(exc) raise ValueError(f"Error running graph: {exc}") from exc - outputs = [] + vertex_outputs = [] for vertex_id in self._is_output_vertices: vertex = self.get_vertex(vertex_id) if vertex is None: @@ -118,11 +193,16 @@ class Graph: and hasattr(vertex, "consume_async_generator") ): await vertex.consume_async_generator() - outputs.append(vertex.result) - return outputs + if vertex.display_name in outputs or vertex.id in outputs: + vertex_outputs.append(vertex.result) + return vertex_outputs async def run( - self, inputs: Dict[str, Union[str, list[str]]], stream: bool + self, + inputs: Dict[str, Union[str, list[str]]], + outputs: list[str], + stream: bool, + session_id: str, ) -> List[Optional["ResultData"]]: """Runs the graph with the given inputs.""" @@ -130,17 +210,21 @@ class Graph: # we need to go through self.inputs and update the self._raw_params # of the vertices that are inputs # if the value is a list, we need to run multiple times - outputs = [] + vertex_outputs = [] inputs_values = inputs.get(INPUT_FIELD_NAME, "") if not isinstance(inputs_values, list): inputs_values = [inputs_values] for input_value in inputs_values: run_outputs = await self._run( - {INPUT_FIELD_NAME: input_value}, stream=stream + inputs={INPUT_FIELD_NAME: input_value}, + input_components=inputs.get("components", []), + outputs=outputs, + stream=stream, + session_id=session_id, ) logger.debug(f"Run outputs: {run_outputs}") - outputs.extend(run_outputs) - return outputs + vertex_outputs.append(run_outputs) + return vertex_outputs # vertices_layers is a list of lists ordered by the order the vertices # should be built. @@ -155,7 +239,7 @@ class Graph: return { "runs": self._runs, "updates": self._updates, - "inactive_vertices": self.inactive_vertices, + "inactivated_vertices": self.inactivated_vertices, } def build_graph_maps(self): @@ -163,8 +247,8 @@ class Graph: self.in_degree_map = self.build_in_degree() self.parent_child_map = self.build_parent_child_map() - def reset_inactive_vertices(self): - self.inactive_vertices = set() + def reset_inactivated_vertices(self): + self.inactivated_vertices = set() def mark_all_vertices(self, state: str): """Marks all vertices in the graph.""" @@ -562,6 +646,43 @@ class Graph: for source_id in self.predecessor_map.get(vertex.id, []) ] + def get_all_successors(self, vertex, recursive=True, flat=True): + # Recursively get the successors of the current vertex + # successors = vertex.successors + # if not successors: + # return [] + # successors_result = [] + # for successor in successors: + # # Just return a list of successors + # if recursive: + # next_successors = self.get_all_successors(successor) + # successors_result.extend(next_successors) + # successors_result.append(successor) + # return successors_result + # The above is the version without the flat parameter + # The below is the version with the flat parameter + # the flat parameter will define if each layer of successors + # becomes one list or if the result is a list of lists + # if flat is True, the result will be a list of vertices + # if flat is False, the result will be a list of lists of vertices + # each list will represent a layer of successors + successors = vertex.successors + if not successors: + return [] + successors_result = [] + for successor in successors: + if recursive: + next_successors = self.get_all_successors(successor) + if flat: + successors_result.extend(next_successors) + else: + successors_result.append(next_successors) + if flat: + successors_result.append(successor) + else: + successors_result.append([successor]) + return successors_result + def get_successors(self, vertex): """Returns the successors of a vertex.""" return [ @@ -596,14 +717,21 @@ class Graph: # if we can't find a vertex, we raise an error edges: List[ContractEdge] = [] + edges_added = set() for edge in self._edges: source = self.get_vertex(edge["source"]) target = self.get_vertex(edge["target"]) + if source is None: raise ValueError(f"Source vertex {edge['source']} not found") if target is None: raise ValueError(f"Target vertex {edge['target']} not found") + + if (source.id, target.id) in edges_added: + continue + edges.append(ContractEdge(source, target, edge)) + edges_added.add((source.id, target.id)) return edges def _get_vertex_class( @@ -616,6 +744,8 @@ class Graph: return ChatVertex elif node_name in ["ShouldRunNext"]: return RoutingVertex + elif node_name in ["SharedState", "Notify", "GetNotified"]: + return StateVertex elif node_base_type in lazy_load_vertex_dict.VERTEX_TYPE_MAP: return lazy_load_vertex_dict.VERTEX_TYPE_MAP[node_base_type] elif node_name in lazy_load_vertex_dict.VERTEX_TYPE_MAP: @@ -723,21 +853,28 @@ class Graph: def layered_topological_sort( self, vertices: List[Vertex], + filter_graphs: bool = False, ) -> List[List[str]]: """Performs a layered topological sort of the vertices in the graph.""" vertices_ids = {vertex.id for vertex in vertices} # Queue for vertices with no incoming edges queue = deque( - vertex.id for vertex in vertices if self.in_degree_map[vertex.id] == 0 + vertex.id + for vertex in vertices + # if filter_graphs then only vertex.is_input will be considered + if self.in_degree_map[vertex.id] == 0 + and (not filter_graphs or vertex.is_input) ) layers: List[List[str]] = [] - + visited = set(queue) current_layer = 0 while queue: layers.append([]) # Start a new layer layer_size = len(queue) for _ in range(layer_size): vertex_id = queue.popleft() + visited.add(vertex_id) + layers[current_layer].append(vertex_id) for neighbor in self.successor_map[vertex_id]: # only vertices in `vertices_ids` should be considered @@ -748,8 +885,16 @@ class Graph: continue self.in_degree_map[neighbor] -= 1 # 'remove' edge - if self.in_degree_map[neighbor] == 0: + if self.in_degree_map[neighbor] == 0 and neighbor not in visited: queue.append(neighbor) + + # if > 0 it might mean not all predecessors have added to the queue + # so we should process the neighbors predecessors + elif self.in_degree_map[neighbor] > 0: + for predecessor in self.predecessor_map[neighbor]: + if predecessor not in queue and predecessor not in visited: + queue.append(predecessor) + current_layer += 1 # Next layer new_layers = self.refine_layers(layers) return new_layers @@ -821,9 +966,12 @@ class Graph: vertices = self.sort_up_to_vertex(stop_component_id) elif start_component_id: vertices = self.sort_up_to_vertex(start_component_id, is_start=True) - else: vertices = self.vertices + # without component_id we are probably running in the chat + # so we want to pick only graphs that start with ChatInput or + # TextInput + vertices_layers = self.layered_topological_sort(vertices) vertices_layers = self.sort_by_avg_build_time(vertices_layers) # vertices_layers = self.sort_chat_inputs_first(vertices_layers) @@ -833,7 +981,7 @@ class Graph: # save the only the rest self.vertices_layers = vertices_layers[1:] self.vertices_to_run = { - vertex for vertex in chain.from_iterable(vertices_layers) + vertex_id for vertex_id in chain.from_iterable(vertices_layers) } # Return just the first layer return first_layer diff --git a/src/backend/langflow/graph/graph/state_manager.py b/src/backend/langflow/graph/graph/state_manager.py index 64476011b..ed5844d87 100644 --- a/src/backend/langflow/graph/graph/state_manager.py +++ b/src/backend/langflow/graph/graph/state_manager.py @@ -2,6 +2,8 @@ from collections import defaultdict from threading import Lock from typing import Callable +from loguru import logger + class GraphStateManager: def __init__(self): @@ -9,21 +11,29 @@ class GraphStateManager: self.observers = defaultdict(list) self.lock = Lock() - def append_state(self, key, new_state): + def append_state(self, key, new_state, run_id: str): with self.lock: - if key not in self.states: - self.states[key] = [] - self.states[key].append(new_state) + if run_id not in self.states: + self.states[run_id] = {} + if key not in self.states[run_id]: + self.states[run_id][key] = [] + elif not isinstance(self.states[key], list): + self.states[run_id][key] = [self.states[key]] + self.states[run_id][key].append(new_state) self.notify_append_observers(key, new_state) - def update_state(self, key, new_state): + def update_state(self, key, new_state, run_id: str): with self.lock: - self.states[key] = new_state + if run_id not in self.states: + self.states[run_id] = {} + if key not in self.states[run_id]: + self.states[run_id][key] = {} + self.states[run_id][key] = new_state self.notify_observers(key, new_state) - def get_state(self, key): + def get_state(self, key, run_id: str): with self.lock: - return self.states.get(key, None) + return self.states.get(run_id, {}).get(key, "") def subscribe(self, key, observer: Callable): with self.lock: @@ -36,4 +46,8 @@ class GraphStateManager: def notify_append_observers(self, key, new_state): for callback in self.observers[key]: - callback(key, new_state, append=True) + try: + callback(key, new_state, append=True) + except Exception as e: + logger.error(f"Error in observer {callback} for key {key}: {e}") + logger.warning("Callbacks not implemented yet") diff --git a/src/backend/langflow/graph/vertex/base.py b/src/backend/langflow/graph/vertex/base.py index 1df6b9637..66fdd44c6 100644 --- a/src/backend/langflow/graph/vertex/base.py +++ b/src/backend/langflow/graph/vertex/base.py @@ -58,6 +58,7 @@ class Vertex: self.will_stream = False self.updated_raw_params = False self.id: str = data["id"] + self.is_state = False self.is_input = any( input_component_name in self.id for input_component_name in INPUT_COMPONENTS ) @@ -99,12 +100,9 @@ class Vertex: def update_graph_state(self, key, new_state, append: bool): if append: - if key in self.graph_state: - self.graph_state[key].append(new_state) - else: - self.graph_state[key] = [new_state] + self.graph.append_state(key, new_state, caller=self.id) else: - self.graph_state[key] = new_state + self.graph.update_state(key, new_state, caller=self.id) def set_state(self, state: str): self.state = VertexStates[state] @@ -114,12 +112,12 @@ class Vertex: ): # If the vertex is inactive and has only one in degree # it means that it is not a merge point in the graph - self.graph.inactive_vertices.add(self.id) + self.graph.inactivated_vertices.add(self.id) elif ( self.state == VertexStates.ACTIVE - and self.id in self.graph.inactive_vertices + and self.id in self.graph.inactivated_vertices ): - self.graph.inactive_vertices.remove(self.id) + self.graph.inactivated_vertices.remove(self.id) @property def avg_build_time(self): @@ -385,7 +383,7 @@ class Vertex: self.params = params self._raw_params = params.copy() - def update_raw_params(self, new_params: Dict[str, str]): + def update_raw_params(self, new_params: Dict[str, str], overwrite: bool = False): """ Update the raw parameters of the vertex with the given new parameters. @@ -400,6 +398,10 @@ class Vertex: return if any(isinstance(self._raw_params.get(key), Vertex) for key in new_params): return + if not overwrite: + for key in new_params.copy(): + if key not in self._raw_params: + new_params.pop(key) self._raw_params.update(new_params) self.updated_raw_params = True @@ -560,10 +562,14 @@ class Vertex: self.params[key].extend(built) else: try: + if self.params[key] == built: + continue + self.params[key].append(built) except AttributeError as e: logger.exception(e) raise ValueError( + f"Params {key} ({self.params[key]}) is not a list and cannot be extended with {built}" f"Error building node {self.display_name}: {str(e)}" ) from e @@ -672,6 +678,10 @@ class Vertex: if self.frozen and self._built: return self.get_requester_result(requester) + elif self._built and requester is not None: + # This means that the vertex has already been built + # and we are just getting the result for the requester + return await self.get_requester_result(requester) self._reset() if self._is_chat_input() and inputs is not None: @@ -740,11 +750,3 @@ class Vertex: if self._built_object is not None else "Failed to build 😵‍💫" ) - - -class StatefulVertex(Vertex): - pass - - -class StatelessVertex(Vertex): - pass diff --git a/src/backend/langflow/graph/vertex/types.py b/src/backend/langflow/graph/vertex/types.py index adbdafd6e..1bfd7cacf 100644 --- a/src/backend/langflow/graph/vertex/types.py +++ b/src/backend/langflow/graph/vertex/types.py @@ -1,6 +1,7 @@ import ast import json -from typing import AsyncIterator, Callable, Dict, Iterator, List, Optional, Union +from typing import (AsyncIterator, Callable, Dict, Iterator, List, Optional, + Union) import yaml from langchain_core.messages import AIMessage @@ -8,14 +9,14 @@ from loguru import logger from langflow.graph.schema import INPUT_FIELD_NAME, InterfaceComponentTypes from langflow.graph.utils import UnbuiltObject, flatten_list, serialize_field -from langflow.graph.vertex.base import StatefulVertex, StatelessVertex +from langflow.graph.vertex.base import Vertex from langflow.interface.utils import extract_input_variables_from_prompt from langflow.schema import Record from langflow.services.monitor.utils import log_vertex_build from langflow.utils.schemas import ChatOutputResponse -class AgentVertex(StatelessVertex): +class AgentVertex(Vertex): def __init__(self, data: Dict, graph, params: Optional[Dict] = None): super().__init__(data, graph=graph, base_type="agents", params=params) @@ -58,12 +59,12 @@ class AgentVertex(StatelessVertex): await self._build(user_id=user_id) -class ToolVertex(StatelessVertex): +class ToolVertex(Vertex): def __init__(self, data: Dict, graph, params: Optional[Dict] = None): super().__init__(data, graph=graph, base_type="tools", params=params) -class LLMVertex(StatelessVertex): +class LLMVertex(Vertex): built_node_type = None class_built_object = None @@ -86,7 +87,7 @@ class LLMVertex(StatelessVertex): self.class_built_object = self._built_object -class ToolkitVertex(StatelessVertex): +class ToolkitVertex(Vertex): def __init__(self, data: Dict, graph, params=None): super().__init__(data, graph=graph, base_type="toolkits", params=params) @@ -100,7 +101,7 @@ class FileToolVertex(ToolVertex): ) -class WrapperVertex(StatelessVertex): +class WrapperVertex(Vertex): def __init__(self, data: Dict, graph, params=None): super().__init__(data, graph=graph, base_type="wrappers") self.steps: List[Callable] = [self._custom_build] @@ -114,7 +115,7 @@ class WrapperVertex(StatelessVertex): await self._build(user_id=user_id) -class DocumentLoaderVertex(StatefulVertex): +class DocumentLoaderVertex(Vertex): def __init__(self, data: Dict, graph, params: Optional[Dict] = None): super().__init__(data, graph=graph, base_type="documentloaders", params=params) @@ -134,12 +135,12 @@ class DocumentLoaderVertex(StatefulVertex): return f"{self.vertex_type}()" -class EmbeddingVertex(StatefulVertex): +class EmbeddingVertex(Vertex): def __init__(self, data: Dict, graph, params: Optional[Dict] = None): super().__init__(data, graph=graph, base_type="embeddings", params=params) -class VectorStoreVertex(StatefulVertex): +class VectorStoreVertex(Vertex): def __init__(self, data: Dict, graph, params=None): super().__init__(data, graph=graph, base_type="vectorstores") @@ -181,17 +182,17 @@ class VectorStoreVertex(StatefulVertex): self.remove_docs_and_texts_from_params() -class MemoryVertex(StatefulVertex): +class MemoryVertex(Vertex): def __init__(self, data: Dict, graph): super().__init__(data, graph=graph, base_type="memory") -class RetrieverVertex(StatefulVertex): +class RetrieverVertex(Vertex): def __init__(self, data: Dict, graph): super().__init__(data, graph=graph, base_type="retrievers") -class TextSplitterVertex(StatefulVertex): +class TextSplitterVertex(Vertex): def __init__(self, data: Dict, graph, params: Optional[Dict] = None): super().__init__(data, graph=graph, base_type="textsplitters", params=params) @@ -209,7 +210,7 @@ class TextSplitterVertex(StatefulVertex): return f"{self.vertex_type}()" -class ChainVertex(StatelessVertex): +class ChainVertex(Vertex): def __init__(self, data: Dict, graph): super().__init__(data, graph=graph, base_type="chains") self.steps = [self._custom_build] @@ -239,7 +240,7 @@ class ChainVertex(StatelessVertex): return super()._built_object_repr() -class PromptVertex(StatelessVertex): +class PromptVertex(Vertex): def __init__(self, data: Dict, graph): super().__init__(data, graph=graph, base_type="prompts") self.steps: List[Callable] = [self._custom_build] @@ -323,12 +324,12 @@ class PromptVertex(StatelessVertex): return str(self._built_object) -class OutputParserVertex(StatelessVertex): +class OutputParserVertex(Vertex): def __init__(self, data: Dict, graph): super().__init__(data, graph=graph, base_type="output_parsers") -class CustomComponentVertex(StatelessVertex): +class CustomComponentVertex(Vertex): def __init__(self, data: Dict, graph): super().__init__(data, graph=graph, base_type="custom_components") @@ -337,7 +338,7 @@ class CustomComponentVertex(StatelessVertex): return self.artifacts["repr"] or super()._built_object_repr() -class ChatVertex(StatelessVertex): +class ChatVertex(Vertex): def __init__(self, data: Dict, graph): super().__init__(data, graph=graph, base_type="custom_components", is_task=True) self.steps = [self._build, self._run] @@ -459,7 +460,7 @@ class ChatVertex(StatelessVertex): return self.vertex_type == InterfaceComponentTypes.ChatInput and self.is_input -class RoutingVertex(StatelessVertex): +class RoutingVertex(Vertex): def __init__(self, data: Dict, graph): super().__init__(data, graph=graph, base_type="custom_components") self.use_result = True @@ -494,6 +495,18 @@ class RoutingVertex(StatelessVertex): self._built_result = None +class StateVertex(Vertex): + def __init__(self, data: Dict, graph): + super().__init__(data, graph=graph, base_type="custom_components") + self.steps = [self._build] + self.is_state = True + + @property + def successors_ids(self) -> List[str]: + successors = self.graph.successor_map.get(self.id, []) + return successors + self.graph.activated_vertices + + def dict_to_codeblock(d: dict) -> str: serialized = {key: serialize_field(val) for key, val in d.items()} json_str = json.dumps(serialized, indent=4) diff --git a/src/backend/langflow/interface/custom/custom_component/custom_component.py b/src/backend/langflow/interface/custom/custom_component/custom_component.py index a7eaa2c05..d2913be3d 100644 --- a/src/backend/langflow/interface/custom/custom_component/custom_component.py +++ b/src/backend/langflow/interface/custom/custom_component/custom_component.py @@ -14,7 +14,6 @@ from uuid import UUID import yaml from cachetools import TTLCache, cachedmethod -from fastapi import HTTPException from langchain_core.documents import Document from pydantic import BaseModel from sqlmodel import select @@ -76,6 +75,28 @@ class CustomComponent(Component): """The status of the component. This is displayed on the frontend. Defaults to None.""" _flows_records: Optional[List[Record]] = None + def update_state(self, name: str, value: Any): + try: + self.vertex.graph.update_state( + name=name, record=value, caller=self.vertex.id + ) + except Exception as e: + raise ValueError(f"Error updating state: {e}") + + def append_state(self, name: str, value: Any): + try: + self.vertex.graph.append_state( + name=name, record=value, caller=self.vertex.id + ) + except Exception as e: + raise ValueError(f"Error appending state: {e}") + + def get_state(self, name: str): + try: + return self.vertex.graph.get_state(name=name) + except Exception as e: + raise ValueError(f"Error getting state: {e}") + _tree: Optional[dict] = None def __init__(self, **data): @@ -200,18 +221,7 @@ class CustomComponent(Component): args = build_method["args"] for arg in args: - if arg.get("type") == "prompt": - raise HTTPException( - status_code=400, - detail={ - "error": "Type hint Error", - "traceback": ( - "Prompt type is not supported in the build method." - " Try using PromptTemplate instead." - ), - }, - ) - elif not arg.get("type") and arg.get("name") != "self": + if not arg.get("type") and arg.get("name") != "self": # Set the type to Data arg["type"] = "Data" return args diff --git a/src/backend/langflow/interface/types.py b/src/backend/langflow/interface/types.py index 3dcddf99f..9a1375869 100644 --- a/src/backend/langflow/interface/types.py +++ b/src/backend/langflow/interface/types.py @@ -1,8 +1,9 @@ from cachetools import LRUCache, cached - from langflow.interface.agents.base import agent_creator from langflow.interface.chains.base import chain_creator -from langflow.interface.custom.directory_reader.utils import merge_nested_dicts_with_renaming +from langflow.interface.custom.directory_reader.utils import ( + merge_nested_dicts_with_renaming, +) from langflow.interface.custom.utils import build_custom_components from langflow.interface.document_loaders.base import documentloader_creator from langflow.interface.embeddings.base import embedding_creator @@ -13,7 +14,6 @@ from langflow.interface.retrievers.base import retriever_creator from langflow.interface.text_splitters.base import textsplitter_creator from langflow.interface.toolkits.base import toolkits_creator from langflow.interface.tools.base import tool_creator -from langflow.interface.utilities.base import utility_creator from langflow.interface.wrappers.base import wrapper_creator @@ -48,7 +48,7 @@ def build_langchain_types_dict(): # sourcery skip: dict-assign-update-to-union # vectorstore_creator, documentloader_creator, textsplitter_creator, - utility_creator, + # utility_creator, output_parser_creator, retriever_creator, ] @@ -66,4 +66,6 @@ def get_all_types_dict(settings_service): """Get all types dictionary combining native and custom components.""" native_components = build_langchain_types_dict() custom_components_from_file = build_custom_components(settings_service) - return merge_nested_dicts_with_renaming(native_components, custom_components_from_file) + return merge_nested_dicts_with_renaming( + native_components, custom_components_from_file + ) diff --git a/src/backend/langflow/processing/process.py b/src/backend/langflow/processing/process.py index 408db0a53..8ef4ae50f 100644 --- a/src/backend/langflow/processing/process.py +++ b/src/backend/langflow/processing/process.py @@ -126,7 +126,9 @@ async def process_runnable(runnable: Runnable, inputs: Union[dict, List[dict]]): elif isinstance(inputs, dict) and hasattr(runnable, "ainvoke"): result = await runnable.ainvoke(inputs) else: - raise ValueError(f"Runnable {runnable} does not support inputs of type {type(inputs)}") + raise ValueError( + f"Runnable {runnable} does not support inputs of type {type(inputs)}" + ) # Check if the result is a list of AIMessages if isinstance(result, list) and all(isinstance(r, AIMessage) for r in result): result = [r.content for r in result] @@ -135,7 +137,9 @@ async def process_runnable(runnable: Runnable, inputs: Union[dict, List[dict]]): return result -async def process_inputs_dict(built_object: Union[Chain, VectorStore, Runnable], inputs: dict): +async def process_inputs_dict( + built_object: Union[Chain, VectorStore, Runnable], inputs: dict +): if isinstance(built_object, Chain): if inputs is None: raise ValueError("Inputs must be provided for a Chain") @@ -170,7 +174,9 @@ async def process_inputs_list(built_object: Runnable, inputs: List[dict]): return await process_runnable(built_object, inputs) -async def generate_result(built_object: Union[Chain, VectorStore, Runnable], inputs: Union[dict, List[dict]]): +async def generate_result( + built_object: Union[Chain, VectorStore, Runnable], inputs: Union[dict, List[dict]] +): if isinstance(inputs, dict): result = await process_inputs_dict(built_object, inputs) elif isinstance(inputs, List) and isinstance(built_object, Runnable): @@ -198,6 +204,7 @@ async def run_graph( stream: bool, session_id: Optional[str] = None, inputs: Optional[dict[str, Union[List[str], str]]] = None, + outputs: Optional[List[str]] = None, artifacts: Optional[Dict[str, Any]] = None, session_service: Optional[SessionService] = None, ): @@ -208,24 +215,35 @@ async def run_graph( else: graph_data = graph._graph_data if not session_id and session_service is not None: - session_id = session_service.generate_key(session_id=flow_id, data_graph=graph_data) + session_id = session_service.generate_key( + session_id=flow_id, data_graph=graph_data + ) if inputs is None: inputs = {} - outputs = await graph.run(inputs, stream=stream) + outputs = await graph.run( + inputs, + outputs, + stream=stream, + session_id=session_id, + ) if session_id and session_service: session_service.update_session(session_id, (graph, artifacts)) return outputs, session_id -def validate_input(graph_data: Dict[str, Any], tweaks: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]: +def validate_input( + graph_data: Dict[str, Any], tweaks: Dict[str, Dict[str, Any]] +) -> List[Dict[str, Any]]: if not isinstance(graph_data, dict) or not isinstance(tweaks, dict): raise ValueError("graph_data and tweaks should be dictionaries") nodes = graph_data.get("data", {}).get("nodes") or graph_data.get("nodes") if not isinstance(nodes, list): - raise ValueError("graph_data should contain a list of nodes under 'data' key or directly under 'nodes' key") + raise ValueError( + "graph_data should contain a list of nodes under 'data' key or directly under 'nodes' key" + ) return nodes @@ -234,7 +252,9 @@ def apply_tweaks(node: Dict[str, Any], node_tweaks: Dict[str, Any]) -> None: template_data = node.get("data", {}).get("node", {}).get("template") if not isinstance(template_data, dict): - logger.warning(f"Template data for node {node.get('id')} should be a dictionary") + logger.warning( + f"Template data for node {node.get('id')} should be a dictionary" + ) return for tweak_name, tweak_value in node_tweaks.items(): @@ -249,28 +269,33 @@ def apply_tweaks_on_vertex(vertex: Vertex, node_tweaks: Dict[str, Any]) -> None: vertex.params[tweak_name] = tweak_value -def process_tweaks(graph_data: Dict[str, Any], tweaks: Dict[str, Dict[str, Any]]) -> Dict[str, Any]: +def process_tweaks( + graph_data: Dict[str, Any], tweaks: Dict[str, Dict[str, Any]] +) -> Dict[str, Any]: """ This function is used to tweak the graph data using the node id and the tweaks dict. :param graph_data: The dictionary containing the graph data. It must contain a 'data' key with 'nodes' as its child or directly contain 'nodes' key. Each node should have an 'id' and 'data'. - :param tweaks: A dictionary where the key is the node id and the value is a dictionary of the tweaks. - The inner dictionary contains the name of a certain parameter as the key and the value to be tweaked. - + :param tweaks: The dictionary containing the tweaks. The keys can be the node id or the name of the tweak. + The values can be a dictionary containing the tweaks for the node or the value of the tweak. :return: The modified graph_data dictionary. :raises ValueError: If the input is not in the expected format. """ nodes = validate_input(graph_data, tweaks) + nodes_map = {node.get("id"): node for node in nodes} + + all_nodes_tweaks = {} + for key, value in tweaks.items(): + if isinstance(value, dict): + if node := nodes_map.get(key): + apply_tweaks(node, value) + else: + all_nodes_tweaks[key] = value for node in nodes: - if isinstance(node, dict) and isinstance(node.get("id"), str): - node_id = node["id"] - if node_tweaks := tweaks.get(node_id): - apply_tweaks(node, node_tweaks) - else: - logger.warning("Each node should be a dictionary with an 'id' key of type str") + apply_tweaks(node, all_nodes_tweaks) return graph_data @@ -282,6 +307,8 @@ def process_tweaks_on_graph(graph: Graph, tweaks: Dict[str, Dict[str, Any]]): if node_tweaks := tweaks.get(node_id): apply_tweaks_on_vertex(vertex, node_tweaks) else: - logger.warning("Each node should be a Vertex with an 'id' attribute of type str") + logger.warning( + "Each node should be a Vertex with an 'id' attribute of type str" + ) return graph diff --git a/src/backend/langflow/services/settings/base.py b/src/backend/langflow/services/settings/base.py index 93f8b3085..3a1ec956e 100644 --- a/src/backend/langflow/services/settings/base.py +++ b/src/backend/langflow/services/settings/base.py @@ -58,13 +58,17 @@ class Settings(BaseSettings): STORE: Optional[bool] = True STORE_URL: Optional[str] = "https://api.langflow.store" - DOWNLOAD_WEBHOOK_URL: Optional[ - str - ] = "https://api.langflow.store/flows/trigger/ec611a61-8460-4438-b187-a4f65e5559d4" - LIKE_WEBHOOK_URL: Optional[str] = "https://api.langflow.store/flows/trigger/64275852-ec00-45c1-984e-3bff814732da" + DOWNLOAD_WEBHOOK_URL: Optional[str] = ( + "https://api.langflow.store/flows/trigger/ec611a61-8460-4438-b187-a4f65e5559d4" + ) + LIKE_WEBHOOK_URL: Optional[str] = ( + "https://api.langflow.store/flows/trigger/64275852-ec00-45c1-984e-3bff814732da" + ) STORAGE_TYPE: str = "local" + CELERY_ENABLED: bool = False + @validator("CONFIG_DIR", pre=True, allow_reuse=True) def set_langflow_dir(cls, value): if not value: @@ -91,7 +95,9 @@ class Settings(BaseSettings): @validator("DATABASE_URL", pre=True) def set_database_url(cls, value, values): if not value: - logger.debug("No database_url provided, trying LANGFLOW_DATABASE_URL env variable") + logger.debug( + "No database_url provided, trying LANGFLOW_DATABASE_URL env variable" + ) if langflow_database_url := os.getenv("LANGFLOW_DATABASE_URL"): value = langflow_database_url logger.debug("Using LANGFLOW_DATABASE_URL env variable.") @@ -101,7 +107,9 @@ class Settings(BaseSettings): # so we need to migrate to the new format # if there is a database in that location if not values["CONFIG_DIR"]: - raise ValueError("CONFIG_DIR not set, please set it or provide a DATABASE_URL") + raise ValueError( + "CONFIG_DIR not set, please set it or provide a DATABASE_URL" + ) new_path = f"{values['CONFIG_DIR']}/langflow.db" if Path("./langflow.db").exists(): @@ -125,15 +133,22 @@ class Settings(BaseSettings): if os.getenv("LANGFLOW_COMPONENTS_PATH"): logger.debug("Adding LANGFLOW_COMPONENTS_PATH to components_path") langflow_component_path = os.getenv("LANGFLOW_COMPONENTS_PATH") - if Path(langflow_component_path).exists() and langflow_component_path not in value: + if ( + Path(langflow_component_path).exists() + and langflow_component_path not in value + ): if isinstance(langflow_component_path, list): for path in langflow_component_path: if path not in value: value.append(path) - logger.debug(f"Extending {langflow_component_path} to components_path") + logger.debug( + f"Extending {langflow_component_path} to components_path" + ) elif langflow_component_path not in value: value.append(langflow_component_path) - logger.debug(f"Appending {langflow_component_path} to components_path") + logger.debug( + f"Appending {langflow_component_path} to components_path" + ) if not value: value = [BASE_COMPONENTS_PATH] @@ -145,7 +160,9 @@ class Settings(BaseSettings): logger.debug(f"Components path: {value}") return value - model_config = SettingsConfigDict(validate_assignment=True, extra="ignore", env_prefix="LANGFLOW_") + model_config = SettingsConfigDict( + validate_assignment=True, extra="ignore", env_prefix="LANGFLOW_" + ) # @model_validator() # @classmethod diff --git a/src/backend/langflow/services/socket/utils.py b/src/backend/langflow/services/socket/utils.py index 4176d081c..a45b85cd6 100644 --- a/src/backend/langflow/services/socket/utils.py +++ b/src/backend/langflow/services/socket/utils.py @@ -7,7 +7,7 @@ from sqlmodel import select from langflow.api.utils import format_elapsed_time from langflow.api.v1.schemas import ResultDataResponse, VertexBuildResponse from langflow.graph.graph.base import Graph -from langflow.graph.vertex.base import StatelessVertex +from langflow.graph.vertex.base import Vertex from langflow.services.database.models.flow.model import Flow from langflow.services.deps import get_session from langflow.services.monitor.utils import log_vertex_build @@ -63,7 +63,7 @@ async def build_vertex( return start_time = time.perf_counter() try: - if isinstance(vertex, StatelessVertex) or not vertex._built: + if isinstance(vertex, Vertex) or not vertex._built: await vertex.build(user_id=None, session_id=sid) params = vertex._built_object_repr() valid = True @@ -96,7 +96,9 @@ async def build_vertex( ) # Emit the vertex build response - response = VertexBuildResponse(valid=valid, params=params, id=vertex.id, data=result_dict) + response = VertexBuildResponse( + valid=valid, params=params, id=vertex.id, data=result_dict + ) await sio.emit("vertex_build", data=response.model_dump(), to=sid) except Exception as exc: diff --git a/src/backend/langflow/services/task/service.py b/src/backend/langflow/services/task/service.py index 3f7a81f2c..e7f87d3f9 100644 --- a/src/backend/langflow/services/task/service.py +++ b/src/backend/langflow/services/task/service.py @@ -1,11 +1,14 @@ -from typing import Any, Callable, Coroutine, Union +from typing import TYPE_CHECKING, Any, Callable, Coroutine, Union + +from loguru import logger from langflow.services.base import Service from langflow.services.task.backends.anyio import AnyIOBackend from langflow.services.task.backends.base import TaskBackend from langflow.services.task.utils import get_celery_worker_status -from langflow.utils.logger import configure -from loguru import logger + +if TYPE_CHECKING: + from langflow.services.settings.service import SettingsService def check_celery_availability(): @@ -20,28 +23,31 @@ def check_celery_availability(): return status -try: - configure() - status = check_celery_availability() - - USE_CELERY = status.get("availability") is not None -except ImportError: - USE_CELERY = False - - class TaskService(Service): name = "task_service" - def __init__(self): - self.backend = self.get_backend() + def __init__(self, settings_service: "SettingsService"): + self.settings_service = settings_service + try: + if self.settings_service.settings.CELERY_ENABLED: + USE_CELERY = True + status = check_celery_availability() + + USE_CELERY = status.get("availability") is not None + else: + USE_CELERY = False + except ImportError: + USE_CELERY = False + self.use_celery = USE_CELERY + self.backend = self.get_backend() @property def backend_name(self) -> str: return self.backend.name def get_backend(self) -> TaskBackend: - if USE_CELERY: + if self.use_celery: from langflow.services.task.backends.celery import CeleryBackend logger.debug("Using Celery backend") @@ -68,7 +74,9 @@ class TaskService(Service): result = await result return task.id, result - async def launch_task(self, task_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: + async def launch_task( + self, task_func: Callable[..., Any], *args: Any, **kwargs: Any + ) -> Any: logger.debug(f"Launching task {task_func} with args {args} and kwargs {kwargs}") logger.debug(f"Using backend {self.backend}") task = self.backend.launch_task(task_func, *args, **kwargs) diff --git a/src/frontend/src/App.tsx b/src/frontend/src/App.tsx index 22fd6c5bb..b57dad461 100644 --- a/src/frontend/src/App.tsx +++ b/src/frontend/src/App.tsx @@ -18,7 +18,6 @@ import { getHealth } from "./controllers/API"; import Router from "./routes"; import useAlertStore from "./stores/alertStore"; import { useDarkStore } from "./stores/darkStore"; -import useFlowStore from "./stores/flowStore"; import useFlowsManagerStore from "./stores/flowsManagerStore"; import { useStoreStore } from "./stores/storeStore"; import { useTypesStore } from "./stores/typesStore"; @@ -70,7 +69,7 @@ export default function App() { .catch(() => { setFetchError(true); }); - }, 20000); + }, 20000); // 20 seconds // Clean up the timer on component unmount return () => { diff --git a/src/frontend/src/stores/flowStore.ts b/src/frontend/src/stores/flowStore.ts index 782a01400..cbf183289 100644 --- a/src/frontend/src/stores/flowStore.ts +++ b/src/frontend/src/stores/flowStore.ts @@ -450,9 +450,10 @@ const useFlowStore = create((set, get) => ({ status: BuildStatus, runId: string ) { - if (vertexBuildData && vertexBuildData.inactive_vertices) { - get().removeFromVerticesBuild(vertexBuildData.inactive_vertices); + if (vertexBuildData && vertexBuildData.inactivated_vertices) { + get().removeFromVerticesBuild(vertexBuildData.inactivated_vertices); } + if (vertexBuildData.next_vertices_ids) { // next_vertices_ids is a list of vertices that are going to be built next // verticesLayers is a list of list of vertices ids, where each list is a layer of vertices @@ -513,6 +514,7 @@ const useFlowStore = create((set, get) => ({ get().setIsBuilding(false); }, onBuildStart: (idList) => { + console.log("onBuildStart", idList); useFlowStore.getState().updateBuildStatus(idList, BuildStatus.BUILDING); }, validateNodes: validateSubgraph, @@ -538,6 +540,16 @@ const useFlowStore = create((set, get) => ({ set({ verticesBuild: vertices }); }, verticesBuild: null, + addToVerticesBuild: (vertices: string[]) => { + const verticesBuild = get().verticesBuild; + if (!verticesBuild) return; + set({ + verticesBuild: { + ...verticesBuild, + verticesIds: [...verticesBuild.verticesIds, ...vertices], + }, + }); + }, removeFromVerticesBuild: (vertices: string[]) => { const verticesBuild = get().verticesBuild; if (!verticesBuild) return; diff --git a/src/frontend/src/stores/flowsManagerStore.ts b/src/frontend/src/stores/flowsManagerStore.ts index 49849ba54..fac806c5f 100644 --- a/src/frontend/src/stores/flowsManagerStore.ts +++ b/src/frontend/src/stores/flowsManagerStore.ts @@ -92,7 +92,6 @@ const useFlowsManagerStore = create((set, get) => ({ true ); } - set({ saveLoading: true }); }, 500); // Delay of 500ms because chat message depends on it. }, saveFlow: (flow: FlowType, silent?: boolean) => { diff --git a/src/frontend/src/types/api/index.ts b/src/frontend/src/types/api/index.ts index c277c916f..bcbb31730 100644 --- a/src/frontend/src/types/api/index.ts +++ b/src/frontend/src/types/api/index.ts @@ -141,8 +141,8 @@ export type VerticesOrderTypeAPI = { export type VertexBuildTypeAPI = { id: string; + inactivated_vertices: Array | null; next_vertices_ids: Array; - inactive_vertices: Array | null; run_id: string; valid: boolean; params: string; diff --git a/src/frontend/src/types/zustand/flow/index.ts b/src/frontend/src/types/zustand/flow/index.ts index a805b24fd..0f5fe6ecc 100644 --- a/src/frontend/src/types/zustand/flow/index.ts +++ b/src/frontend/src/types/zustand/flow/index.ts @@ -107,6 +107,7 @@ export type FlowStoreType = { runId: string; } | null ) => void; + addToVerticesBuild: (vertices: string[]) => void; removeFromVerticesBuild: (vertices: string[]) => void; verticesBuild: { verticesIds: string[]; diff --git a/src/frontend/src/utils/buildUtils.ts b/src/frontend/src/utils/buildUtils.ts index ba3327959..34edd1592 100644 --- a/src/frontend/src/utils/buildUtils.ts +++ b/src/frontend/src/utils/buildUtils.ts @@ -32,6 +32,7 @@ function getInactiveVertexData(vertexId: string): VertexBuildTypeAPI { id: vertexId, data: inactiveData, params: "Inactive", + inactivated_vertices: null, run_id: "", next_vertices_ids: [], inactive_vertices: null,