feat: flow_runner better init and remove the depend on old load func (#7933)
* fixes * fix: Update SQLAlchemy import to SQLModel in flow_runner.py * [autofix.ci] apply automated fixes * Update flow_runner tests to match new LangflowRunnerExperimental API (#1) * Initial plan * Update test_flow_runner.py to match new LangflowRunnerExperimental API Co-authored-by: barnuri <13019522+barnuri@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: barnuri <13019522+barnuri@users.noreply.github.com> * [autofix.ci] apply automated fixes * patch-1 - fix lint * patch-1 - tweaks_values * patch-1 - tweaks_values * lint --------- Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Edwin Jose <edwin.jose@datastax.com> Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com> Co-authored-by: barnuri <13019522+barnuri@users.noreply.github.com>
This commit is contained in:
parent
2c74c291a8
commit
fe7b7dfd27
2 changed files with 157 additions and 73 deletions
|
|
@ -1,22 +1,28 @@
|
|||
import json
|
||||
import os
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from uuid import UUID
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
from aiofile import async_open
|
||||
from loguru import logger
|
||||
from sqlalchemy import text
|
||||
from sqlmodel import delete, text
|
||||
|
||||
from langflow.api.utils import cascade_delete_flow
|
||||
from langflow.graph import Graph
|
||||
from langflow.load import aload_flow_from_json
|
||||
from langflow.processing.process import run_graph
|
||||
from langflow.graph.vertex.param_handler import ParameterHandler
|
||||
from langflow.load.utils import replace_tweaks_with_env
|
||||
from langflow.logging.logger import configure
|
||||
from langflow.processing.process import process_tweaks, run_graph
|
||||
from langflow.services.auth.utils import (
|
||||
get_password_hash,
|
||||
)
|
||||
from langflow.services.cache.service import AsyncBaseCacheService
|
||||
from langflow.services.database.models.flow import Flow
|
||||
from langflow.services.database.models.user import User
|
||||
from langflow.services.database.models.variable import Variable
|
||||
from langflow.services.database.utils import initialize_database
|
||||
from langflow.services.deps import get_cache_service, session_scope
|
||||
from langflow.services.deps import get_cache_service, get_storage_service, session_scope
|
||||
from langflow.utils.util import update_settings
|
||||
|
||||
|
||||
class LangflowRunnerExperimental:
|
||||
|
|
@ -36,7 +42,18 @@ class LangflowRunnerExperimental:
|
|||
|
||||
"""
|
||||
|
||||
should_initialize_db: bool = True
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
should_initialize_db: bool = True,
|
||||
log_level: str | None = None,
|
||||
log_file: str | None = None,
|
||||
disable_logs: bool = False,
|
||||
async_log_file: bool = True,
|
||||
):
|
||||
self.should_initialize_db = should_initialize_db
|
||||
log_file_path = Path(log_file) if log_file else None
|
||||
configure(log_level=log_level, log_file=log_file_path, disable=disable_logs, async_file=async_log_file)
|
||||
|
||||
async def run(
|
||||
self,
|
||||
|
|
@ -45,39 +62,135 @@ class LangflowRunnerExperimental:
|
|||
input_value: str,
|
||||
*,
|
||||
input_type: str = "chat",
|
||||
output_type: str = "chat",
|
||||
output_type: str = "all",
|
||||
cache: str | None = None,
|
||||
stream: bool = False,
|
||||
user_id: str | None = None,
|
||||
generate_user: bool = False, # If True, generates a new user for the flow
|
||||
cleanup: bool = True, # If True, clears flow state after execution
|
||||
tweaks_values: dict | None = None,
|
||||
):
|
||||
try:
|
||||
logger.info(f"Start Handling {session_id=}")
|
||||
await self.init_db_if_needed()
|
||||
# Update settings with cache and components path
|
||||
await update_settings(cache=cache)
|
||||
if generate_user:
|
||||
user = await self.generate_user()
|
||||
user_id = str(user.id)
|
||||
flow_dict = await self.prepare_flow_and_add_to_db(
|
||||
flow=flow,
|
||||
user_id=user_id,
|
||||
session_id=session_id,
|
||||
tweaks_values=tweaks_values,
|
||||
)
|
||||
return await self.run_flow(
|
||||
input_value=input_value,
|
||||
session_id=session_id,
|
||||
flow_dict=flow_dict,
|
||||
input_type=input_type,
|
||||
output_type=output_type,
|
||||
user_id=user_id,
|
||||
stream=stream,
|
||||
)
|
||||
finally:
|
||||
if cleanup and user_id:
|
||||
await self.clear_user_state(user_id=user_id)
|
||||
|
||||
async def run_flow(
|
||||
self,
|
||||
*,
|
||||
input_value: str,
|
||||
session_id: str,
|
||||
flow_dict: dict,
|
||||
input_type: str = "chat",
|
||||
output_type: str = "all",
|
||||
user_id: str | None = None,
|
||||
stream: bool = False,
|
||||
):
|
||||
logger.info(f"Start Handling {session_id=}")
|
||||
await self.init_db_if_needed()
|
||||
flow_dict = await self.get_flow_dict(flow)
|
||||
self.set_flow_id(session_id, flow_dict)
|
||||
# we must modify the flow schema to set the session_id and for load_from_db=True we load the value from env vars
|
||||
self.modification(flow_dict, lambda obj, parent, key: self.modify_flow_schema(session_id, obj, parent, key))
|
||||
await self.clear_flow_state(session_id, flow_dict)
|
||||
await self.add_flow_to_db(session_id, flow_dict)
|
||||
graph = await self.create_graph_from_flow(session_id, flow_dict)
|
||||
graph = await self.create_graph_from_flow(session_id, flow_dict, user_id=user_id)
|
||||
try:
|
||||
result = await self.run_graph(input_value, input_type, output_type, session_id, graph, stream=stream)
|
||||
finally:
|
||||
await self.clear_flow_state(session_id, flow_dict)
|
||||
await self.clear_flow_state(flow_dict)
|
||||
logger.info(f"Finish Handling {session_id=}")
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def set_flow_id(session_id: str, flow_dict: dict) -> None:
|
||||
flow_dict["id"] = session_id
|
||||
async def prepare_flow_and_add_to_db(
|
||||
self,
|
||||
*,
|
||||
flow: Path | str | dict,
|
||||
user_id: str | None = None,
|
||||
custom_flow_id: str | None = None,
|
||||
session_id: str | None = None,
|
||||
tweaks_values: dict | None = None,
|
||||
) -> dict:
|
||||
flow_dict = await self.get_flow_dict(flow)
|
||||
session_id = session_id or custom_flow_id or str(uuid4())
|
||||
if custom_flow_id:
|
||||
flow_dict["id"] = custom_flow_id
|
||||
flow_dict = self.process_tweaks(flow_dict, tweaks_values=tweaks_values)
|
||||
await self.clear_flow_state(flow_dict)
|
||||
await self.add_flow_to_db(flow_dict, user_id=user_id)
|
||||
return flow_dict
|
||||
|
||||
def process_tweaks(self, flow_dict: dict, tweaks_values: dict | None = None) -> dict:
|
||||
tweaks: dict | None = None
|
||||
tweaks_values = tweaks_values or os.environ.copy()
|
||||
for vertex in Graph.from_payload(flow_dict).vertices:
|
||||
param_handler = ParameterHandler(vertex, get_storage_service())
|
||||
field_params, load_from_db_fields = param_handler.process_field_parameters()
|
||||
for db_field in load_from_db_fields:
|
||||
if field_params[db_field]:
|
||||
tweaks = tweaks or {}
|
||||
tweaks[vertex.id] = tweaks.get(vertex.id, {})
|
||||
tweaks[vertex.id][db_field] = field_params[db_field]
|
||||
if tweaks is not None:
|
||||
tweaks = replace_tweaks_with_env(tweaks=tweaks, env_vars=tweaks_values)
|
||||
flow_dict = process_tweaks(flow_dict, tweaks)
|
||||
|
||||
# Recursively update load_from_db fields
|
||||
def update_load_from_db(obj):
|
||||
if isinstance(obj, dict):
|
||||
for key, value in obj.items():
|
||||
if key == "load_from_db" and value is True:
|
||||
obj[key] = False
|
||||
else:
|
||||
update_load_from_db(value)
|
||||
elif isinstance(obj, list):
|
||||
for item in obj:
|
||||
update_load_from_db(item)
|
||||
|
||||
update_load_from_db(flow_dict)
|
||||
return flow_dict
|
||||
|
||||
async def generate_user(self) -> User:
|
||||
async with session_scope() as session:
|
||||
user_id = str(uuid4())
|
||||
user = User(id=user_id, username=user_id, password=get_password_hash(str(uuid4())), is_active=True)
|
||||
session.add(user)
|
||||
await session.commit()
|
||||
await session.refresh(user)
|
||||
return user
|
||||
|
||||
@staticmethod
|
||||
async def add_flow_to_db(session_id: str, flow_dict: dict):
|
||||
async def add_flow_to_db(flow_dict: dict, user_id: str | None):
|
||||
async with session_scope() as session:
|
||||
flow_db = Flow(name=session_id, id=UUID(flow_dict["id"]), data=flow_dict.get("data", {}))
|
||||
flow_db = Flow(
|
||||
name=flow_dict.get("name"), id=UUID(flow_dict["id"]), data=flow_dict.get("data", {}), user_id=user_id
|
||||
)
|
||||
session.add(flow_db)
|
||||
await session.commit()
|
||||
|
||||
@staticmethod
|
||||
async def run_graph(
|
||||
input_value: str, input_type: str, output_type: str, session_id: str, graph: Graph, *, stream: bool
|
||||
input_value: str,
|
||||
input_type: str,
|
||||
output_type: str,
|
||||
session_id: str,
|
||||
graph: Graph,
|
||||
*,
|
||||
stream: bool,
|
||||
):
|
||||
return await run_graph(
|
||||
graph=graph,
|
||||
|
|
@ -90,17 +203,18 @@ class LangflowRunnerExperimental:
|
|||
)
|
||||
|
||||
@staticmethod
|
||||
async def create_graph_from_flow(session_id: str, flow_dict: dict):
|
||||
graph = await aload_flow_from_json(flow=flow_dict, disable_logs=False)
|
||||
graph.flow_id = flow_dict["id"]
|
||||
graph.flow_name = flow_dict.get("name")
|
||||
async def create_graph_from_flow(session_id: str, flow_dict: dict, user_id: str | None = None):
|
||||
graph = Graph.from_payload(
|
||||
payload=flow_dict, flow_id=flow_dict["id"], flow_name=flow_dict.get("name"), user_id=user_id
|
||||
)
|
||||
graph.session_id = session_id
|
||||
graph.set_run_id(session_id)
|
||||
graph.user_id = user_id
|
||||
await graph.initialize_run()
|
||||
return graph
|
||||
|
||||
@staticmethod
|
||||
async def clear_flow_state(_session_id: str, flow_dict: dict):
|
||||
async def clear_flow_state(flow_dict: dict):
|
||||
cache_service = get_cache_service()
|
||||
if isinstance(cache_service, AsyncBaseCacheService):
|
||||
await cache_service.clear()
|
||||
|
|
@ -111,6 +225,13 @@ class LangflowRunnerExperimental:
|
|||
uuid_obj = flow_id if isinstance(flow_id, UUID) else UUID(str(flow_id))
|
||||
await cascade_delete_flow(session, uuid_obj)
|
||||
|
||||
@staticmethod
|
||||
async def clear_user_state(user_id: str):
|
||||
async with session_scope() as session:
|
||||
await session.exec(delete(Flow).where(Flow.user_id == user_id))
|
||||
await session.exec(delete(User).where(User.id == user_id))
|
||||
await session.exec(delete(Variable).where(Variable.user_id == user_id))
|
||||
|
||||
async def init_db_if_needed(self):
|
||||
if not await self.database_exists_check() and self.should_initialize_db:
|
||||
logger.info("Initializing database...")
|
||||
|
|
@ -139,42 +260,3 @@ class LangflowRunnerExperimental:
|
|||
return flow
|
||||
error_msg = "Input must be a file path (str or Path object) or a JSON object (dict)."
|
||||
raise TypeError(error_msg)
|
||||
|
||||
@staticmethod
|
||||
def modify_flow_schema(session_id: str, obj: Any, parent: Any | None, _key: str | None):
|
||||
if not isinstance(obj, dict):
|
||||
return
|
||||
parent_dict = parent if isinstance(parent, dict) else {}
|
||||
parent_display = parent_dict.get("display_name", parent_dict.get("name", parent_dict.get("id", "unknown")))
|
||||
if "session_id" in obj:
|
||||
obj["session_id"] = session_id
|
||||
logger.info(f"Setting {session_id=} for {parent_display=}")
|
||||
if obj.get("load_from_db"):
|
||||
obj["load_from_db"] = False
|
||||
env_var_name = obj["value"]
|
||||
if not env_var_name:
|
||||
return
|
||||
env_var_value = os.getenv(env_var_name)
|
||||
if not env_var_value:
|
||||
error_msg = f"Environment variable {env_var_name} not set for {parent_display}"
|
||||
raise ValueError(error_msg)
|
||||
obj["value"] = os.getenv(env_var_name)
|
||||
logger.info(f"Loading env var {env_var_name=} for {parent_display=}")
|
||||
|
||||
def modification(self, obj: Any, func: Callable[[Any, Any | None, str | None], None], parent: Any = None) -> None:
|
||||
"""Recursively apply a function to all elements in a nested structure (dict or list).
|
||||
|
||||
The function is called with three arguments: the current object, its parent, and the key (if applicable).
|
||||
"""
|
||||
if isinstance(obj, dict):
|
||||
for key, value in obj.items():
|
||||
func(value, parent, key)
|
||||
self.modification(value, func, obj)
|
||||
return
|
||||
if isinstance(obj, list):
|
||||
for item in obj:
|
||||
func(item, parent, None)
|
||||
self.modification(item, func, obj)
|
||||
return
|
||||
# primitive types (int, float, str, bool, None)
|
||||
func(obj, parent, None)
|
||||
|
|
|
|||
|
|
@ -7,10 +7,12 @@ from langflow.services.flow.flow_runner import LangflowRunnerExperimental
|
|||
@pytest.fixture
|
||||
def sample_flow_dict():
|
||||
return {
|
||||
"id": str(uuid4()), # Add required ID field
|
||||
"name": "test_flow", # Add name field
|
||||
"data": {
|
||||
"nodes": [],
|
||||
"edges": [],
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -48,9 +50,9 @@ async def test_run_with_dict_input(flow_runner, sample_flow_dict):
|
|||
input_value = "test input"
|
||||
|
||||
result = await flow_runner.run(
|
||||
session_id=session_id,
|
||||
flow=sample_flow_dict,
|
||||
input_value=input_value,
|
||||
session_id=session_id,
|
||||
)
|
||||
assert result is not None
|
||||
|
||||
|
|
@ -62,16 +64,16 @@ async def test_run_with_different_input_types(flow_runner, sample_flow_dict):
|
|||
test_cases = [
|
||||
("text input", "text", "text"),
|
||||
("chat input", "chat", "chat"),
|
||||
("test input", "chat", "text"),
|
||||
("test input", "chat", "all"), # Updated to use "all" as default output_type
|
||||
]
|
||||
|
||||
for input_value, input_type, output_type in test_cases:
|
||||
result = await flow_runner.run(
|
||||
session_id=session_id,
|
||||
flow=sample_flow_dict,
|
||||
input_value=input_value,
|
||||
input_type=input_type,
|
||||
output_type=output_type,
|
||||
session_id=session_id,
|
||||
)
|
||||
assert result is not None
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue