diff --git a/src/backend/base/langflow/services/flow/flow_runner.py b/src/backend/base/langflow/services/flow/flow_runner.py
index 2e84fd0b9..fc289ed90 100644
--- a/src/backend/base/langflow/services/flow/flow_runner.py
+++ b/src/backend/base/langflow/services/flow/flow_runner.py
@@ -1,22 +1,28 @@
 import json
 import os
-from collections.abc import Callable
 from pathlib import Path
-from typing import Any
-from uuid import UUID
+from uuid import UUID, uuid4
 
 from aiofile import async_open
 from loguru import logger
-from sqlalchemy import text
+from sqlmodel import delete, text
 
 from langflow.api.utils import cascade_delete_flow
 from langflow.graph import Graph
-from langflow.load import aload_flow_from_json
-from langflow.processing.process import run_graph
+from langflow.graph.vertex.param_handler import ParameterHandler
+from langflow.load.utils import replace_tweaks_with_env
+from langflow.logging.logger import configure
+from langflow.processing.process import process_tweaks, run_graph
+from langflow.services.auth.utils import (
+    get_password_hash,
+)
 from langflow.services.cache.service import AsyncBaseCacheService
 from langflow.services.database.models.flow import Flow
+from langflow.services.database.models.user import User
+from langflow.services.database.models.variable import Variable
 from langflow.services.database.utils import initialize_database
-from langflow.services.deps import get_cache_service, session_scope
+from langflow.services.deps import get_cache_service, get_storage_service, session_scope
+from langflow.utils.util import update_settings
 
 
 class LangflowRunnerExperimental:
@@ -36,7 +42,23 @@ class LangflowRunnerExperimental:
 
     """
 
-    should_initialize_db: bool = True
+    def __init__(
+        self,
+        *,
+        should_initialize_db: bool = True,
+        log_level: str | None = None,
+        log_file: str | None = None,
+        disable_logs: bool = False,
+        async_log_file: bool = True,
+    ):
+        """Store the database-initialization flag and configure Langflow logging.
+
+        `log_file`, when given, is converted to a `Path` before it is handed to
+        `configure`; the other logging options are forwarded unchanged.
+        """
+        self.should_initialize_db = should_initialize_db
+        log_file_path = Path(log_file) if log_file else None
+        configure(log_level=log_level, log_file=log_file_path, disable=disable_logs, async_file=async_log_file)
 
     async def run(
         self,
@@ -45,39 +67,154 @@ class LangflowRunnerExperimental:
         input_value: str,
         *,
         input_type: str = "chat",
-        output_type: str = "chat",
+        output_type: str = "all",
+        cache: str | None = None,
+        stream: bool = False,
+        user_id: str | None = None,
+        generate_user: bool = False,  # If True, generates a new user for the flow
+        cleanup: bool = True,  # If True, clears flow state after execution
+        tweaks_values: dict | None = None,
+    ):
+        """Register the flow in the database, execute it once and return the result.
+
+        When `generate_user` is true, a new user is created and its id replaces `user_id`.
+        When `cleanup` is true and a user id is known (passed in or generated),
+        `clear_user_state` is called for it once the run finishes or fails.
+        """
+        try:
+            logger.info(f"Start Handling {session_id=}")
+            await self.init_db_if_needed()
+            # Update settings with cache and components path
+            await update_settings(cache=cache)
+            if generate_user:
+                user = await self.generate_user()
+                user_id = str(user.id)
+            flow_dict = await self.prepare_flow_and_add_to_db(
+                flow=flow,
+                user_id=user_id,
+                session_id=session_id,
+                tweaks_values=tweaks_values,
+            )
+            return await self.run_flow(
+                input_value=input_value,
+                session_id=session_id,
+                flow_dict=flow_dict,
+                input_type=input_type,
+                output_type=output_type,
+                user_id=user_id,
+                stream=stream,
+            )
+        finally:
+            if cleanup and user_id:
+                await self.clear_user_state(user_id=user_id)
+
+    async def run_flow(
+        self,
+        *,
+        input_value: str,
+        session_id: str,
+        flow_dict: dict,
+        input_type: str = "chat",
+        output_type: str = "all",
+        user_id: str | None = None,
         stream: bool = False,
     ):
-        logger.info(f"Start Handling {session_id=}")
-        await self.init_db_if_needed()
-        flow_dict = await self.get_flow_dict(flow)
-        self.set_flow_id(session_id, flow_dict)
-        # we must modify the flow schema to set the session_id and for load_from_db=True we load the value from env vars
-        self.modification(flow_dict, lambda obj, parent, key: self.modify_flow_schema(session_id, obj, parent, key))
-        await self.clear_flow_state(session_id, flow_dict)
-        await self.add_flow_to_db(session_id, flow_dict)
-        graph = await self.create_graph_from_flow(session_id, flow_dict)
+        """Build a graph from `flow_dict`, run it, and always clear the flow state afterwards."""
+        graph = await self.create_graph_from_flow(session_id, flow_dict, user_id=user_id)
         try:
             result = await self.run_graph(input_value, input_type, output_type, session_id, graph, stream=stream)
         finally:
-            await self.clear_flow_state(session_id, flow_dict)
+            await self.clear_flow_state(flow_dict)
         logger.info(f"Finish Handling {session_id=}")
         return result
 
-    @staticmethod
-    def set_flow_id(session_id: str, flow_dict: dict) -> None:
-        flow_dict["id"] = session_id
+    async def prepare_flow_and_add_to_db(
+        self,
+        *,
+        flow: Path | str | dict,
+        user_id: str | None = None,
+        custom_flow_id: str | None = None,
+        session_id: str | None = None,
+        tweaks_values: dict | None = None,
+    ) -> dict:
+        """Load the flow, resolve its tweaks and store it in the database.
+
+        `custom_flow_id` overrides the flow's own id, and a flow without any id gets a
+        fresh UUID. `session_id` is accepted for call-site compatibility but is not
+        used here.
+        """
+        flow_dict = await self.get_flow_dict(flow)
+        # add_flow_to_db and create_graph_from_flow index flow_dict["id"], so make sure one exists
+        # instead of failing later with a KeyError.
+        flow_dict["id"] = custom_flow_id or flow_dict.get("id") or str(uuid4())
+        flow_dict = self.process_tweaks(flow_dict, tweaks_values=tweaks_values)
+        await self.clear_flow_state(flow_dict)
+        await self.add_flow_to_db(flow_dict, user_id=user_id)
+        return flow_dict
+
+    def process_tweaks(self, flow_dict: dict, tweaks_values: dict | None = None) -> dict:
+        """Apply tweaks for the non-empty `load_from_db` fields of every vertex.
+
+        The collected tweaks are passed through `replace_tweaks_with_env` together with
+        `tweaks_values` (default: a copy of `os.environ`); afterwards every
+        `load_from_db: True` flag in the flow is set to False.
+        """
+        tweaks: dict = {}
+        tweaks_values = tweaks_values or os.environ.copy()
+        for vertex in Graph.from_payload(flow_dict).vertices:
+            param_handler = ParameterHandler(vertex, get_storage_service())
+            field_params, load_from_db_fields = param_handler.process_field_parameters()
+            for db_field in load_from_db_fields:
+                if field_params[db_field]:
+                    tweaks.setdefault(vertex.id, {})[db_field] = field_params[db_field]
+        if tweaks:
+            tweaks = replace_tweaks_with_env(tweaks=tweaks, env_vars=tweaks_values)
+            flow_dict = process_tweaks(flow_dict, tweaks)
+
+        # Recursively update load_from_db fields
+        def update_load_from_db(obj):
+            if isinstance(obj, dict):
+                for key, value in obj.items():
+                    if key == "load_from_db" and value is True:
+                        obj[key] = False
+                    else:
+                        update_load_from_db(value)
+            elif isinstance(obj, list):
+                for item in obj:
+                    update_load_from_db(item)
+
+        update_load_from_db(flow_dict)
+        return flow_dict
+
+    async def generate_user(self) -> User:
+        """Create, persist and return a new active user with a random UUID username and password."""
+        async with session_scope() as session:
+            user_id = str(uuid4())
+            user = User(id=user_id, username=user_id, password=get_password_hash(str(uuid4())), is_active=True)
+            session.add(user)
+            await session.commit()
+            await session.refresh(user)
+            return user
 
     @staticmethod
-    async def add_flow_to_db(session_id: str, flow_dict: dict):
+    async def add_flow_to_db(flow_dict: dict, user_id: str | None):
+        """Insert a `Flow` row built from `flow_dict` (name, id, data) with the given `user_id`."""
         async with session_scope() as session:
-            flow_db = Flow(name=session_id, id=UUID(flow_dict["id"]), data=flow_dict.get("data", {}))
+            flow_db = Flow(
+                name=flow_dict.get("name"), id=UUID(flow_dict["id"]), data=flow_dict.get("data", {}), user_id=user_id
+            )
             session.add(flow_db)
             await session.commit()
 
     @staticmethod
     async def run_graph(
-        input_value: str, input_type: str, output_type: str, session_id: str, graph: Graph, *, stream: bool
+        input_value: str,
+        input_type: str,
+        output_type: str,
+        session_id: str,
+        graph: Graph,
+        *,
+        stream: bool,
     ):
         return await run_graph(
             graph=graph,
@@ -90,17 +227,20 @@ class LangflowRunnerExperimental:
         )
 
     @staticmethod
-    async def create_graph_from_flow(session_id: str, flow_dict: dict):
-        graph = await aload_flow_from_json(flow=flow_dict, disable_logs=False)
-        graph.flow_id = flow_dict["id"]
-        graph.flow_name = flow_dict.get("name")
+    async def create_graph_from_flow(session_id: str, flow_dict: dict, user_id: str | None = None):
+        """Build a `Graph` from the flow payload, bind it to the session/user and initialize the run."""
+        graph = Graph.from_payload(
+            payload=flow_dict, flow_id=flow_dict["id"], flow_name=flow_dict.get("name"), user_id=user_id
+        )
         graph.session_id = session_id
         graph.set_run_id(session_id)
+        graph.user_id = user_id
         await graph.initialize_run()
         return graph
 
     @staticmethod
-    async def clear_flow_state(_session_id: str, flow_dict: dict):
+    async def clear_flow_state(flow_dict: dict):
+        """Clear the cache service, then call `cascade_delete_flow` for the flow's id."""
         cache_service = get_cache_service()
         if isinstance(cache_service, AsyncBaseCacheService):
             await cache_service.clear()
@@ -111,6 +251,15 @@ class LangflowRunnerExperimental:
             uuid_obj = flow_id if isinstance(flow_id, UUID) else UUID(str(flow_id))
             await cascade_delete_flow(session, uuid_obj)
 
+    @staticmethod
+    async def clear_user_state(user_id: str):
+        """Delete the user's flows and variables, then the user row itself."""
+        async with session_scope() as session:
+            # Delete rows that reference the user before the user row, so FK enforcement (if any) is satisfied.
+            await session.exec(delete(Flow).where(Flow.user_id == user_id))
+            await session.exec(delete(Variable).where(Variable.user_id == user_id))
+            await session.exec(delete(User).where(User.id == user_id))
+
     async def init_db_if_needed(self):
         if not await self.database_exists_check() and self.should_initialize_db:
             logger.info("Initializing database...")
@@ -139,42 +288,3 @@ class LangflowRunnerExperimental:
             return flow
         error_msg = "Input must be a file path (str or Path object) or a JSON object (dict)."
         raise TypeError(error_msg)
-
-    @staticmethod
-    def modify_flow_schema(session_id: str, obj: Any, parent: Any | None, _key: str | None):
-        if not isinstance(obj, dict):
-            return
-        parent_dict = parent if isinstance(parent, dict) else {}
-        parent_display = parent_dict.get("display_name", parent_dict.get("name", parent_dict.get("id", "unknown")))
-        if "session_id" in obj:
-            obj["session_id"] = session_id
-            logger.info(f"Setting {session_id=} for {parent_display=}")
-        if obj.get("load_from_db"):
-            obj["load_from_db"] = False
-            env_var_name = obj["value"]
-            if not env_var_name:
-                return
-            env_var_value = os.getenv(env_var_name)
-            if not env_var_value:
-                error_msg = f"Environment variable {env_var_name} not set for {parent_display}"
-                raise ValueError(error_msg)
-            obj["value"] = os.getenv(env_var_name)
-            logger.info(f"Loading env var {env_var_name=} for {parent_display=}")
-
-    def modification(self, obj: Any, func: Callable[[Any, Any | None, str | None], None], parent: Any = None) -> None:
-        """Recursively apply a function to all elements in a nested structure (dict or list).
-
-        The function is called with three arguments: the current object, its parent, and the key (if applicable).
-        """
-        if isinstance(obj, dict):
-            for key, value in obj.items():
-                func(value, parent, key)
-                self.modification(value, func, obj)
-            return
-        if isinstance(obj, list):
-            for item in obj:
-                func(item, parent, None)
-                self.modification(item, func, obj)
-            return
-        # primitive types (int, float, str, bool, None)
-        func(obj, parent, None)
diff --git a/src/backend/tests/unit/services/flow/test_flow_runner.py b/src/backend/tests/unit/services/flow/test_flow_runner.py
index 3d4ae5da4..fb047441c 100644
--- a/src/backend/tests/unit/services/flow/test_flow_runner.py
+++ b/src/backend/tests/unit/services/flow/test_flow_runner.py
@@ -7,10 +7,12 @@ from langflow.services.flow.flow_runner import LangflowRunnerExperimental
 @pytest.fixture
 def sample_flow_dict():
     return {
+        "id": str(uuid4()),  # Add required ID field
+        "name": "test_flow",  # Add name field
         "data": {
             "nodes": [],
             "edges": [],
-        }
+        },
     }
 
 
@@ -48,9 +50,9 @@ async def test_run_with_dict_input(flow_runner, sample_flow_dict):
     input_value = "test input"
 
     result = await flow_runner.run(
+        session_id=session_id,
         flow=sample_flow_dict,
         input_value=input_value,
-        session_id=session_id,
     )
 
     assert result is not None
@@ -62,16 +64,16 @@ async def test_run_with_different_input_types(flow_runner, sample_flow_dict):
     test_cases = [
         ("text input", "text", "text"),
         ("chat input", "chat", "chat"),
-        ("test input", "chat", "text"),
+        ("test input", "chat", "all"),  # Updated to use "all" as default output_type
     ]
 
     for input_value, input_type, output_type in test_cases:
         result = await flow_runner.run(
+            session_id=session_id,
             flow=sample_flow_dict,
             input_value=input_value,
             input_type=input_type,
             output_type=output_type,
-            session_id=session_id,
         )
 
         assert result is not None