This commit is contained in:
Cristhian Zanforlin Lousa 2023-09-26 11:21:38 -03:00
commit f56dcc03b0
147 changed files with 14203 additions and 1842 deletions

View file

@ -1,7 +1,7 @@
from importlib import metadata
# Deactivate cache manager for now
# from langflow.services.cache import cache_manager
# from langflow.services.cache import cache_service
from langflow.processing.process import load_flow_from_json
from langflow.interface.custom.custom_component import CustomComponent
@ -12,4 +12,4 @@ except metadata.PackageNotFoundError:
__version__ = ""
del metadata # optional, avoids polluting the results of dir(__package__)
__all__ = ["load_flow_from_json", "cache_manager", "CustomComponent"]
__all__ = ["load_flow_from_json", "cache_service", "CustomComponent"]

View file

@ -2,9 +2,8 @@ import sys
import time
import httpx
from langflow.services.database.utils import session_getter
from langflow.services.utils import initialize_services
from langflow.services.getters import get_db_manager, get_settings_manager
from langflow.services.utils import initialize_settings_manager
from langflow.services.manager import initialize_services, initialize_settings_service
from langflow.services.getters import get_db_service, get_settings_service
from multiprocess import Process, cpu_count # type: ignore
import platform
@ -64,20 +63,20 @@ def update_settings(
"""Update the settings from a config file."""
# Check for database_url in the environment variables
initialize_settings_manager()
settings_manager = get_settings_manager()
initialize_settings_service()
settings_service = get_settings_service()
if config:
logger.debug(f"Loading settings from {config}")
settings_manager.settings.update_from_yaml(config, dev=dev)
settings_service.settings.update_from_yaml(config, dev=dev)
if remove_api_keys:
logger.debug(f"Setting remove_api_keys to {remove_api_keys}")
settings_manager.settings.update_settings(REMOVE_API_KEYS=remove_api_keys)
settings_service.settings.update_settings(REMOVE_API_KEYS=remove_api_keys)
if cache:
logger.debug(f"Setting cache to {cache}")
settings_manager.settings.update_settings(CACHE=cache)
settings_service.settings.update_settings(CACHE=cache)
if components_path:
logger.debug(f"Adding component path {components_path}")
settings_manager.settings.update_settings(COMPONENTS_PATH=components_path)
settings_service.settings.update_settings(COMPONENTS_PATH=components_path)
def serve_on_jcloud():
@ -353,8 +352,8 @@ def superuser(
),
):
initialize_services()
db_manager = get_db_manager()
with session_getter(db_manager) as session:
db_service = get_db_service()
with session_getter(db_service) as session:
from langflow.services.auth.utils import create_super_user
if create_super_user(db=session, username=username, password=password):
@ -375,10 +374,10 @@ def superuser(
@app.command()
def migration(test: bool = typer.Option(False, help="Run migrations in test mode.")):
initialize_services()
db_manager = get_db_manager()
db_service = get_db_service()
if not test:
db_manager.run_migrations()
results = db_manager.run_migrations_test()
db_service.run_migrations()
results = db_service.run_migrations_test()
display_results(results)

View file

@ -59,33 +59,6 @@ def build_input_keys_response(langchain_object, artifacts):
return input_keys_response
def merge_nested_dicts(dict1, dict2):
for key, value in dict2.items():
if isinstance(value, dict) and isinstance(dict1.get(key), dict):
dict1[key] = merge_nested_dicts(dict1[key], value)
else:
dict1[key] = value
return dict1
def merge_nested_dicts_with_renaming(dict1, dict2):
for key, value in dict2.items():
if (
key in dict1
and isinstance(value, dict)
and isinstance(dict1.get(key), dict)
):
for sub_key, sub_value in value.items():
if sub_key in dict1[key]:
new_key = get_new_key(dict1[key], sub_key)
dict1[key][new_key] = sub_value
else:
dict1[key][sub_key] = sub_value
else:
dict1[key] = value
return dict1
def get_new_key(dictionary, original_key):
counter = 1
new_key = original_key + " (" + str(counter) + ")"

View file

@ -14,16 +14,14 @@ from langflow.api.v1.schemas import BuildStatus, BuiltResponse, InitResponse, St
from langflow.graph.graph.base import Graph
from langflow.services.auth.utils import get_current_active_user, get_current_user
from loguru import logger
from langflow.services.getters import get_chat_manager, get_session
from cachetools import LRUCache
from langflow.services.getters import get_chat_service, get_session, get_cache_service
from sqlmodel import Session
from langflow.services.chat.manager import ChatManager
from langflow.services.chat.manager import ChatService
from langflow.services.cache.manager import BaseCacheService
router = APIRouter(tags=["Chat"])
flow_data_store: LRUCache = LRUCache(maxsize=10)
@router.websocket("/chat/{client_id}")
async def chat(
@ -31,7 +29,7 @@ async def chat(
websocket: WebSocket,
token: str = Query(...),
db: Session = Depends(get_session),
chat_manager: "ChatManager" = Depends(get_chat_manager),
chat_service: "ChatService" = Depends(get_chat_service),
):
"""Websocket endpoint for chat."""
try:
@ -46,15 +44,15 @@ async def chat(
code=status.WS_1008_POLICY_VIOLATION, reason="Unauthorized"
)
if client_id in chat_manager.in_memory_cache:
await chat_manager.handle_websocket(client_id, websocket)
if client_id in chat_service.cache_service:
await chat_service.handle_websocket(client_id, websocket)
else:
# We accept the connection but close it immediately
# if the flow is not built yet
message = "Please, build the flow before sending messages"
await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=message)
except WebSocketException as exc:
logger.error(f"Websocket error: {exc}")
logger.error(f"Websocket exrror: {exc}")
await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=str(exc))
except Exception as exc:
logger.error(f"Error in chat websocket: {exc}")
@ -72,26 +70,26 @@ async def init_build(
graph_data: dict,
flow_id: str,
current_user=Depends(get_current_active_user),
chat_manager: "ChatManager" = Depends(get_chat_manager),
chat_service: "ChatService" = Depends(get_chat_service),
cache_service: "BaseCacheService" = Depends(get_cache_service),
):
"""Initialize the build by storing graph data and returning a unique session ID."""
try:
if flow_id is None:
raise ValueError("No ID provided")
# Check if already building
if (
flow_id in flow_data_store
and flow_data_store[flow_id]["status"] == BuildStatus.IN_PROGRESS
flow_id in cache_service
and isinstance(cache_service[flow_id], dict)
and cache_service[flow_id].get("status") == BuildStatus.IN_PROGRESS
):
return InitResponse(flowId=flow_id)
# Delete from cache if already exists
if flow_id in chat_manager.in_memory_cache:
with chat_manager.in_memory_cache._lock:
chat_manager.in_memory_cache.delete(flow_id)
logger.debug(f"Deleted flow {flow_id} from cache")
flow_data_store[flow_id] = {
if flow_id in chat_service.cache_service:
chat_service.cache_service.delete(flow_id)
logger.debug(f"Deleted flow {flow_id} from cache")
cache_service[flow_id] = {
"graph_data": graph_data,
"status": BuildStatus.STARTED,
"user_id": current_user.id,
@ -104,12 +102,14 @@ async def init_build(
@router.get("/build/{flow_id}/status", response_model=BuiltResponse)
async def build_status(flow_id: str):
"""Check the flow_id is in the flow_data_store."""
async def build_status(
flow_id: str, cache_service: "BaseCacheService" = Depends(get_cache_service)
):
"""Check the flow_id is in the cache_service."""
try:
built = (
flow_id in flow_data_store
and flow_data_store[flow_id]["status"] == BuildStatus.SUCCESS
flow_id in cache_service
and cache_service[flow_id]["status"] == BuildStatus.SUCCESS
)
return BuiltResponse(
@ -123,7 +123,9 @@ async def build_status(flow_id: str):
@router.get("/build/stream/{flow_id}", response_class=StreamingResponse)
async def stream_build(
flow_id: str, chat_manager: "ChatManager" = Depends(get_chat_manager)
flow_id: str,
chat_service: "ChatService" = Depends(get_chat_service),
cache_service: "BaseCacheService" = Depends(get_cache_service),
):
"""Stream the build process based on stored flow data."""
@ -131,18 +133,18 @@ async def stream_build(
final_response = {"end_of_stream": True}
artifacts = {}
try:
if flow_id not in flow_data_store:
if flow_id not in cache_service:
error_message = "Invalid session ID"
yield str(StreamData(event="error", data={"error": error_message}))
return
if flow_data_store[flow_id].get("status") == BuildStatus.IN_PROGRESS:
if cache_service[flow_id].get("status") == BuildStatus.IN_PROGRESS:
error_message = "Already building"
yield str(StreamData(event="error", data={"error": error_message}))
return
graph_data = flow_data_store[flow_id].get("graph_data")
user_id = flow_data_store[flow_id]["user_id"]
graph_data = cache_service[flow_id].get("graph_data")
cache_service[flow_id]["user_id"]
if not graph_data:
error_message = "No data provided"
@ -155,7 +157,7 @@ async def stream_build(
graph = Graph.from_payload(graph_data)
number_of_nodes = len(graph.nodes)
flow_data_store[flow_id]["status"] = BuildStatus.IN_PROGRESS
cache_service[flow_id]["status"] = BuildStatus.IN_PROGRESS
for i, vertex in enumerate(graph.generator_build(), 1):
try:
@ -163,7 +165,10 @@ async def stream_build(
"log": f"Building node {vertex.vertex_type}",
}
yield str(StreamData(event="log", data=log_dict))
vertex.build(user_id)
if vertex.is_task:
vertex = try_running_celery_task(vertex)
else:
vertex.build()
params = vertex._built_object_repr()
valid = True
logger.debug(f"Building node {str(vertex.vertex_type)}")
@ -179,7 +184,7 @@ async def stream_build(
logger.exception(exc)
params = str(exc)
valid = False
flow_data_store[flow_id]["status"] = BuildStatus.FAILURE
cache_service[flow_id]["status"] = BuildStatus.FAILURE
response = {
"valid": valid,
@ -203,14 +208,14 @@ async def stream_build(
"handle_keys": [],
}
yield str(StreamData(event="message", data=input_keys_response))
chat_manager.set_cache(flow_id, langchain_object)
chat_service.set_cache(flow_id, langchain_object)
# We need to reset the chat history
chat_manager.chat_history.empty_history(flow_id)
flow_data_store[flow_id]["status"] = BuildStatus.SUCCESS
chat_service.chat_history.empty_history(flow_id)
cache_service[flow_id]["status"] = BuildStatus.SUCCESS
except Exception as exc:
logger.exception(exc)
logger.error("Error while building the flow: %s", exc)
flow_data_store[flow_id]["status"] = BuildStatus.FAILURE
cache_service[flow_id]["status"] = BuildStatus.FAILURE
yield str(StreamData(event="error", data={"error": str(exc)}))
finally:
yield str(StreamData(event="message", data=final_response))
@ -220,3 +225,20 @@ async def stream_build(
except Exception as exc:
logger.error(f"Error streaming build: {exc}")
raise HTTPException(status_code=500, detail=str(exc))
def try_running_celery_task(vertex):
# Try running the task in celery
# and set the task_id to the local vertex
# if it fails, run the task locally
try:
from langflow.worker import build_vertex
task = build_vertex.delay(vertex)
vertex.task_id = task.id
except Exception as exc:
logger.exception(exc)
logger.error("Error running task in celery, running locally")
vertex.task_id = None
vertex.build()
return vertex

View file

@ -1,12 +1,17 @@
from http import HTTPStatus
from typing import Annotated, Any, Optional, Union
from typing import Annotated, Optional, Union
from langflow.services.auth.utils import api_key_security, get_current_active_user
from langflow.services.cache.utils import save_uploaded_file
from langflow.services.database.models.flow import Flow
from langflow.processing.process import process_graph_cached, process_tweaks
from langflow.services.database.models.user.user import User
from langflow.services.getters import get_settings_manager
from langflow.services.getters import (
get_session_service,
get_settings_service,
get_task_service,
)
from loguru import logger
from fastapi import APIRouter, Depends, HTTPException, UploadFile, Body, status
import sqlalchemy as sa
@ -15,66 +20,42 @@ from langflow.interface.custom.custom_component import CustomComponent
from langflow.api.v1.schemas import (
ProcessResponse,
TaskStatusResponse,
UploadFileResponse,
CustomComponentCode,
)
from langflow.api.utils import merge_nested_dicts_with_renaming
from langflow.interface.types import (
build_langchain_types_dict,
build_langchain_template_custom_component,
build_langchain_custom_component_list_from_path,
)
from langflow.services.getters import get_session
try:
from langflow.worker import process_graph_cached_task
except ImportError:
def process_graph_cached_task(*args, **kwargs):
raise NotImplementedError("Celery is not installed")
from sqlmodel import Session
from langflow.services.task.manager import TaskService
# build router
router = APIRouter(tags=["Base"])
@router.get("/all", dependencies=[Depends(get_current_active_user)])
def get_all(
settings_manager=Depends(get_settings_manager),
settings_service=Depends(get_settings_service),
):
from langflow.interface.types import get_all_types_dict
logger.debug("Building langchain types dict")
native_components = build_langchain_types_dict()
# custom_components is a list of dicts
# need to merge all the keys into one dict
custom_components_from_file: dict[str, Any] = {}
if settings_manager.settings.COMPONENTS_PATH:
logger.info(
f"Building custom components from {settings_manager.settings.COMPONENTS_PATH}"
)
custom_component_dicts = []
processed_paths = []
for path in settings_manager.settings.COMPONENTS_PATH:
if str(path) in processed_paths:
continue
custom_component_dict = build_langchain_custom_component_list_from_path(
str(path)
)
custom_component_dicts.append(custom_component_dict)
processed_paths.append(str(path))
logger.info(f"Loading {len(custom_component_dicts)} category(ies)")
for custom_component_dict in custom_component_dicts:
# custom_component_dict is a dict of dicts
if not custom_component_dict:
continue
category = list(custom_component_dict.keys())[0]
logger.info(
f"Loading {len(custom_component_dict[category])} component(s) from category {category}"
)
custom_components_from_file = merge_nested_dicts_with_renaming(
custom_components_from_file, custom_component_dict
)
return merge_nested_dicts_with_renaming(
native_components, custom_components_from_file
)
try:
return get_all_types_dict(settings_service)
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc)) from exc
# For backwards compatibility we will keep the old endpoint
@ -94,7 +75,9 @@ async def process_flow(
tweaks: Optional[dict] = None,
clear_cache: Annotated[bool, Body(embed=True)] = False, # noqa: F821
session_id: Annotated[Union[None, str], Body(embed=True)] = None, # noqa: F821
task_service: "TaskService" = Depends(get_task_service),
api_key_user: User = Depends(api_key_security),
sync: Annotated[bool, Body(embed=True)] = True, # noqa: F821
):
"""
Endpoint to process an input with a given flow_id.
@ -125,10 +108,49 @@ async def process_flow(
graph_data = process_tweaks(graph_data, tweaks)
except Exception as exc:
logger.error(f"Error processing tweaks: {exc}")
response, session_id = process_graph_cached(
graph_data, inputs, clear_cache, session_id
if sync:
task_id, result = await task_service.launch_and_await_task(
process_graph_cached_task
if task_service.use_celery
else process_graph_cached,
graph_data,
inputs,
clear_cache,
session_id,
)
if isinstance(result, dict) and "result" in result:
task_result = result["result"]
session_id = result["session_id"]
elif hasattr(result, "result") and hasattr(result, "session_id"):
task_result = result.result
session_id = result.session_id
else:
logger.warning(
"This is an experimental feature and may not work as expected."
"Please report any issues to our GitHub repository."
)
if session_id is None:
# Generate a session ID
session_id = get_session_service().generate_key(
session_id=session_id, data_graph=graph_data
)
task_id, task = await task_service.launch_task(
process_graph_cached_task
if task_service.use_celery
else process_graph_cached,
graph_data,
inputs,
clear_cache,
session_id,
)
task_result = task.status
return ProcessResponse(
result=task_result,
id=task_id,
session_id=session_id,
backend=str(type(task_service.backend)),
)
return ProcessResponse(result=response, session_id=session_id)
except sa.exc.StatementError as exc:
# StatementError('(builtins.ValueError) badly formed hexadecimal UUID string')
if "badly formed hexadecimal UUID string" in str(exc):
@ -151,6 +173,23 @@ async def process_flow(
raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/task/{task_id}/status", response_model=TaskStatusResponse)
async def get_task_status(task_id: str):
task_service = get_task_service()
task = task_service.get_task(task_id)
result = None
if task.ready():
result = task.result
if isinstance(result, dict) and "result" in result:
result = result["result"]
elif hasattr(result, "result"):
result = result.result
if task is None:
raise HTTPException(status_code=404, detail="Task not found")
return TaskStatusResponse(status=task.status, result=result)
@router.post(
"/upload/{flow_id}",
response_model=UploadFileResponse,
@ -182,6 +221,10 @@ def get_version():
async def custom_component(
raw_code: CustomComponentCode,
):
from langflow.interface.types import (
build_langchain_template_custom_component,
)
extractor = CustomComponent(code=raw_code.code)
extractor.is_check_valid()

View file

@ -13,7 +13,7 @@ from langflow.services.database.models.flow import (
)
from langflow.services.database.models.user.user import User
from langflow.services.getters import get_session
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
import orjson
from sqlmodel import Session
from fastapi import APIRouter, Depends, HTTPException
@ -83,7 +83,7 @@ def update_flow(
flow_id: UUID,
flow: FlowUpdate,
current_user: User = Depends(get_current_active_user),
settings_manager=Depends(get_settings_manager),
settings_service=Depends(get_settings_service),
):
"""Update a flow."""
@ -91,7 +91,7 @@ def update_flow(
if not db_flow:
raise HTTPException(status_code=404, detail="Flow not found")
flow_data = flow.dict(exclude_unset=True)
if settings_manager.settings.REMOVE_API_KEYS:
if settings_service.settings.REMOVE_API_KEYS:
flow_data = remove_api_keys(flow_data)
for key, value in flow_data.items():
if value is not None:

View file

@ -12,7 +12,7 @@ from langflow.services.auth.utils import (
get_current_active_user,
)
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
router = APIRouter(tags=["Login"])
@ -23,7 +23,17 @@ async def login_to_get_access_token(
db: Session = Depends(get_session),
# _: Session = Depends(get_current_active_user)
):
if user := authenticate_user(form_data.username, form_data.password, db):
try:
user = authenticate_user(form_data.username, form_data.password, db)
except Exception as exc:
if isinstance(exc, HTTPException):
raise exc
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(exc),
) from exc
if user:
return create_user_tokens(user_id=user.id, db=db, update_last_login=True)
else:
raise HTTPException(
@ -35,9 +45,9 @@ async def login_to_get_access_token(
@router.get("/auto_login")
async def auto_login(
db: Session = Depends(get_session), settings_manager=Depends(get_settings_manager)
db: Session = Depends(get_session), settings_service=Depends(get_settings_service)
):
if settings_manager.auth_settings.AUTO_LOGIN:
if settings_service.auth_settings.AUTO_LOGIN:
return create_user_longterm_token(db)
raise HTTPException(

View file

@ -50,8 +50,20 @@ class UpdateTemplateRequest(BaseModel):
class ProcessResponse(BaseModel):
"""Process response schema."""
result: dict
result: Any
id: Optional[str] = None
session_id: Optional[str] = None
backend: Optional[str] = None
# TaskStatusResponse(
# status=task.status, result=task.result if task.ready() else None
# )
class TaskStatusResponse(BaseModel):
"""Task status response schema."""
status: str
result: Optional[Any] = None
class ChatMessage(BaseModel):

View file

@ -13,7 +13,7 @@ from sqlalchemy.exc import IntegrityError
from sqlmodel import Session, select
from fastapi import APIRouter, Depends, HTTPException
from langflow.services.getters import get_session
from langflow.services.getters import get_session, get_settings_service
from langflow.services.auth.utils import (
get_current_active_superuser,
get_current_active_user,
@ -32,6 +32,7 @@ router = APIRouter(tags=["Users"], prefix="/users")
def add_user(
user: UserCreate,
session: Session = Depends(get_session),
settings_service=Depends(get_settings_service),
) -> User:
"""
Add a new user to the database.
@ -39,7 +40,7 @@ def add_user(
new_user = User.from_orm(user)
try:
new_user.password = get_password_hash(user.password)
new_user.is_active = settings_service.auth_settings.NEW_USER_IS_ACTIVE
session.add(new_user)
session.commit()
session.refresh(new_user)
@ -66,7 +67,7 @@ def read_current_user(
def read_all_users(
skip: int = 0,
limit: int = 10,
current_user: Session = Depends(get_current_active_superuser),
_: Session = Depends(get_current_active_superuser),
session: Session = Depends(get_session),
) -> UsersResponse:
"""
@ -99,9 +100,11 @@ def patch_user(
status_code=403, detail="You don't have the permission to update this user"
)
if user_update.password:
raise HTTPException(
status_code=400, detail="You can't change your password here"
)
if not user.is_superuser:
raise HTTPException(
status_code=400, detail="You can't change your password here"
)
user_update.password = get_password_hash(user_update.password)
if user_db := get_user_by_id(session, user_id):
return update_user(user_db, user_update, session)
@ -164,31 +167,3 @@ def delete_user(
session.commit()
return {"detail": "User deleted"}
# TODO: REMOVE - Just for testing purposes
@router.post("/super_user", response_model=User)
def add_super_user_for_testing_purposes_delete_me_before_merge_into_dev(
session: Session = Depends(get_session),
) -> User:
"""
Add a superuser for testing purposes.
(This should be removed in production)
"""
new_user = User(
username="superuser",
password=get_password_hash("12345"),
is_active=True,
is_superuser=True,
last_login_at=None,
)
try:
session.add(new_user)
session.commit()
session.refresh(new_user)
except IntegrityError as e:
session.rollback()
raise HTTPException(status_code=400, detail="User exists") from e
return new_user

View file

View file

@ -0,0 +1,11 @@
from celery import Celery # type: ignore
def make_celery(app_name: str, config: str) -> Celery:
celery_app = Celery(app_name)
celery_app.config_from_object(config)
celery_app.conf.task_routes = {"langflow.worker.tasks.*": {"queue": "langflow"}}
return celery_app
celery_app = make_celery("langflow", "langflow.core.celeryconfig")

View file

@ -0,0 +1,14 @@
# celeryconfig.py
import os
langflow_redis_host = os.environ.get("LANGFLOW_REDIS_HOST")
langflow_redis_port = os.environ.get("LANGFLOW_REDIS_PORT")
if "BROKER_URL" in os.environ and "RESULT_BACKEND" in os.environ:
# RabbitMQ
broker_url = os.environ.get("BROKER_URL", "amqp://localhost")
result_backend = os.environ.get("RESULT_BACKEND", "redis://localhost:6379/0")
elif langflow_redis_host and langflow_redis_port:
broker_url = f"redis://{langflow_redis_host}:{langflow_redis_port}/0"
result_backend = f"redis://{langflow_redis_host}:{langflow_redis_port}/0"
# tasks should be json or pickle
accept_content = ["json", "pickle"]

View file

@ -17,6 +17,17 @@ class Edge:
self.validate_edge()
def __setstate__(self, state):
self.source = state["source"]
self.target = state["target"]
self.target_param = state["target_param"]
self.source_handle = state["source_handle"]
self.target_handle = state["target_handle"]
def reset(self) -> None:
self.source._build_params()
self.target._build_params()
def validate_edge(self) -> None:
# Validate that the outputs of the source node are valid inputs
# for the target node

View file

@ -26,6 +26,12 @@ class Graph:
self._edges = edges
self._build_graph()
def __setstate__(self, state):
self.__dict__.update(state)
for edge in self.edges:
edge.reset()
edge.validate_edge()
@classmethod
def from_payload(cls, payload: Dict) -> "Graph":
"""
@ -48,6 +54,11 @@ class Graph:
f"Invalid payload. Expected keys 'nodes' and 'edges'. Found {list(payload.keys())}"
) from exc
def __eq__(self, other: object) -> bool:
if not isinstance(other, Graph):
return False
return self.__repr__() == other.__repr__()
def _build_graph(self) -> None:
"""Builds the graph from the nodes and edges."""
self.nodes = self._build_vertices()
@ -147,7 +158,7 @@ class Graph:
def generator_build(self) -> Generator[Vertex, None, None]:
"""Builds each vertex in the graph and yields it."""
sorted_vertices = self.topological_sort()
logger.debug("Sorted vertices: %s", sorted_vertices)
logger.debug("There are %s vertices in the graph", len(sorted_vertices))
yield from sorted_vertices
def get_node_neighbors(self, node: Vertex) -> Dict[Vertex, int]:

View file

@ -1,5 +1,7 @@
import ast
import pickle
from langflow.graph.utils import UnbuiltObject
from langflow.graph.vertex.utils import is_basic_type
from langflow.interface.initialize import loading
from langflow.interface.listing import lazy_load_dict
from langflow.utils.constants import DIRECT_TYPES
@ -12,12 +14,19 @@ import types
from typing import Any, Dict, List, Optional
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from langflow.graph.edge.base import Edge
class Vertex:
def __init__(self, data: Dict, base_type: Optional[str] = None) -> None:
def __init__(
self,
data: Dict,
base_type: Optional[str] = None,
is_task: bool = False,
params: Optional[Dict] = None,
) -> None:
self.id: str = data["id"]
self._data = data
self.edges: List["Edge"] = []
@ -26,6 +35,59 @@ class Vertex:
self._built_object = UnbuiltObject()
self._built = False
self.artifacts: Dict[str, Any] = {}
self.task_id: Optional[str] = None
self.is_task = is_task
self.params = params or {}
def reset_params(self):
for edge in self.edges:
if edge.source != self:
target_param = edge.target_param
if target_param in ["document", "texts"]:
# this means they got data and have already ingested it
# so we continue after removing the param
self.params.pop(target_param, None)
continue
if target_param in self.params and not is_basic_type(
self.params[target_param]
):
# edge.source.params = {}
edge.source._build_params()
edge.source._built_object = UnbuiltObject()
edge.source._built = False
self.params[target_param] = edge.source
def __getstate__(self):
state_dict = self.__dict__.copy()
try:
# try pickling the built object
# if it fails, then we need to delete it
# and build it again
pickle.dumps(state_dict["_built_object"])
except Exception:
self.reset_params()
del state_dict["_built_object"]
del state_dict["_built"]
return state_dict
def __setstate__(self, state):
self._data = state["_data"]
self.params = state["params"]
self.base_type = state["base_type"]
self.is_task = state["is_task"]
self.edges = state["edges"]
self.id = state["id"]
self._parse_data()
if "_built_object" in state:
self._built_object = state["_built_object"]
self._built = state["_built"]
else:
self._built_object = UnbuiltObject()
self._built = False
self.artifacts: Dict[str, Any] = {}
self.task_id: Optional[str] = None
def _parse_data(self) -> None:
self.data = self._data["data"]
@ -68,6 +130,13 @@ class Vertex:
self.base_type = base_type
break
def get_task(self):
# using the task_id, get the task from celery
# and return it
from celery.result import AsyncResult # type: ignore
return AsyncResult(self.task_id)
def _build_params(self):
# sourcery skip: merge-list-append, remove-redundant-if
# Some params are required, some are optional
@ -89,9 +158,11 @@ class Vertex:
for key, value in self.data["node"]["template"].items()
if isinstance(value, dict)
}
params = {}
params = self.params.copy() if self.params else {}
for edge in self.edges:
if not hasattr(edge, "target_param"):
continue
param_key = edge.target_param
if param_key in template_dict:
if template_dict[param_key]["list"]:
@ -102,6 +173,8 @@ class Vertex:
params[param_key] = edge.source
for key, value in template_dict.items():
if key in params:
continue
# Skip _type and any value that has show == False and is not code
# If we don't want to show code but we want to use it
if key == "_type" or (not value.get("show") and key != "code"):
@ -144,6 +217,7 @@ class Vertex:
else:
params.pop(key, None)
# Add _type to params
self._raw_params = params
self.params = params
def _build(self, user_id=None):
@ -151,13 +225,13 @@ class Vertex:
Initiate the build process.
"""
logger.debug(f"Building {self.vertex_type}")
self._build_each_node_in_params_dict()
self._build_each_node_in_params_dict(user_id)
self._get_and_instantiate_class(user_id)
self._validate_built_object()
self._built = True
def _build_each_node_in_params_dict(self):
def _build_each_node_in_params_dict(self, user_id=None):
"""
Iterates over each node in the params dictionary and builds it.
"""
@ -166,9 +240,9 @@ class Vertex:
if value == self:
del self.params[key]
continue
self._build_node_and_update_params(key, value)
self._build_node_and_update_params(key, value, user_id)
elif isinstance(value, list) and self._is_list_of_nodes(value):
self._build_list_of_nodes_and_update_params(key, value)
self._build_list_of_nodes_and_update_params(key, value, user_id)
def _is_node(self, value):
"""
@ -182,11 +256,31 @@ class Vertex:
"""
return all(self._is_node(node) for node in value)
def get_result(self, user_id=None, timeout=None) -> Any:
# Check if the Vertex was built already
if self._built:
return self._built_object
if self.is_task and self.task_id is not None:
task = self.get_task()
result = task.get(timeout=timeout)
if result is not None: # If result is ready
self._update_built_object_and_artifacts(result)
return self._built_object
else:
# Handle the case when the result is not ready (retry, throw exception, etc.)
pass
# If there's no task_id, build the vertex locally
self.build(user_id)
return self._built_object
def _build_node_and_update_params(self, key, node, user_id=None):
"""
Builds a given node and updates the params dictionary accordingly.
"""
result = node.build(user_id)
result = node.get_result(user_id)
self._handle_func(key, result)
if isinstance(result, list):
self._extend_params_list_with_result(key, result)
@ -200,7 +294,7 @@ class Vertex:
"""
self.params[key] = []
for node in nodes:
built = node.build(user_id)
built = node.get_result(user_id)
if isinstance(built, list):
if key not in self.params:
self.params[key] = []
@ -245,6 +339,7 @@ class Vertex:
)
self._update_built_object_and_artifacts(result)
except Exception as exc:
logger.exception(exc)
raise ValueError(
f"Error building node {self.vertex_type}: {str(exc)}"
) from exc
@ -285,7 +380,10 @@ class Vertex:
return f"Vertex(id={self.id}, data={self.data})"
def __eq__(self, __o: object) -> bool:
return self.id == __o.id if isinstance(__o, Vertex) else False
try:
return self.id == __o.id if isinstance(__o, Vertex) else False
except AttributeError:
return False
def __hash__(self) -> int:
return id(self)

View file

@ -7,14 +7,27 @@ from langflow.interface.utils import extract_input_variables_from_prompt
class AgentVertex(Vertex):
def __init__(self, data: Dict):
super().__init__(data, base_type="agents")
def __init__(self, data: Dict, params: Optional[Dict] = None):
super().__init__(data, base_type="agents", params=params)
self.tools: List[Union[ToolkitVertex, ToolVertex]] = []
self.chains: List[ChainVertex] = []
def __getstate__(self):
state = super().__getstate__()
state["tools"] = self.tools
state["chains"] = self.chains
return state
def __setstate__(self, state):
self.tools = state["tools"]
self.chains = state["chains"]
super().__setstate__(state)
def _set_tools_and_chains(self) -> None:
for edge in self.edges:
if not hasattr(edge, "source"):
continue
source_node = edge.source
if isinstance(source_node, (ToolVertex, ToolkitVertex)):
self.tools.append(source_node)
@ -38,16 +51,16 @@ class AgentVertex(Vertex):
class ToolVertex(Vertex):
def __init__(self, data: Dict):
super().__init__(data, base_type="tools")
def __init__(self, data: Dict, params: Optional[Dict] = None):
super().__init__(data, base_type="tools", params=params)
class LLMVertex(Vertex):
built_node_type = None
class_built_object = None
def __init__(self, data: Dict):
super().__init__(data, base_type="llms")
def __init__(self, data: Dict, params: Optional[Dict] = None):
super().__init__(data, base_type="llms", params=params)
def build(self, force: bool = False, user_id=None, *args, **kwargs) -> Any:
# LLM is different because some models might take up too much memory
@ -64,13 +77,13 @@ class LLMVertex(Vertex):
class ToolkitVertex(Vertex):
def __init__(self, data: Dict):
super().__init__(data, base_type="toolkits")
def __init__(self, data: Dict, params=None):
super().__init__(data, base_type="toolkits", params=params)
class FileToolVertex(ToolVertex):
def __init__(self, data: Dict):
super().__init__(data)
def __init__(self, data: Dict, params=None):
super().__init__(data, params=params)
class WrapperVertex(Vertex):
@ -86,17 +99,19 @@ class WrapperVertex(Vertex):
class DocumentLoaderVertex(Vertex):
def __init__(self, data: Dict):
super().__init__(data, base_type="documentloaders")
def __init__(self, data: Dict, params: Optional[Dict] = None):
super().__init__(data, base_type="documentloaders", params=params)
def _built_object_repr(self):
# This built_object is a list of documents. Maybe we should
# show how many documents are in the list?
if self._built_object:
avg_length = sum(len(doc.page_content) for doc in self._built_object) / len(
self._built_object
)
avg_length = sum(
len(doc.page_content)
for doc in self._built_object
if hasattr(doc, "page_content")
) / len(self._built_object)
return f"""{self.vertex_type}({len(self._built_object)} documents)
\nAvg. Document Length (characters): {int(avg_length)}
Documents: {self._built_object[:3]}..."""
@ -104,14 +119,51 @@ class DocumentLoaderVertex(Vertex):
class EmbeddingVertex(Vertex):
def __init__(self, data: Dict):
super().__init__(data, base_type="embeddings")
def __init__(self, data: Dict, params: Optional[Dict] = None):
super().__init__(data, base_type="embeddings", params=params)
class VectorStoreVertex(Vertex):
def __init__(self, data: Dict):
def __init__(self, data: Dict, params=None):
super().__init__(data, base_type="vectorstores")
self.params = params or {}
# VectorStores may contain databse connections
# so we need to define the __reduce__ method and the __setstate__ method
# to avoid pickling errors
def clean_edges_for_pickling(self):
# for each edge that has self as source
# we need to clear the _built_object of the target
# so that we don't try to pickle a database connection
for edge in self.edges:
if edge.source == self:
edge.target._built_object = None
edge.target._built = False
edge.target.params[edge.target_param] = self
def remove_docs_and_texts_from_params(self):
# remove documents and texts from params
# so that we don't try to pickle a database connection
self.params.pop("documents", None)
self.params.pop("texts", None)
def __getstate__(self):
# We want to save the params attribute
# and if "documents" or "texts" are in the params
# we want to remove them because they have already
# been processed.
params = self.params.copy()
params.pop("documents", None)
params.pop("texts", None)
self.clean_edges_for_pickling()
return super().__getstate__()
def __setstate__(self, state):
super().__setstate__(state)
self.remove_docs_and_texts_from_params()
class MemoryVertex(Vertex):
def __init__(self, data: Dict):
@ -124,8 +176,8 @@ class RetrieverVertex(Vertex):
class TextSplitterVertex(Vertex):
def __init__(self, data: Dict):
super().__init__(data, base_type="textsplitters")
def __init__(self, data: Dict, params: Optional[Dict] = None):
super().__init__(data, base_type="textsplitters", params=params)
def _built_object_repr(self):
# This built_object is a list of documents. Maybe we should
@ -211,7 +263,7 @@ class PromptVertex(Vertex):
self.params["input_variables"] = list(
set(self.params["input_variables"])
)
else:
elif isinstance(self.params, dict):
self.params.pop("input_variables", None)
self._build(user_id=user_id)
@ -258,8 +310,13 @@ class OutputParserVertex(Vertex):
class CustomComponentVertex(Vertex):
def __init__(self, data: Dict):
super().__init__(data, base_type="custom_components")
super().__init__(data, base_type="custom_components", is_task=True)
def _built_object_repr(self):
if self.task_id and self.is_task:
if task := self.get_task():
return str(task.info)
else:
return f"Task {self.task_id} is not running"
if self.artifacts and "repr" in self.artifacts:
return self.artifacts["repr"] or super()._built_object_repr()

View file

@ -0,0 +1,5 @@
from langflow.utils.constants import PYTHON_BASIC_TYPES
def is_basic_type(obj):
return type(obj) in PYTHON_BASIC_TYPES

View file

@ -5,7 +5,7 @@ from langchain.agents import types
from langflow.custom.customs import get_custom_nodes
from langflow.interface.agents.custom import CUSTOM_AGENTS
from langflow.interface.base import LangChainTypeCreator
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
from langflow.template.frontend_node.agents import AgentFrontendNode
from loguru import logger
@ -54,7 +54,7 @@ class AgentCreator(LangChainTypeCreator):
# Now this is a generator
def to_list(self) -> List[str]:
names = []
settings_manager = get_settings_manager()
settings_service = get_settings_service()
for _, agent in self.type_to_loader_dict.items():
agent_name = (
agent.function_name()
@ -62,8 +62,8 @@ class AgentCreator(LangChainTypeCreator):
else agent.__name__
)
if (
agent_name in settings_manager.settings.AGENTS
or settings_manager.settings.DEV
agent_name in settings_service.settings.AGENTS
or settings_service.settings.DEV
):
names.append(agent_name)
return names

View file

@ -2,7 +2,7 @@ from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Type, Union
from langchain.chains.base import Chain
from langchain.agents import AgentExecutor
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
from pydantic import BaseModel
from langflow.template.field.base import TemplateField
@ -27,11 +27,11 @@ class LangChainTypeCreator(BaseModel, ABC):
@property
def docs_map(self) -> Dict[str, str]:
"""A dict with the name of the component as key and the documentation link as value."""
settings_manager = get_settings_manager()
settings_service = get_settings_service()
if self.name_docs_dict is None:
try:
type_settings = getattr(
settings_manager.settings, self.type_name.upper()
settings_service.settings, self.type_name.upper()
)
self.name_docs_dict = {
name: value_dict["documentation"]

View file

@ -3,7 +3,7 @@ from typing import Any, Dict, List, Optional, Type
from langflow.custom.customs import get_custom_nodes
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.importing.utils import import_class
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
from langflow.template.frontend_node.chains import ChainFrontendNode
from loguru import logger
@ -31,7 +31,7 @@ class ChainCreator(LangChainTypeCreator):
@property
def type_to_loader_dict(self) -> Dict:
if self.type_dict is None:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
self.type_dict: dict[str, Any] = {
chain_name: import_class(f"langchain.chains.{chain_name}")
for chain_name in chains.__all__
@ -45,8 +45,8 @@ class ChainCreator(LangChainTypeCreator):
self.type_dict = {
name: chain
for name, chain in self.type_dict.items()
if name in settings_manager.settings.CHAINS
or settings_manager.settings.DEV
if name in settings_service.settings.CHAINS
or settings_service.settings.DEV
}
return self.type_dict

View file

@ -4,7 +4,7 @@ from fastapi import HTTPException
from langflow.interface.custom.constants import CUSTOM_COMPONENT_SUPPORTED_TYPES
from langflow.interface.custom.component import Component
from langflow.interface.custom.directory_reader import DirectoryReader
from langflow.services.getters import get_db_manager
from langflow.services.getters import get_db_service
from langflow.interface.custom.utils import extract_inner_type
from langflow.utils import validate
@ -176,25 +176,25 @@ class CustomComponent(Component, extra=Extra.allow):
return validate.create_function(self.code, self.function_entrypoint_name)
def load_flow(self, flow_id: str, tweaks: Optional[dict] = None) -> Any:
from langflow.processing.process import build_sorted_vertices_with_caching
from langflow.processing.process import build_sorted_vertices
from langflow.processing.process import process_tweaks
db_manager = get_db_manager()
with session_getter(db_manager) as session:
db_service = get_db_service()
with session_getter(db_service) as session:
graph_data = flow.data if (flow := session.get(Flow, flow_id)) else None
if not graph_data:
raise ValueError(f"Flow {flow_id} not found")
if tweaks:
graph_data = process_tweaks(graph_data=graph_data, tweaks=tweaks)
return build_sorted_vertices_with_caching(graph_data)
return build_sorted_vertices(graph_data)
def list_flows(self, *, get_session: Optional[Callable] = None) -> List[Flow]:
if not self.user_id:
raise ValueError("Session is invalid")
try:
get_session = get_session or session_getter
db_manager = get_db_manager()
with get_session(db_manager) as session:
db_service = get_db_service()
with get_session(db_service) as session:
flows = session.query(Flow).filter(Flow.user_id == self.user_id).all()
return flows
except Exception as e:
@ -209,8 +209,8 @@ class CustomComponent(Component, extra=Extra.allow):
get_session: Optional[Callable] = None,
) -> Flow:
get_session = get_session or session_getter
db_manager = get_db_manager()
with get_session(db_manager) as session:
db_service = get_db_service()
with get_session(db_service) as session:
if flow_id:
flow = session.query(Flow).get(flow_id)
elif flow_name:

View file

@ -1,7 +1,7 @@
from typing import Dict, List, Optional, Type
from langflow.interface.base import LangChainTypeCreator
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
from langflow.template.frontend_node.documentloaders import DocumentLoaderFrontNode
from langflow.interface.custom_lists import documentloaders_type_to_cls_dict
@ -31,12 +31,12 @@ class DocumentLoaderCreator(LangChainTypeCreator):
return None
def to_list(self) -> List[str]:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
return [
documentloader.__name__
for documentloader in self.type_to_loader_dict.values()
if documentloader.__name__ in settings_manager.settings.DOCUMENTLOADERS
or settings_manager.settings.DEV
if documentloader.__name__ in settings_service.settings.DOCUMENTLOADERS
or settings_service.settings.DEV
]

View file

@ -2,7 +2,7 @@ from typing import Dict, List, Optional, Type
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.custom_lists import embedding_type_to_cls_dict
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
from langflow.template.frontend_node.base import FrontendNode
from langflow.template.frontend_node.embeddings import EmbeddingFrontendNode
@ -33,12 +33,12 @@ class EmbeddingCreator(LangChainTypeCreator):
return None
def to_list(self) -> List[str]:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
return [
embedding.__name__
for embedding in self.type_to_loader_dict.values()
if embedding.__name__ in settings_manager.settings.EMBEDDINGS
or settings_manager.settings.DEV
if embedding.__name__ in settings_service.settings.EMBEDDINGS
or settings_service.settings.DEV
]

View file

@ -1,7 +1,7 @@
import json
import orjson
from typing import Any, Callable, Dict, Sequence, Type, TYPE_CHECKING
from langchain.schema import Document
from langchain.agents import agent as agent_module
from langchain.agents.agent import AgentExecutor
from langchain.agents.agent_toolkits.base import BaseToolkit
@ -40,12 +40,23 @@ if TYPE_CHECKING:
from langflow import CustomComponent
def build_vertex_in_params(params: Dict) -> Dict:
from langflow.graph.vertex.base import Vertex
# If any of the values in params is a Vertex, we will build it
return {
key: value.build() if isinstance(value, Vertex) else value
for key, value in params.items()
}
def instantiate_class(
node_type: str, base_type: str, params: Dict, user_id=None
) -> Any:
"""Instantiate class from module type and key, and params"""
params = convert_params_to_sets(params)
params = convert_kwargs(params)
if node_type in CUSTOM_NODES:
if custom_node := CUSTOM_NODES.get(node_type):
if hasattr(custom_node, "initialize"):
@ -289,6 +300,13 @@ def instantiate_embedding(node_type, class_object, params: Dict):
def instantiate_vectorstore(class_object: Type[VectorStore], params: Dict):
search_kwargs = params.pop("search_kwargs", {})
# clean up docs or texts to have only documents
if "texts" in params:
params["documents"] = params.pop("texts")
if "documents" in params:
params["documents"] = [
doc for doc in params["documents"] if isinstance(doc, Document)
]
if initializer := vecstore_initializer.get(class_object.__name__):
vecstore = initializer(class_object, params)
else:

View file

@ -8,7 +8,7 @@ from langchain.vectorstores import (
SupabaseVectorStore,
MongoDBAtlasVectorSearch,
)
from langchain.schema import Document
import os
import orjson
@ -201,11 +201,16 @@ def initialize_chroma(class_object: Type[Chroma], params: dict):
if "texts" in params:
params["documents"] = params.pop("texts")
for doc in params["documents"]:
if not isinstance(doc, Document):
# remove any non-Document objects from the list
params["documents"].remove(doc)
continue
if doc.metadata is None:
doc.metadata = {}
for key, value in doc.metadata.items():
if value is None:
doc.metadata[key] = ""
chromadb = class_object.from_documents(**params)
if persist:
chromadb.persist()

View file

@ -1,19 +1,4 @@
from langflow.interface.agents.base import agent_creator
from langflow.interface.chains.base import chain_creator
from langflow.interface.document_loaders.base import documentloader_creator
from langflow.interface.embeddings.base import embedding_creator
from langflow.interface.llms.base import llm_creator
from langflow.interface.memories.base import memory_creator
from langflow.interface.prompts.base import prompt_creator
from langflow.interface.text_splitters.base import textsplitter_creator
from langflow.interface.toolkits.base import toolkits_creator
from langflow.interface.tools.base import tool_creator
from langflow.interface.utilities.base import utility_creator
from langflow.interface.vector_store.base import vectorstore_creator
from langflow.interface.wrappers.base import wrapper_creator
from langflow.interface.output_parsers.base import output_parser_creator
from langflow.interface.retrievers.base import retriever_creator
from langflow.interface.custom.base import custom_component_creator
from langflow.services.getters import get_settings_service
from langflow.utils.lazy_load import LazyLoadDictBase
@ -33,24 +18,10 @@ class AllTypesDict(LazyLoadDictBase):
}
def get_type_dict(self):
return {
"agents": agent_creator.to_list(),
"prompts": prompt_creator.to_list(),
"llms": llm_creator.to_list(),
"tools": tool_creator.to_list(),
"chains": chain_creator.to_list(),
"memory": memory_creator.to_list(),
"toolkits": toolkits_creator.to_list(),
"wrappers": wrapper_creator.to_list(),
"documentLoaders": documentloader_creator.to_list(),
"vectorStore": vectorstore_creator.to_list(),
"embeddings": embedding_creator.to_list(),
"textSplitters": textsplitter_creator.to_list(),
"utilities": utility_creator.to_list(),
"outputParsers": output_parser_creator.to_list(),
"retrievers": retriever_creator.to_list(),
"custom_components": custom_component_creator.to_list(),
}
from langflow.interface.types import get_all_types_dict
settings_service = get_settings_service()
return get_all_types_dict(settings_service=settings_service)
lazy_load_dict = AllTypesDict()

View file

@ -2,7 +2,7 @@ from typing import Dict, List, Optional, Type
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.custom_lists import llm_type_to_cls_dict
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
from langflow.template.frontend_node.llms import LLMFrontendNode
from loguru import logger
@ -34,12 +34,12 @@ class LLMCreator(LangChainTypeCreator):
return None
def to_list(self) -> List[str]:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
return [
llm.__name__
for llm in self.type_to_loader_dict.values()
if llm.__name__ in settings_manager.settings.LLMS
or settings_manager.settings.DEV
if llm.__name__ in settings_service.settings.LLMS
or settings_service.settings.DEV
]

View file

@ -2,7 +2,7 @@ from typing import Dict, List, Optional, Type
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.custom_lists import memory_type_to_cls_dict
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
from langflow.template.frontend_node.base import FrontendNode
from langflow.template.frontend_node.memories import MemoryFrontendNode
@ -49,12 +49,12 @@ class MemoryCreator(LangChainTypeCreator):
return None
def to_list(self) -> List[str]:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
return [
memory.__name__
for memory in self.type_to_loader_dict.values()
if memory.__name__ in settings_manager.settings.MEMORIES
or settings_manager.settings.DEV
if memory.__name__ in settings_service.settings.MEMORIES
or settings_service.settings.DEV
]

View file

@ -4,7 +4,7 @@ from langchain import output_parsers
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.importing.utils import import_class
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
from langflow.template.frontend_node.output_parsers import OutputParserFrontendNode
from loguru import logger
@ -24,7 +24,7 @@ class OutputParserCreator(LangChainTypeCreator):
@property
def type_to_loader_dict(self) -> Dict:
if self.type_dict is None:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
self.type_dict = {
output_parser_name: import_class(
f"langchain.output_parsers.{output_parser_name}"
@ -35,8 +35,8 @@ class OutputParserCreator(LangChainTypeCreator):
self.type_dict = {
name: output_parser
for name, output_parser in self.type_dict.items()
if name in settings_manager.settings.OUTPUT_PARSERS
or settings_manager.settings.DEV
if name in settings_service.settings.OUTPUT_PARSERS
or settings_service.settings.DEV
}
return self.type_dict

View file

@ -5,7 +5,7 @@ from langchain import prompts
from langflow.custom.customs import get_custom_nodes
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.importing.utils import import_class
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
from langflow.template.frontend_node.prompts import PromptFrontendNode
from loguru import logger
@ -21,7 +21,7 @@ class PromptCreator(LangChainTypeCreator):
@property
def type_to_loader_dict(self) -> Dict:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
if self.type_dict is None:
self.type_dict = {
prompt_name: import_class(f"langchain.prompts.{prompt_name}")
@ -36,8 +36,8 @@ class PromptCreator(LangChainTypeCreator):
self.type_dict = {
name: prompt
for name, prompt in self.type_dict.items()
if name in settings_manager.settings.PROMPTS
or settings_manager.settings.DEV
if name in settings_service.settings.PROMPTS
or settings_service.settings.DEV
}
return self.type_dict

View file

@ -4,7 +4,7 @@ from langchain import retrievers
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.importing.utils import import_class
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
from langflow.template.frontend_node.retrievers import RetrieverFrontendNode
from loguru import logger
@ -49,12 +49,12 @@ class RetrieverCreator(LangChainTypeCreator):
return None
def to_list(self) -> List[str]:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
return [
retriever
for retriever in self.type_to_loader_dict.keys()
if retriever in settings_manager.settings.RETRIEVERS
or settings_manager.settings.DEV
if retriever in settings_service.settings.RETRIEVERS
or settings_service.settings.DEV
]

View file

@ -1,22 +1,9 @@
from typing import Any, Dict, Tuple
from langflow.services.cache.utils import memoize_dict
from typing import Dict, Tuple
from langflow.graph import Graph
from loguru import logger
@memoize_dict(maxsize=10)
def build_langchain_object_with_caching(data_graph):
"""
Build langchain object from data_graph.
"""
logger.debug("Building langchain object")
graph = Graph.from_payload(data_graph)
return graph.build()
@memoize_dict(maxsize=10)
def build_sorted_vertices_with_caching(data_graph) -> Tuple[Any, Dict]:
def build_sorted_vertices(data_graph) -> Tuple[Graph, Dict]:
"""
Build langchain object from data_graph.
"""
@ -29,7 +16,7 @@ def build_sorted_vertices_with_caching(data_graph) -> Tuple[Any, Dict]:
vertex.build()
if vertex.artifacts:
artifacts.update(vertex.artifacts)
return graph.build(), artifacts
return graph, artifacts
def build_langchain_object(data_graph):

View file

@ -1,7 +1,7 @@
from typing import Dict, List, Optional, Type
from langflow.interface.base import LangChainTypeCreator
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
from langflow.template.frontend_node.textsplitters import TextSplittersFrontendNode
from langflow.interface.custom_lists import textsplitter_type_to_cls_dict
@ -31,12 +31,12 @@ class TextSplitterCreator(LangChainTypeCreator):
return None
def to_list(self) -> List[str]:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
return [
textsplitter.__name__
for textsplitter in self.type_to_loader_dict.values()
if textsplitter.__name__ in settings_manager.settings.TEXTSPLITTERS
or settings_manager.settings.DEV
if textsplitter.__name__ in settings_service.settings.TEXTSPLITTERS
or settings_service.settings.DEV
]

View file

@ -4,7 +4,7 @@ from langchain.agents import agent_toolkits
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.importing.utils import import_class, import_module
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
from loguru import logger
from langflow.utils.util import build_template_from_class
@ -30,7 +30,7 @@ class ToolkitCreator(LangChainTypeCreator):
@property
def type_to_loader_dict(self) -> Dict:
if self.type_dict is None:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
self.type_dict = {
toolkit_name: import_class(
f"langchain.agents.agent_toolkits.{toolkit_name}"
@ -38,7 +38,7 @@ class ToolkitCreator(LangChainTypeCreator):
# if toolkit_name is not lower case it is a class
for toolkit_name in agent_toolkits.__all__
if not toolkit_name.islower()
and toolkit_name in settings_manager.settings.TOOLKITS
and toolkit_name in settings_service.settings.TOOLKITS
}
return self.type_dict

View file

@ -15,7 +15,7 @@ from langflow.interface.tools.constants import (
OTHER_TOOLS,
)
from langflow.interface.tools.util import get_tool_params
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
from langflow.template.field.base import TemplateField
from langflow.template.template.base import Template
@ -67,7 +67,7 @@ class ToolCreator(LangChainTypeCreator):
@property
def type_to_loader_dict(self) -> Dict:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
if self.tools_dict is None:
all_tools = {}
@ -77,8 +77,8 @@ class ToolCreator(LangChainTypeCreator):
tool_name = tool_params.get("name") or tool
if (
tool_name in settings_manager.settings.TOOLS
or settings_manager.settings.DEV
tool_name in settings_service.settings.TOOLS
or settings_service.settings.DEV
):
if tool_name == "JsonSpec":
tool_params["path"] = tool_params.pop("dict_") # type: ignore

View file

@ -4,7 +4,6 @@ import textwrap
from typing import Dict, Union
from langchain.agents.tools import Tool
from loguru import logger
def get_func_tool_params(func, **kwargs) -> Union[Dict, None]:

View file

@ -1,7 +1,7 @@
import ast
import contextlib
from typing import Any, List
from langflow.api.utils import merge_nested_dicts_with_renaming
from langflow.api.utils import get_new_key
from langflow.interface.agents.base import agent_creator
from langflow.interface.chains.base import chain_creator
from langflow.interface.custom.constants import CUSTOM_COMPONENT_SUPPORTED_TYPES
@ -433,6 +433,24 @@ def build_invalid_menu(invalid_components):
return invalid_menu
def merge_nested_dicts_with_renaming(dict1, dict2):
for key, value in dict2.items():
if (
key in dict1
and isinstance(value, dict)
and isinstance(dict1.get(key), dict)
):
for sub_key, sub_value in value.items():
if sub_key in dict1[key]:
new_key = get_new_key(dict1[key], sub_key)
dict1[key][new_key] = sub_value
else:
dict1[key][sub_key] = sub_value
else:
dict1[key] = value
return dict1
def build_langchain_custom_component_list_from_path(path: str):
"""Build a list of custom components for the langchain from a given path"""
file_list = load_files_from_path(path)
@ -446,3 +464,51 @@ def build_langchain_custom_component_list_from_path(path: str):
invalid_menu = build_invalid_menu(invalid_components)
return merge_nested_dicts_with_renaming(valid_menu, invalid_menu)
def get_all_types_dict(settings_service):
native_components = build_langchain_types_dict()
# custom_components is a list of dicts
# need to merge all the keys into one dict
custom_components_from_file: dict[str, Any] = {}
if settings_service.settings.COMPONENTS_PATH:
logger.info(
f"Building custom components from {settings_service.settings.COMPONENTS_PATH}"
)
custom_component_dicts = []
processed_paths = []
for path in settings_service.settings.COMPONENTS_PATH:
if str(path) in processed_paths:
continue
custom_component_dict = build_langchain_custom_component_list_from_path(
str(path)
)
custom_component_dicts.append(custom_component_dict)
processed_paths.append(str(path))
logger.info(f"Loading {len(custom_component_dicts)} category(ies)")
for custom_component_dict in custom_component_dicts:
# custom_component_dict is a dict of dicts
if not custom_component_dict:
continue
category = list(custom_component_dict.keys())[0]
logger.info(
f"Loading {len(custom_component_dict[category])} component(s) from category {category}"
)
custom_components_from_file = merge_nested_dicts_with_renaming(
custom_components_from_file, custom_component_dict
)
return merge_nested_dicts_with_renaming(
native_components, custom_components_from_file
)
def merge_nested_dicts(dict1, dict2):
for key, value in dict2.items():
if isinstance(value, dict) and isinstance(dict1.get(key), dict):
dict1[key] = merge_nested_dicts(dict1[key], value)
else:
dict1[key] = value
return dict1

View file

@ -5,7 +5,7 @@ from langchain import SQLDatabase, utilities
from langflow.custom.customs import get_custom_nodes
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.importing.utils import import_class
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
from langflow.template.frontend_node.utilities import UtilitiesFrontendNode
from loguru import logger
@ -27,7 +27,7 @@ class UtilityCreator(LangChainTypeCreator):
from the langchain.chains module and filtering them according to the settings.utilities list.
"""
if self.type_dict is None:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
self.type_dict = {
utility_name: import_class(f"langchain.utilities.{utility_name}")
for utility_name in utilities.__all__
@ -37,8 +37,8 @@ class UtilityCreator(LangChainTypeCreator):
self.type_dict = {
name: utility
for name, utility in self.type_dict.items()
if name in settings_manager.settings.UTILITIES
or settings_manager.settings.DEV
if name in settings_service.settings.UTILITIES
or settings_service.settings.DEV
}
return self.type_dict

View file

@ -10,7 +10,7 @@ from langchain.base_language import BaseLanguageModel
from PIL.Image import Image
from loguru import logger
from langflow.services.chat.config import ChatConfig
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
def load_file_into_dict(file_path: str) -> dict:
@ -64,11 +64,11 @@ def extract_input_variables_from_prompt(prompt: str) -> list[str]:
def setup_llm_caching():
"""Setup LLM caching."""
settings_manager = get_settings_manager()
settings_service = get_settings_service()
try:
set_langchain_cache(settings_manager.settings)
set_langchain_cache(settings_service.settings)
except ImportError:
logger.warning(f"Could not import {settings_manager.settings.CACHE}. ")
logger.warning(f"Could not import {settings_service.settings.CACHE_TYPE}. ")
except Exception as exc:
logger.warning(f"Could not setup LLM caching. Error: {exc}")
@ -80,7 +80,7 @@ def set_langchain_cache(settings):
if cache_type := os.getenv("LANGFLOW_LANGCHAIN_CACHE"):
try:
cache_class = import_class(
f"langchain.cache.{cache_type or settings.CACHE}"
f"langchain.cache.{cache_type or settings.LANGCHAIN_CACHE}"
)
logger.debug(f"Setting up LLM caching with {cache_class.__name__}")

View file

@ -4,7 +4,7 @@ from langchain import vectorstores
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.importing.utils import import_class
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
from langflow.template.frontend_node.vectorstores import VectorStoreFrontendNode
from loguru import logger
@ -44,12 +44,12 @@ class VectorstoreCreator(LangChainTypeCreator):
return None
def to_list(self) -> List[str]:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
return [
vectorstore
for vectorstore in self.type_to_loader_dict.keys()
if vectorstore in settings_manager.settings.VECTORSTORES
or settings_manager.settings.DEV
if vectorstore in settings_service.settings.VECTORSTORES
or settings_service.settings.DEV
]

View file

@ -2,10 +2,11 @@ import json
from pathlib import Path
from langchain.schema import AgentAction
from langflow.interface.run import (
build_sorted_vertices_with_caching,
build_sorted_vertices,
get_memory_key,
update_memory_keys,
)
from langflow.services.getters import get_session_service
from loguru import logger
from langflow.graph import Graph
from langchain.chains.base import Chain
@ -13,6 +14,8 @@ from langchain.vectorstores.base import VectorStore
from typing import Any, Dict, List, Optional, Tuple, Union
from langchain.schema import Document
from pydantic import BaseModel
def fix_memory_inputs(langchain_object):
"""
@ -65,7 +68,7 @@ def get_result_and_thought(langchain_object: Any, inputs: dict):
langchain_object.verbose = True
if hasattr(langchain_object, "return_intermediate_steps"):
langchain_object.return_intermediate_steps = True
langchain_object.return_intermediate_steps = False
fix_memory_inputs(langchain_object)
@ -93,26 +96,19 @@ def get_build_result(data_graph, session_id):
# otherwise, build the graph and return the result
if session_id:
logger.debug(f"Loading LangChain object from session {session_id}")
result = build_sorted_vertices_with_caching.get_result_by_session_id(session_id)
result = build_sorted_vertices(data_graph=data_graph)
if result is not None:
logger.debug("Loaded LangChain object")
return result
logger.debug("Building langchain object")
return build_sorted_vertices_with_caching(data_graph)
def clear_caches_if_needed(clear_cache: bool):
if clear_cache:
build_sorted_vertices_with_caching.clear_cache()
logger.debug("Cleared cache")
return build_sorted_vertices(data_graph)
def load_langchain_object(
data_graph: Dict[str, Any], session_id: str
) -> Tuple[Union[Chain, VectorStore], Dict[str, Any], str]:
langchain_object, artifacts = get_build_result(data_graph, session_id)
session_id = build_sorted_vertices_with_caching.hash
logger.debug("Loaded LangChain object")
if langchain_object is None:
@ -140,6 +136,7 @@ def generate_result(langchain_object: Union[Chain, VectorStore], inputs: dict):
raise ValueError("Inputs must be provided for a Chain")
logger.debug("Generating result and thought")
result = get_result_and_thought(langchain_object, inputs)
logger.debug("Generated result and thought")
elif isinstance(langchain_object, VectorStore):
result = langchain_object.search(**inputs)
@ -152,22 +149,34 @@ def generate_result(langchain_object: Union[Chain, VectorStore], inputs: dict):
return result
def process_graph_cached(
class Result(BaseModel):
result: Any
session_id: str
async def process_graph_cached(
data_graph: Dict[str, Any],
inputs: Optional[dict] = None,
clear_cache=False,
session_id=None,
) -> Tuple[Any, str]:
clear_caches_if_needed(clear_cache)
# If session_id is provided, load the langchain_object from the session
# else build the graph and return the result and the new session_id
langchain_object, artifacts, session_id = load_langchain_object(
data_graph, session_id
)
) -> Result:
session_service = get_session_service()
if clear_cache:
session_service.clear_session(session_id)
if session_id is None:
session_id = session_service.generate_key(
session_id=session_id, data_graph=data_graph
)
# Load the graph using SessionService
graph, artifacts = session_service.load_session(session_id, data_graph)
built_object = graph.build()
processed_inputs = process_inputs(inputs, artifacts)
result = generate_result(langchain_object, processed_inputs)
result = generate_result(built_object, processed_inputs)
# langchain_object is now updated with the new memory
# we need to update the cache with the updated langchain_object
session_service.update_session(session_id, (graph, artifacts))
return result, session_id
return Result(result=result, session_id=session_id)
def load_flow_from_json(

View file

@ -1,12 +1,12 @@
from langflow.services.factory import ServiceFactory
from langflow.services.auth.service import AuthManager
from langflow.services.auth.service import AuthService
class AuthManagerFactory(ServiceFactory):
name = "auth_manager"
class AuthServiceFactory(ServiceFactory):
name = "auth_service"
def __init__(self):
super().__init__(AuthManager)
super().__init__(AuthService)
def create(self, settings_manager):
return AuthManager(settings_manager)
def create(self, settings_service):
return AuthService(settings_service)

View file

@ -2,11 +2,11 @@ from langflow.services.base import Service
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from langflow.services.settings.manager import SettingsManager
from langflow.services.settings.manager import SettingsService
class AuthManager(Service):
name = "auth_manager"
class AuthService(Service):
name = "auth_service"
def __init__(self, settings_manager: "SettingsManager"):
self.settings_manager = settings_manager
def __init__(self, settings_service: "SettingsService"):
self.settings_service = settings_service

View file

@ -12,12 +12,12 @@ from langflow.services.database.models.user.crud import (
get_user_by_username,
update_user_last_login_at,
)
from langflow.services.getters import get_session, get_settings_manager
from langflow.services.getters import get_session, get_settings_service
from sqlmodel import Session
oauth2_login = OAuth2PasswordBearer(tokenUrl="api/v1/login")
API_KEY_NAME = "api-key"
API_KEY_NAME = "x-api-key"
api_key_query = APIKeyQuery(
name=API_KEY_NAME, scheme_name="API key query", auto_error=False
@ -33,17 +33,17 @@ async def api_key_security(
header_param: str = Security(api_key_header),
db: Session = Depends(get_session),
) -> Optional[User]:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
result: Optional[Union[ApiKey, User]] = None
if settings_manager.auth_settings.AUTO_LOGIN:
if settings_service.auth_settings.AUTO_LOGIN:
# Get the first user
if not settings_manager.auth_settings.SUPERUSER:
if not settings_service.auth_settings.SUPERUSER:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Missing first superuser credentials",
)
result = get_user_by_username(db, settings_manager.auth_settings.SUPERUSER)
result = get_user_by_username(db, settings_service.auth_settings.SUPERUSER)
elif not query_param and not header_param:
raise HTTPException(
@ -72,7 +72,7 @@ async def get_current_user(
token: Annotated[str, Depends(oauth2_login)],
db: Session = Depends(get_session),
) -> User:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@ -83,14 +83,14 @@ async def get_current_user(
if isinstance(token, Coroutine):
token = await token
if settings_manager.auth_settings.SECRET_KEY is None:
if settings_service.auth_settings.SECRET_KEY is None:
raise credentials_exception
try:
payload = jwt.decode(
token,
settings_manager.auth_settings.SECRET_KEY,
algorithms=[settings_manager.auth_settings.ALGORITHM],
settings_service.auth_settings.SECRET_KEY,
algorithms=[settings_service.auth_settings.ALGORITHM],
)
user_id: UUID = payload.get("sub") # type: ignore
token_type: str = payload.get("type") # type: ignore
@ -130,19 +130,19 @@ def get_current_active_superuser(
def verify_password(plain_password, hashed_password):
settings_manager = get_settings_manager()
return settings_manager.auth_settings.pwd_context.verify(
settings_service = get_settings_service()
return settings_service.auth_settings.pwd_context.verify(
plain_password, hashed_password
)
def get_password_hash(password):
settings_manager = get_settings_manager()
return settings_manager.auth_settings.pwd_context.hash(password)
settings_service = get_settings_service()
return settings_service.auth_settings.pwd_context.hash(password)
def create_token(data: dict, expires_delta: timedelta):
settings_manager = get_settings_manager()
settings_service = get_settings_service()
to_encode = data.copy()
expire = datetime.now(timezone.utc) + expires_delta
@ -150,8 +150,8 @@ def create_token(data: dict, expires_delta: timedelta):
return jwt.encode(
to_encode,
settings_manager.auth_settings.SECRET_KEY,
algorithm=settings_manager.auth_settings.ALGORITHM,
settings_service.auth_settings.SECRET_KEY,
algorithm=settings_service.auth_settings.ALGORITHM,
)
@ -179,9 +179,9 @@ def create_super_user(
def create_user_longterm_token(db: Session = Depends(get_session)) -> dict:
settings_manager = get_settings_manager()
username = settings_manager.auth_settings.SUPERUSER
password = settings_manager.auth_settings.SUPERUSER_PASSWORD
settings_service = get_settings_service()
username = settings_service.auth_settings.SUPERUSER
password = settings_service.auth_settings.SUPERUSER_PASSWORD
if not username or not password:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
@ -225,10 +225,10 @@ def get_user_id_from_token(token: str) -> UUID:
def create_user_tokens(
user_id: UUID, db: Session = Depends(get_session), update_last_login: bool = False
) -> dict:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
access_token_expires = timedelta(
minutes=settings_manager.auth_settings.ACCESS_TOKEN_EXPIRE_MINUTES
minutes=settings_service.auth_settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
access_token = create_token(
data={"sub": str(user_id)},
@ -236,7 +236,7 @@ def create_user_tokens(
)
refresh_token_expires = timedelta(
minutes=settings_manager.auth_settings.REFRESH_TOKEN_EXPIRE_MINUTES
minutes=settings_service.auth_settings.REFRESH_TOKEN_EXPIRE_MINUTES
)
refresh_token = create_token(
data={"sub": str(user_id), "type": "rf"},
@ -255,13 +255,13 @@ def create_user_tokens(
def create_refresh_token(refresh_token: str, db: Session = Depends(get_session)):
settings_manager = get_settings_manager()
settings_service = get_settings_service()
try:
payload = jwt.decode(
refresh_token,
settings_manager.auth_settings.SECRET_KEY,
algorithms=[settings_manager.auth_settings.ALGORITHM],
settings_service.auth_settings.SECRET_KEY,
algorithms=[settings_service.auth_settings.ALGORITHM],
)
user_id: UUID = payload.get("sub") # type: ignore
token_type: str = payload.get("type") # type: ignore

View file

@ -1,10 +1,8 @@
from . import factory, manager
from langflow.services.cache.manager import cache_manager
from langflow.services.cache.flow import InMemoryCache
from langflow.services.cache.manager import InMemoryCache
__all__ = [
"cache_manager",
"factory",
"manager",
"InMemoryCache",

View file

@ -1,11 +1,13 @@
import abc
class BaseCache(abc.ABC):
class BaseCacheService(abc.ABC):
"""
Abstract base class for a cache.
"""
name = "cache_service"
@abc.abstractmethod
def get(self, key):
"""
@ -28,6 +30,16 @@ class BaseCache(abc.ABC):
value: The value to cache.
"""
@abc.abstractmethod
def upsert(self, key, value):
"""
Add an item to the cache if it doesn't exist, or update it if it does.
Args:
key: The key of the item.
value: The value to cache.
"""
@abc.abstractmethod
def delete(self, key):
"""

View file

@ -1,11 +1,35 @@
from langflow.services.cache.manager import CacheManager
from langflow.services.cache.manager import InMemoryCache, RedisCache, BaseCacheService
from langflow.services.factory import ServiceFactory
from langflow.utils.logger import logger
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from langflow.services.settings.manager import SettingsService
class CacheManagerFactory(ServiceFactory):
class CacheServiceFactory(ServiceFactory):
def __init__(self):
super().__init__(CacheManager)
super().__init__(BaseCacheService)
def create(self):
# Here you would have logic to create and configure a CacheManager
return CacheManager()
def create(self, settings_service: "SettingsService"):
# Here you would have logic to create and configure a CacheService
# based on the settings_service
if settings_service.settings.CACHE_TYPE == "redis":
logger.debug("Creating Redis cache")
redis_cache = RedisCache(
host=settings_service.settings.REDIS_HOST,
port=settings_service.settings.REDIS_PORT,
db=settings_service.settings.REDIS_DB,
expiration_time=settings_service.settings.REDIS_CACHE_EXPIRE,
)
if redis_cache.is_connected():
logger.debug("Redis cache is connected")
return redis_cache
logger.warning(
"Redis cache is not connected, falling back to in-memory cache"
)
return InMemoryCache()
elif settings_service.settings.CACHE_TYPE == "memory":
return InMemoryCache()

View file

@ -1,146 +0,0 @@
import threading
import time
from collections import OrderedDict
from langflow.services.cache.base import BaseCache
class InMemoryCache(BaseCache):
"""
A simple in-memory cache using an OrderedDict.
This cache supports setting a maximum size and expiration time for cached items.
When the cache is full, it uses a Least Recently Used (LRU) eviction policy.
Thread-safe using a threading Lock.
Attributes:
max_size (int, optional): Maximum number of items to store in the cache.
expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour.
Example:
cache = InMemoryCache(max_size=3, expiration_time=5)
# setting cache values
cache.set("a", 1)
cache.set("b", 2)
cache["c"] = 3
# getting cache values
a = cache.get("a")
b = cache["b"]
"""
def __init__(self, max_size=None, expiration_time=60 * 60):
"""
Initialize a new InMemoryCache instance.
Args:
max_size (int, optional): Maximum number of items to store in the cache.
expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour.
"""
self._cache = OrderedDict()
self._lock = threading.Lock()
self.max_size = max_size
self.expiration_time = expiration_time
def get(self, key):
"""
Retrieve an item from the cache.
Args:
key: The key of the item to retrieve.
Returns:
The value associated with the key, or None if the key is not found or the item has expired.
"""
with self._lock:
if key in self._cache:
item = self._cache.pop(key)
if (
self.expiration_time is None
or time.time() - item["time"] < self.expiration_time
):
# Move the key to the end to make it recently used
self._cache[key] = item
return item["value"]
else:
self.delete(key)
return None
def set(self, key, value):
"""
Add an item to the cache.
If the cache is full, the least recently used item is evicted.
Args:
key: The key of the item.
value: The value to cache.
"""
with self._lock:
if key in self._cache:
# Remove existing key before re-inserting to update order
self.delete(key)
elif self.max_size and len(self._cache) >= self.max_size:
# Remove least recently used item
self._cache.popitem(last=False)
self._cache[key] = {"value": value, "time": time.time()}
def get_or_set(self, key, value):
"""
Retrieve an item from the cache. If the item does not exist, set it with the provided value.
Args:
key: The key of the item.
value: The value to cache if the item doesn't exist.
Returns:
The cached value associated with the key.
"""
with self._lock:
if key in self._cache:
return self.get(key)
self.set(key, value)
return value
def delete(self, key):
"""
Remove an item from the cache.
Args:
key: The key of the item to remove.
"""
# with self._lock:
self._cache.pop(key, None)
def clear(self):
"""
Clear all items from the cache.
"""
with self._lock:
self._cache.clear()
def __contains__(self, key):
"""Check if the key is in the cache."""
return key in self._cache
def __getitem__(self, key):
"""Retrieve an item from the cache using the square bracket notation."""
return self.get(key)
def __setitem__(self, key, value):
"""Add an item to the cache using the square bracket notation."""
self.set(key, value)
def __delitem__(self, key):
"""Remove an item from the cache using the square bracket notation."""
self.delete(key)
def __len__(self):
"""Return the number of items in the cache."""
return len(self._cache)
def __repr__(self):
"""Return a string representation of the InMemoryCache instance."""
return f"InMemoryCache(max_size={self.max_size}, expiration_time={self.expiration_time})"

View file

@ -1,153 +1,330 @@
from contextlib import contextmanager
from typing import Any, Awaitable, Callable, List, Optional
import threading
import time
from collections import OrderedDict
from langflow.services.base import Service
import pandas as pd
from PIL import Image
from langflow.services.cache.base import BaseCacheService
import pickle
from loguru import logger
class Subject:
"""Base class for implementing the observer pattern."""
class InMemoryCache(BaseCacheService, Service):
def __init__(self):
self.observers: List[Callable[[], None]] = []
"""
A simple in-memory cache using an OrderedDict.
def attach(self, observer: Callable[[], None]):
"""Attach an observer to the subject."""
self.observers.append(observer)
This cache supports setting a maximum size and expiration time for cached items.
When the cache is full, it uses a Least Recently Used (LRU) eviction policy.
Thread-safe using a threading Lock.
def detach(self, observer: Callable[[], None]):
"""Detach an observer from the subject."""
self.observers.remove(observer)
Attributes:
max_size (int, optional): Maximum number of items to store in the cache.
expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour.
def notify(self):
"""Notify all observers about an event."""
for observer in self.observers:
if observer is None:
continue
observer()
Example:
cache = InMemoryCache(max_size=3, expiration_time=5)
class AsyncSubject:
"""Base class for implementing the async observer pattern."""
# setting cache values
cache.set("a", 1)
cache.set("b", 2)
cache["c"] = 3
def __init__(self):
self.observers: List[Callable[[], Awaitable]] = []
# getting cache values
a = cache.get("a")
b = cache["b"]
"""
def attach(self, observer: Callable[[], Awaitable]):
"""Attach an observer to the subject."""
self.observers.append(observer)
def detach(self, observer: Callable[[], Awaitable]):
"""Detach an observer from the subject."""
self.observers.remove(observer)
async def notify(self):
"""Notify all observers about an event."""
for observer in self.observers:
if observer is None:
continue
await observer()
class CacheManager(Subject, Service):
"""Manages cache for different clients and notifies observers on changes."""
name = "cache_manager"
def __init__(self):
super().__init__()
self._cache = {}
self.current_client_id = None
self.current_cache = {}
@contextmanager
def set_client_id(self, client_id: str):
def __init__(self, max_size=None, expiration_time=60 * 60):
"""
Context manager to set the current client_id and associated cache.
Initialize a new InMemoryCache instance.
Args:
client_id (str): The client identifier.
max_size (int, optional): Maximum number of items to store in the cache.
expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour.
"""
self._cache = OrderedDict()
self._lock = threading.RLock()
self.max_size = max_size
self.expiration_time = expiration_time
def get(self, key):
"""
Retrieve an item from the cache.
Args:
key: The key of the item to retrieve.
Returns:
The value associated with the key, or None if the key is not found or the item has expired.
"""
with self._lock:
return self._get_without_lock(key)
def _get_without_lock(self, key):
"""
Retrieve an item from the cache without acquiring the lock.
"""
if item := self._cache.get(key):
if (
self.expiration_time is None
or time.time() - item["time"] < self.expiration_time
):
# Move the key to the end to make it recently used
self._cache.move_to_end(key)
unpickled = pickle.loads(item["value"])
return unpickled
else:
self.delete(key)
return None
def set(self, key, value):
"""
Add an item to the cache.
If the cache is full, the least recently used item is evicted.
Args:
key: The key of the item.
value: The value to cache.
"""
with self._lock:
if key in self._cache:
# Remove existing key before re-inserting to update order
self.delete(key)
elif self.max_size and len(self._cache) >= self.max_size:
# Remove least recently used item
self._cache.popitem(last=False)
# pickle locally to mimic Redis
pickled = pickle.dumps(value)
self._cache[key] = {"value": pickled, "time": time.time()}
def upsert(self, key, value):
"""
Inserts or updates a value in the cache.
If the existing value and the new value are both dictionaries, they are merged.
Args:
key: The key of the item.
value: The value to insert or update.
"""
with self._lock:
existing_value = self._get_without_lock(key)
if (
existing_value is not None
and isinstance(existing_value, dict)
and isinstance(value, dict)
):
existing_value.update(value)
value = existing_value
self.set(key, value)
def get_or_set(self, key, value):
"""
Retrieve an item from the cache. If the item does not exist,
set it with the provided value.
Args:
key: The key of the item.
value: The value to cache if the item doesn't exist.
Returns:
The cached value associated with the key.
"""
with self._lock:
if key in self._cache:
return self.get(key)
self.set(key, value)
return value
def delete(self, key):
"""
Remove an item from the cache.
Args:
key: The key of the item to remove.
"""
with self._lock:
self._cache.pop(key, None)
def clear(self):
"""
Clear all items from the cache.
"""
with self._lock:
self._cache.clear()
def __contains__(self, key):
"""Check if the key is in the cache."""
return key in self._cache
def __getitem__(self, key):
"""Retrieve an item from the cache using the square bracket notation."""
return self.get(key)
def __setitem__(self, key, value):
"""Add an item to the cache using the square bracket notation."""
self.set(key, value)
def __delitem__(self, key):
"""Remove an item from the cache using the square bracket notation."""
self.delete(key)
def __len__(self):
"""Return the number of items in the cache."""
return len(self._cache)
def __repr__(self):
"""Return a string representation of the InMemoryCache instance."""
return f"InMemoryCache(max_size={self.max_size}, expiration_time={self.expiration_time})"
class RedisCache(BaseCacheService, Service):
"""
A Redis-based cache implementation.
This cache supports setting an expiration time for cached items.
Attributes:
expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour.
Example:
cache = RedisCache(expiration_time=5)
# setting cache values
cache.set("a", 1)
cache.set("b", 2)
cache["c"] = 3
# getting cache values
a = cache.get("a")
b = cache["b"]
"""
def __init__(self, host="localhost", port=6379, db=0, expiration_time=60 * 60):
"""
Initialize a new RedisCache instance.
Args:
host (str, optional): Redis host.
port (int, optional): Redis port.
db (int, optional): Redis DB.
expiration_time (int, optional): Time in seconds after which a
ached item expires. Default is 1 hour.
"""
previous_client_id = self.current_client_id
self.current_client_id = client_id
self.current_cache = self._cache.setdefault(client_id, {})
try:
yield
finally:
self.current_client_id = previous_client_id
self.current_cache = self._cache.get(self.current_client_id, {})
import redis
except ImportError as exc:
raise ImportError(
"RedisCache requires the redis-py package."
" Please install Langflow with the deploy extra: pip install langflow[deploy]"
) from exc
logger.warning(
"RedisCache is an experimental feature and may not work as expected."
" Please report any issues to our GitHub repository."
)
self._client = redis.StrictRedis(host=host, port=port, db=db)
self.expiration_time = expiration_time
def add(self, name: str, obj: Any, obj_type: str, extension: Optional[str] = None):
# check connection
def is_connected(self):
"""
Add an object to the current client's cache.
Check if the Redis client is connected.
"""
import redis
try:
self._client.ping()
return True
except redis.exceptions.ConnectionError:
return False
def get(self, key):
"""
Retrieve an item from the cache.
Args:
name (str): The cache key.
obj (Any): The object to cache.
obj_type (str): The type of the object.
"""
object_extensions = {
"image": "png",
"pandas": "csv",
}
if obj_type in object_extensions:
_extension = object_extensions[obj_type]
else:
_extension = type(obj).__name__.lower()
self.current_cache[name] = {
"obj": obj,
"type": obj_type,
"extension": extension or _extension,
}
self.notify()
def add_pandas(self, name: str, obj: Any):
"""
Add a pandas DataFrame or Series to the current client's cache.
Args:
name (str): The cache key.
obj (Any): The pandas DataFrame or Series object.
"""
if isinstance(obj, (pd.DataFrame, pd.Series)):
self.add(name, obj.to_csv(), "pandas", extension="csv")
else:
raise ValueError("Object is not a pandas DataFrame or Series")
def add_image(self, name: str, obj: Any, extension: str = "png"):
"""
Add a PIL Image to the current client's cache.
Args:
name (str): The cache key.
obj (Any): The PIL Image object.
"""
if isinstance(obj, Image.Image):
self.add(name, obj, "image", extension=extension)
else:
raise ValueError("Object is not a PIL Image")
def get(self, name: str):
"""
Get an object from the current client's cache.
Args:
name (str): The cache key.
key: The key of the item to retrieve.
Returns:
The cached object associated with the given cache key.
The value associated with the key, or None if the key is not found.
"""
return self.current_cache[name]
value = self._client.get(key)
return pickle.loads(value) if value else None
def get_last(self):
def set(self, key, value):
"""
Get the last added item in the current client's cache.
Add an item to the cache.
Returns:
The last added item in the cache.
Args:
key: The key of the item.
value: The value to cache.
"""
return list(self.current_cache.values())[-1]
try:
if pickled := pickle.dumps(value):
result = self._client.setex(key, self.expiration_time, pickled)
if not result:
raise ValueError("RedisCache could not set the value.")
except TypeError as exc:
raise TypeError(
"RedisCache only accepts values that can be pickled. "
) from exc
def upsert(self, key, value):
"""
Inserts or updates a value in the cache.
If the existing value and the new value are both dictionaries, they are merged.
cache_manager = CacheManager()
Args:
key: The key of the item.
value: The value to insert or update.
"""
existing_value = self.get(key)
if (
existing_value is not None
and isinstance(existing_value, dict)
and isinstance(value, dict)
):
existing_value.update(value)
value = existing_value
self.set(key, value)
def delete(self, key):
"""
Remove an item from the cache.
Args:
key: The key of the item to remove.
"""
self._client.delete(key)
def clear(self):
"""
Clear all items from the cache.
"""
self._client.flushdb()
def __contains__(self, key):
"""Check if the key is in the cache."""
return False if key is None else self._client.exists(key)
def __getitem__(self, key):
"""Retrieve an item from the cache using the square bracket notation."""
return self.get(key)
def __setitem__(self, key, value):
"""Add an item to the cache using the square bracket notation."""
self.set(key, value)
def __delitem__(self, key):
"""Remove an item from the cache using the square bracket notation."""
self.delete(key)
def __repr__(self):
"""Return a string representation of the RedisCache instance."""
return f"RedisCache(expiration_time={self.expiration_time})"

View file

@ -6,11 +6,14 @@ import os
import tempfile
from collections import OrderedDict
from pathlib import Path
from typing import Any, Dict
from typing import TYPE_CHECKING, Any, Dict
from appdirs import user_cache_dir
from fastapi import UploadFile
from langflow.services.database.models.base import orjson_dumps
if TYPE_CHECKING:
pass
CACHE: Dict[str, Any] = {}
CACHE_DIR = user_cache_dir("langflow", "langflow")
@ -166,7 +169,11 @@ def save_uploaded_file(file: UploadFile, folder_name):
"""
cache_path = Path(CACHE_DIR)
folder_path = cache_path / folder_name
file_extension = Path(file.filename).suffix
filename = file.filename
if isinstance(filename, str) or isinstance(filename, Path):
file_extension = Path(filename).suffix
else:
file_extension = ""
file_object = file.file
# Create the folder if it doesn't exist

View file

@ -0,0 +1,153 @@
from contextlib import contextmanager
from typing import Any, Awaitable, Callable, List, Optional
from langflow.services.base import Service
import pandas as pd
from PIL import Image
class Subject:
"""Base class for implementing the observer pattern."""
def __init__(self):
self.observers: List[Callable[[], None]] = []
def attach(self, observer: Callable[[], None]):
"""Attach an observer to the subject."""
self.observers.append(observer)
def detach(self, observer: Callable[[], None]):
"""Detach an observer from the subject."""
self.observers.remove(observer)
def notify(self):
"""Notify all observers about an event."""
for observer in self.observers:
if observer is None:
continue
observer()
class AsyncSubject:
"""Base class for implementing the async observer pattern."""
def __init__(self):
self.observers: List[Callable[[], Awaitable]] = []
def attach(self, observer: Callable[[], Awaitable]):
"""Attach an observer to the subject."""
self.observers.append(observer)
def detach(self, observer: Callable[[], Awaitable]):
"""Detach an observer from the subject."""
self.observers.remove(observer)
async def notify(self):
"""Notify all observers about an event."""
for observer in self.observers:
if observer is None:
continue
await observer()
class CacheService(Subject, Service):
"""Manages cache for different clients and notifies observers on changes."""
name = "cache_service"
def __init__(self):
super().__init__()
self._cache = {}
self.current_client_id = None
self.current_cache = {}
@contextmanager
def set_client_id(self, client_id: str):
"""
Context manager to set the current client_id and associated cache.
Args:
client_id (str): The client identifier.
"""
previous_client_id = self.current_client_id
self.current_client_id = client_id
self.current_cache = self._cache.setdefault(client_id, {})
try:
yield
finally:
self.current_client_id = previous_client_id
self.current_cache = self._cache.get(self.current_client_id, {})
def add(self, name: str, obj: Any, obj_type: str, extension: Optional[str] = None):
"""
Add an object to the current client's cache.
Args:
name (str): The cache key.
obj (Any): The object to cache.
obj_type (str): The type of the object.
"""
object_extensions = {
"image": "png",
"pandas": "csv",
}
if obj_type in object_extensions:
_extension = object_extensions[obj_type]
else:
_extension = type(obj).__name__.lower()
self.current_cache[name] = {
"obj": obj,
"type": obj_type,
"extension": extension or _extension,
}
self.notify()
def add_pandas(self, name: str, obj: Any):
"""
Add a pandas DataFrame or Series to the current client's cache.
Args:
name (str): The cache key.
obj (Any): The pandas DataFrame or Series object.
"""
if isinstance(obj, (pd.DataFrame, pd.Series)):
self.add(name, obj.to_csv(), "pandas", extension="csv")
else:
raise ValueError("Object is not a pandas DataFrame or Series")
def add_image(self, name: str, obj: Any, extension: str = "png"):
"""
Add a PIL Image to the current client's cache.
Args:
name (str): The cache key.
obj (Any): The PIL Image object.
"""
if isinstance(obj, Image.Image):
self.add(name, obj, "image", extension=extension)
else:
raise ValueError("Object is not a PIL Image")
def get(self, name: str):
"""
Get an object from the current client's cache.
Args:
name (str): The cache key.
Returns:
The cached object associated with the given cache key.
"""
return self.current_cache[name]
def get_last(self):
"""
Get the last added item in the current client's cache.
Returns:
The last added item in the cache.
"""
return list(self.current_cache.values())[-1]
cache_service = CacheService()

View file

@ -1,11 +1,11 @@
from langflow.services.chat.manager import ChatManager
from langflow.services.chat.manager import ChatService
from langflow.services.factory import ServiceFactory
class ChatManagerFactory(ServiceFactory):
class ChatServiceFactory(ServiceFactory):
def __init__(self):
super().__init__(ChatManager)
super().__init__(ChatService)
def create(self):
# Here you would have logic to create and configure a ChatManager
return ChatManager()
# Here you would have logic to create and configure a ChatService
return ChatService()

View file

@ -2,19 +2,17 @@ from collections import defaultdict
import uuid
from fastapi import WebSocket, status
from langflow.api.v1.schemas import ChatMessage, ChatResponse, FileResponse
from langflow.services.base import Service
from langflow.services import service_manager
from langflow.services.cache.manager import Subject
from langflow.services.chat.utils import process_graph
from langflow.interface.utils import pil_to_base64
from langflow.services.schema import ServiceType
from langflow.services.base import Service
from langflow.services.chat.cache import Subject
from langflow.services.chat.utils import process_graph
from loguru import logger
from .cache import cache_service
import asyncio
from typing import Any, Dict, List
from langflow.services.cache.flow import InMemoryCache
from langflow.services import service_manager, ServiceType
import orjson
@ -45,20 +43,20 @@ class ChatHistory(Subject):
self.history[client_id] = []
class ChatManager(Service):
name = "chat_manager"
class ChatService(Service):
name = "chat_service"
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
self.connection_ids: Dict[str, str] = {}
self.chat_history = ChatHistory()
self.cache_manager = service_manager.get(ServiceType.CACHE_MANAGER)
self.cache_manager.attach(self.update)
self.in_memory_cache = InMemoryCache()
self.chat_cache = cache_service
self.chat_cache.attach(self.update)
self.cache_service = service_manager.get(ServiceType.CACHE_SERVICE)
def on_chat_history_update(self):
"""Send the last chat message to the client."""
client_id = self.cache_manager.current_client_id
client_id = self.chat_cache.current_client_id
if client_id in self.active_connections:
chat_response = self.chat_history.get_history(
client_id, filter_messages=False
@ -79,8 +77,8 @@ class ChatManager(Service):
asyncio.run_coroutine_threadsafe(coroutine, loop)
def update(self):
if self.cache_manager.current_client_id in self.active_connections:
self.last_cached_object_dict = self.cache_manager.get_last()
if self.chat_cache.current_client_id in self.active_connections:
self.last_cached_object_dict = self.chat_cache.get_last()
# Add a new ChatResponse with the data
chat_response = FileResponse(
message=None,
@ -90,7 +88,7 @@ class ChatManager(Service):
)
self.chat_history.add_message(
self.cache_manager.current_client_id, chat_response
self.chat_cache.current_client_id, chat_response
)
async def connect(self, client_id: str, websocket: WebSocket):
@ -145,6 +143,7 @@ class ChatManager(Service):
websocket=self.active_connections[client_id],
session_id=self.connection_ids[client_id],
)
self.set_cache(client_id, langchain_object)
except Exception as e:
# Log stack trace
logger.exception(e)
@ -180,9 +179,15 @@ class ChatManager(Service):
"""
Set the cache for a client.
"""
# client_id is the flow id but that already exists in the cache
# so we need to change it to something else
self.in_memory_cache.set(client_id, langchain_object)
return client_id in self.in_memory_cache
result_dict = {
"result": langchain_object,
"type": type(langchain_object),
}
self.cache_service.upsert(client_id, result_dict)
return client_id in self.cache_service
async def handle_websocket(self, client_id: str, websocket: WebSocket):
await self.connect(client_id, websocket)
@ -203,10 +208,16 @@ class ChatManager(Service):
self.chat_history.history[client_id] = []
continue
with self.cache_manager.set_client_id(client_id):
langchain_object = self.in_memory_cache.get(client_id)
await self.process_message(client_id, payload, langchain_object)
with self.chat_cache.set_client_id(client_id):
if langchain_object := self.cache_service.get(client_id).get(
"result"
):
await self.process_message(client_id, payload, langchain_object)
else:
raise RuntimeError(
f"Could not find a LangChain object for client_id {client_id}"
)
except Exception as exc:
# Handle any exceptions that might occur
logger.error(f"Error handling websocket: {exc}")

View file

@ -1,17 +1,17 @@
from typing import TYPE_CHECKING
from langflow.services.database.manager import DatabaseManager
from langflow.services.database.manager import DatabaseService
from langflow.services.factory import ServiceFactory
if TYPE_CHECKING:
from langflow.services.settings.manager import SettingsManager
from langflow.services.settings.manager import SettingsService
class DatabaseManagerFactory(ServiceFactory):
class DatabaseServiceFactory(ServiceFactory):
def __init__(self):
super().__init__(DatabaseManager)
super().__init__(DatabaseService)
def create(self, settings_manager: "SettingsManager"):
# Here you would have logic to create and configure a DatabaseManager
if not settings_manager.settings.DATABASE_URL:
def create(self, settings_service: "SettingsService"):
# Here you would have logic to create and configure a DatabaseService
if not settings_service.settings.DATABASE_URL:
raise ValueError("No database URL provided")
return DatabaseManager(settings_manager.settings.DATABASE_URL)
return DatabaseService(settings_service.settings.DATABASE_URL)

View file

@ -3,7 +3,7 @@ from typing import TYPE_CHECKING
from langflow.services.base import Service
from langflow.services.database.models.user.crud import get_user_by_username
from langflow.services.database.utils import Result, TableResults
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
from sqlalchemy import inspect
import sqlalchemy as sa
from sqlmodel import SQLModel, Session, create_engine
@ -16,8 +16,8 @@ if TYPE_CHECKING:
from sqlalchemy.engine import Engine
class DatabaseManager(Service):
name = "database_manager"
class DatabaseService(Service):
name = "database_service"
def __init__(self, database_url: str):
self.database_url = database_url
@ -30,10 +30,10 @@ class DatabaseManager(Service):
def _create_engine(self) -> "Engine":
"""Create the engine for the database."""
settings_manager = get_settings_manager()
settings_service = get_settings_service()
if (
settings_manager.settings.DATABASE_URL
and settings_manager.settings.DATABASE_URL.startswith("sqlite")
settings_service.settings.DATABASE_URL
and settings_service.settings.DATABASE_URL.startswith("sqlite")
):
connect_args = {"check_same_thread": False}
else:
@ -107,12 +107,10 @@ class DatabaseManager(Service):
# We will check that all models are in the database
# and that the database is up to date with all columns
sql_models = [models.Flow, models.User, models.ApiKey]
results = []
for sql_model in sql_models:
results.append(
TableResults(sql_model.__tablename__, self.check_table(sql_model))
)
return results
return [
TableResults(sql_model.__tablename__, self.check_table(sql_model))
for sql_model in sql_models
]
def check_table(self, model):
results = []
@ -164,12 +162,12 @@ class DatabaseManager(Service):
def teardown(self):
logger.debug("Tearing down database")
try:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
# remove the default superuser if auto_login is enabled
# using the SUPERUSER to get the user
if settings_manager.auth_settings.AUTO_LOGIN:
if settings_service.auth_settings.AUTO_LOGIN:
logger.debug("Removing default superuser")
username = settings_manager.auth_settings.SUPERUSER
username = settings_service.auth_settings.SUPERUSER
with Session(self.engine) as session:
user = get_user_by_username(session, username)
session.delete(user)

View file

@ -22,8 +22,11 @@ class ApiKey(ApiKeyBase, table=True):
api_key: str = Field(index=True, unique=True)
# User relationship
# Delete API keys when user is deleted
user_id: UUID = Field(index=True, foreign_key="user.id")
user: "User" = Relationship(back_populates="api_keys")
user: "User" = Relationship(
back_populates="api_keys",
)
class ApiKeyCreate(ApiKeyBase):

View file

@ -63,9 +63,15 @@ def check_key(session: Session, api_key: str) -> Optional[ApiKey]:
def update_total_uses(session, api_key: ApiKey):
"""Update the total uses and last used at."""
api_key.total_uses += 1
api_key.last_used_at = datetime.datetime.now(datetime.timezone.utc)
session.add(api_key)
session.commit()
session.refresh(api_key)
return api_key
# This is running in a separate thread to avoid slowing down the request
# but session is not thread safe so we need to create a new session
with Session(session.get_bind()) as new_session:
new_api_key = new_session.get(ApiKey, api_key.id)
if new_api_key is None:
raise ValueError("API Key not found")
new_api_key.total_uses += 1
new_api_key.last_used_at = datetime.datetime.now(datetime.timezone.utc)
new_session.add(new_api_key)
new_session.commit()
return new_api_key

View file

@ -21,7 +21,10 @@ class User(SQLModelSerializable, table=True):
create_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
last_login_at: Optional[datetime] = Field()
api_keys: list["ApiKey"] = Relationship(back_populates="user")
api_keys: list["ApiKey"] = Relationship(
back_populates="user",
sa_relationship_kwargs={"cascade": "delete"},
)
flows: list["Flow"] = Relationship(back_populates="user")
@ -42,6 +45,7 @@ class UserRead(SQLModel):
class UserUpdate(SQLModel):
username: Optional[str] = Field()
profile_image: Optional[str] = Field()
password: Optional[str] = Field()
is_active: Optional[bool] = Field()

View file

@ -6,21 +6,21 @@ from alembic.util.exc import CommandError
from sqlmodel import Session
if TYPE_CHECKING:
from langflow.services.database.manager import DatabaseManager
from langflow.services.database.manager import DatabaseService
def initialize_database():
logger.debug("Initializing database")
from langflow.services import service_manager, ServiceType
database_manager = service_manager.get(ServiceType.DATABASE_MANAGER)
database_service = service_manager.get(ServiceType.DATABASE_SERVICE)
try:
database_manager.check_schema_health()
database_service.check_schema_health()
except Exception as exc:
logger.error(f"Error checking schema health: {exc}")
raise RuntimeError("Error checking schema health") from exc
try:
database_manager.run_migrations()
database_service.run_migrations()
except CommandError as exc:
if "Can't locate revision identified by" not in str(exc):
raise exc
@ -30,23 +30,23 @@ def initialize_database():
logger.warning(
"Wrong revision in DB, deleting alembic_version table and running migrations again"
)
with session_getter(database_manager) as session:
with session_getter(database_service) as session:
session.execute("DROP TABLE alembic_version")
database_manager.run_migrations()
database_service.run_migrations()
except Exception as exc:
# if the exception involves tables already existing
# we can ignore it
if "already exists" not in str(exc):
logger.error(f"Error running migrations: {exc}")
raise RuntimeError("Error running migrations") from exc
database_manager.create_db_and_tables()
database_service.create_db_and_tables()
logger.debug("Database initialized")
@contextmanager
def session_getter(db_manager: "DatabaseManager"):
def session_getter(db_service: "DatabaseService"):
try:
session = Session(db_manager.engine)
session = Session(db_service.engine)
yield session
except Exception as e:
print("Session rollback because of exception:", e)

View file

@ -3,24 +3,46 @@ from typing import TYPE_CHECKING, Generator
if TYPE_CHECKING:
from langflow.services.database.manager import DatabaseManager
from langflow.services.settings.manager import SettingsManager
from langflow.services.chat.manager import ChatManager
from langflow.services.database.manager import DatabaseService
from langflow.services.settings.manager import SettingsService
from langflow.services.cache.manager import BaseCacheService
from langflow.services.session.manager import SessionService
from langflow.services.task.manager import TaskService
from langflow.services.chat.manager import ChatService
from sqlmodel import Session
def get_settings_manager() -> "SettingsManager":
return service_manager.get(ServiceType.SETTINGS_MANAGER)
def get_settings_service() -> "SettingsService":
try:
return service_manager.get(ServiceType.SETTINGS_SERVICE)
except ValueError:
# initialize settings service
from langflow.services.manager import initialize_settings_service
initialize_settings_service()
return service_manager.get(ServiceType.SETTINGS_SERVICE)
def get_db_manager() -> "DatabaseManager":
return service_manager.get(ServiceType.DATABASE_MANAGER)
def get_db_service() -> "DatabaseService":
return service_manager.get(ServiceType.DATABASE_SERVICE)
def get_session() -> Generator["Session", None, None]:
db_manager = service_manager.get(ServiceType.DATABASE_MANAGER)
yield from db_manager.get_session()
db_service = service_manager.get(ServiceType.DATABASE_SERVICE)
yield from db_service.get_session()
def get_chat_manager() -> "ChatManager":
return service_manager.get(ServiceType.CHAT_MANAGER)
def get_cache_service() -> "BaseCacheService":
return service_manager.get(ServiceType.CACHE_SERVICE)
def get_session_service() -> "SessionService":
return service_manager.get(ServiceType.SESSION_SERVICE)
def get_task_service() -> "TaskService":
return service_manager.get(ServiceType.TASK_SERVICE)
def get_chat_service() -> "ChatService":
return service_manager.get(ServiceType.CHAT_SERVICE)

View file

@ -1,9 +1,10 @@
from langflow.services.schema import ServiceType
from typing import TYPE_CHECKING, List, Optional
from typing import TYPE_CHECKING, Dict, List, Optional
from loguru import logger
if TYPE_CHECKING:
from langflow.services.factory import ServiceFactory
from langflow.services.base import Service
class ServiceManager:
@ -12,7 +13,7 @@ class ServiceManager:
"""
def __init__(self):
self.services = {}
self.services: Dict[str, "Service"] = {}
self.factories = {}
self.dependencies = {}
@ -86,6 +87,8 @@ class ServiceManager:
Teardown all the services.
"""
for service in self.services.values():
if service is None:
continue
logger.debug(f"Teardown service {service.name}")
service.teardown()
self.services = {}
@ -94,3 +97,107 @@ class ServiceManager:
service_manager = ServiceManager()
def initialize_services():
"""
Initialize all the services needed.
"""
from langflow.services.database import factory as database_factory
from langflow.services.cache import factory as cache_factory
from langflow.services.chat import factory as chat_factory
from langflow.services.settings import factory as settings_factory
from langflow.services.session import factory as session_service_factory
from langflow.services.auth import factory as auth_factory
from langflow.services.task import factory as task_factory
service_manager.register_factory(settings_factory.SettingsServiceFactory())
service_manager.register_factory(
database_factory.DatabaseServiceFactory(),
dependencies=[ServiceType.SETTINGS_SERVICE],
)
service_manager.register_factory(
cache_factory.CacheServiceFactory(), dependencies=[ServiceType.SETTINGS_SERVICE]
)
service_manager.register_factory(
auth_factory.AuthServiceFactory(), dependencies=[ServiceType.SETTINGS_SERVICE]
)
service_manager.register_factory(chat_factory.ChatServiceFactory())
service_manager.register_factory(
session_service_factory.SessionServiceFactory(),
dependencies=[ServiceType.CACHE_SERVICE],
)
service_manager.register_factory(
task_factory.TaskServiceFactory(),
)
# Test cache connection
service_manager.get(ServiceType.CACHE_SERVICE)
# Test database connection
service_manager.get(ServiceType.DATABASE_SERVICE)
# Test cache connection
service_manager.get(ServiceType.CACHE_SERVICE)
# Test database connection
service_manager.get(ServiceType.DATABASE_SERVICE)
def reinitialize_services():
"""
Reinitialize all the services needed.
"""
service_manager.update(ServiceType.SETTINGS_SERVICE)
service_manager.update(ServiceType.DATABASE_SERVICE)
service_manager.update(ServiceType.CACHE_SERVICE)
service_manager.update(ServiceType.CHAT_SERVICE)
service_manager.update(ServiceType.SESSION_SERVICE)
service_manager.update(ServiceType.AUTH_SERVICE)
service_manager.update(ServiceType.TASK_SERVICE)
# Test cache connection
service_manager.get(ServiceType.CACHE_SERVICE)
# Test database connection
service_manager.get(ServiceType.DATABASE_SERVICE)
# Test cache connection
service_manager.get(ServiceType.CACHE_SERVICE)
# Test database connection
service_manager.get(ServiceType.DATABASE_SERVICE)
def initialize_settings_service():
"""
Initialize the settings manager.
"""
from langflow.services.settings import factory as settings_factory
service_manager.register_factory(settings_factory.SettingsServiceFactory())
def initialize_session_service():
"""
Initialize the session manager.
"""
from langflow.services.session import factory as session_service_factory # type: ignore
from langflow.services.cache import factory as cache_factory
initialize_settings_service()
service_manager.register_factory(
cache_factory.CacheServiceFactory(), dependencies=[ServiceType.SETTINGS_SERVICE]
)
service_manager.register_factory(
session_service_factory.SessionServiceFactory(),
dependencies=[ServiceType.CACHE_SERVICE],
)
def teardown_services():
"""
Teardown all the services.
"""
service_manager.teardown()

View file

@ -1,4 +1,4 @@
from langflow.services.getters import get_settings_manager
from langflow.services.getters import get_settings_service
from langflow.utils.logger import logger
### Temporary implementation
@ -20,7 +20,7 @@ class LangfuseInstance:
logger.debug("Creating Langfuse instance")
from langfuse import Langfuse # type: ignore
settings_manager = get_settings_manager()
settings_manager = get_settings_service()
if (
settings_manager.settings.LANGFUSE_PUBLIC_KEY

View file

@ -7,8 +7,10 @@ class ServiceType(str, Enum):
registered with the service manager.
"""
AUTH_MANAGER = "auth_manager"
CACHE_MANAGER = "cache_manager"
SETTINGS_MANAGER = "settings_manager"
DATABASE_MANAGER = "database_manager"
CHAT_MANAGER = "chat_manager"
AUTH_SERVICE = "auth_service"
CACHE_SERVICE = "cache_service"
SETTINGS_SERVICE = "settings_service"
DATABASE_SERVICE = "database_service"
CHAT_SERVICE = "chat_service"
SESSION_SERVICE = "session_service"
TASK_SERVICE = "task_service"

View file

@ -0,0 +1,14 @@
from typing import TYPE_CHECKING
from langflow.services.session.manager import SessionService
from langflow.services.factory import ServiceFactory
if TYPE_CHECKING:
from langflow.services.cache.manager import BaseCacheService
class SessionServiceFactory(ServiceFactory):
def __init__(self):
super().__init__(SessionService)
def create(self, cache_service: "BaseCacheService"):
return SessionService(cache_service)

View file

@ -0,0 +1,47 @@
from typing import TYPE_CHECKING
from langflow.interface.run import build_sorted_vertices
from langflow.services.base import Service
from langflow.services.cache.utils import compute_dict_hash
from langflow.services.session.utils import session_id_generator
if TYPE_CHECKING:
from langflow.services.cache.base import BaseCacheService
class SessionService(Service):
name = "session_service"
def __init__(self, cache_service):
self.cache_service: "BaseCacheService" = cache_service
def load_session(self, key, data_graph):
# Check if the data is cached
if key in self.cache_service:
return self.cache_service.get(key)
if key is None:
key = self.generate_key(session_id=None, data_graph=data_graph)
# If not cached, build the graph and cache it
graph, artifacts = build_sorted_vertices(data_graph)
self.cache_service.set(key, (graph, artifacts))
return graph, artifacts
def build_key(self, session_id, data_graph):
json_hash = compute_dict_hash(data_graph)
return f"{session_id}{':' if session_id else ''}{json_hash}"
def generate_key(self, session_id, data_graph):
# Hash the JSON and combine it with the session_id to create a unique key
if session_id is None:
# generate a 5 char session_id to concatenate with the json_hash
session_id = session_id_generator()
return self.build_key(session_id, data_graph=data_graph)
def update_session(self, session_id, value):
self.cache_service.set(session_id, value)
def clear_session(self, session_id):
self.cache_service.delete(session_id)

View file

@ -0,0 +1,8 @@
import random
import string
def session_id_generator(size=6):
return "".join(
random.SystemRandom().choices(string.ascii_uppercase + string.digits, k=size)
)

View file

@ -35,6 +35,7 @@ class AuthSettings(BaseSettings):
# If AUTO_LOGIN = True
# > The application does not request login and logs in automatically as a super user.
AUTO_LOGIN: bool = False
NEW_USER_IS_ACTIVE: bool = False
SUPERUSER: str = DEFAULT_SUPERUSER
SUPERUSER_PASSWORD: str = DEFAULT_SUPERUSER_PASSWORD

View file

@ -37,9 +37,16 @@ class Settings(BaseSettings):
DEV: bool = False
DATABASE_URL: Optional[str] = None
CACHE: Optional[str] = None
CACHE_TYPE: str = "memory"
REMOVE_API_KEYS: bool = False
COMPONENTS_PATH: List[str] = []
LANGCHAIN_CACHE: str = "InMemoryCache"
# Redis
REDIS_HOST: str = "localhost"
REDIS_PORT: int = 6379
REDIS_DB: int = 0
REDIS_CACHE_EXPIRE: int = 3600
LANGFUSE_SECRET_KEY: Optional[str] = None
LANGFUSE_PUBLIC_KEY: Optional[str] = None

View file

@ -1,15 +1,15 @@
from pathlib import Path
from langflow.services.settings.manager import SettingsManager
from langflow.services.settings.manager import SettingsService
from langflow.services.factory import ServiceFactory
class SettingsManagerFactory(ServiceFactory):
class SettingsServiceFactory(ServiceFactory):
def __init__(self):
super().__init__(SettingsManager)
super().__init__(SettingsService)
def create(self):
# Here you would have logic to create and configure a SettingsManager
# Here you would have logic to create and configure a SettingsService
langflow_dir = Path(__file__).parent.parent.parent
return SettingsManager.load_settings_from_yaml(
return SettingsService.load_settings_from_yaml(
str(langflow_dir / "config.yaml")
)

View file

@ -6,8 +6,8 @@ import os
import yaml
class SettingsManager(Service):
name = "settings_manager"
class SettingsService(Service):
name = "settings_service"
def __init__(self, settings: Settings, auth_settings: AuthSettings):
super().__init__()
@ -15,7 +15,7 @@ class SettingsManager(Service):
self.auth_settings = auth_settings
@classmethod
def load_settings_from_yaml(cls, file_path: str) -> "SettingsManager":
def load_settings_from_yaml(cls, file_path: str) -> "SettingsService":
# Check if a string is a valid path or a file name
if "/" not in file_path:
# Get current path

View file

@ -0,0 +1,67 @@
from typing import Any, Callable, Optional, Tuple
import anyio
from langflow.services.task.backends.base import TaskBackend
from loguru import logger
class AnyIOTaskResult:
def __init__(self, scope):
self._scope = scope
self._status = "PENDING"
self._result = None
self._exception = None
@property
def status(self) -> str:
if self._status == "DONE":
return "FAILURE" if self._exception is not None else "SUCCESS"
return self._status
@property
def result(self) -> Any:
return self._result
def ready(self) -> bool:
return self._status == "DONE"
async def run(self, func, *args, **kwargs):
try:
self._result = await func(*args, **kwargs)
except Exception as e:
self._exception = e
finally:
self._status = "DONE"
class AnyIOBackend(TaskBackend):
def __init__(self):
self.tasks = {}
async def launch_task(
self, task_func: Callable[..., Any], *args: Any, **kwargs: Any
) -> Tuple[Optional[str], Optional[AnyIOTaskResult]]:
"""
Launch a new task in an asynchronous manner.
Parameters:
task_func: The asynchronous function to run.
*args: Positional arguments to pass to task_func.
**kwargs: Keyword arguments to pass to task_func.
Returns:
A tuple containing a unique task ID and the task result object.
"""
async with anyio.create_task_group() as tg:
try:
task_result = AnyIOTaskResult(tg)
tg.start_soon(task_result.run, task_func, *args, **kwargs)
task_id = str(id(task_result))
self.tasks[task_id] = task_result
logger.info(f"Task {task_id} started.")
return task_id, task_result
except Exception as e:
logger.error(f"An error occurred while launching the task: {e}")
return None, None
def get_task(self, task_id: str) -> Any:
return self.tasks.get(task_id)

View file

@ -0,0 +1,12 @@
from abc import ABC, abstractmethod
from typing import Any, Callable
class TaskBackend(ABC):
@abstractmethod
def launch_task(self, task_func: Callable[..., Any], *args: Any, **kwargs: Any):
pass
@abstractmethod
def get_task(self, task_id: str) -> Any:
pass

View file

@ -0,0 +1,23 @@
from typing import Any, Callable
from celery.result import AsyncResult # type: ignore
from langflow.services.task.backends.base import TaskBackend
from langflow.worker import celery_app
class CeleryBackend(TaskBackend):
def __init__(self):
self.celery_app = celery_app
def launch_task(
self, task_func: Callable[..., Any], *args: Any, **kwargs: Any
) -> tuple[str, AsyncResult]:
# I need to type the delay method to make it easier
from celery import Task # type: ignore
if not hasattr(task_func, "delay"):
raise ValueError(f"Task function {task_func} does not have a delay method")
task: Task = task_func.delay(*args, **kwargs)
return task.id, AsyncResult(task.id, app=self.celery_app)
def get_task(self, task_id: str) -> Any:
return AsyncResult(task_id, app=self.celery_app)

View file

@ -0,0 +1,11 @@
from langflow.services.task.manager import TaskService
from langflow.services.factory import ServiceFactory
class TaskServiceFactory(ServiceFactory):
def __init__(self):
super().__init__(TaskService)
def create(self):
# Here you would have logic to create and configure a TaskService
return TaskService()

View file

@ -0,0 +1,69 @@
from typing import Any, Callable, Coroutine, Union
from loguru import logger
from langflow.services.base import Service
from langflow.services.task.backends.anyio import AnyIOBackend
from langflow.services.task.backends.base import TaskBackend
from langflow.services.task.utils import get_celery_worker_status
def check_celery_availability():
from langflow.worker import celery_app
try:
status = get_celery_worker_status(celery_app)
logger.debug(f"Celery status: {status}")
except Exception as e:
logger.error(f"An error occurred: {e}")
status = {"availability": None}
return status
try:
status = check_celery_availability()
USE_CELERY = status.get("availability") is not None
except ImportError:
USE_CELERY = False
class TaskService(Service):
name = "task_service"
def __init__(self):
self.backend = self.get_backend()
self.use_celery = USE_CELERY
def get_backend(self) -> TaskBackend:
if USE_CELERY:
from langflow.services.task.backends.celery import CeleryBackend
logger.debug("Using Celery backend")
return CeleryBackend()
logger.debug("Using AnyIO backend")
return AnyIOBackend()
# In your TaskService class
async def launch_and_await_task(
self,
task_func: Callable[..., Any],
*args: Any,
**kwargs: Any,
) -> Any:
if not self.use_celery:
return None, await task_func(*args, **kwargs)
if not hasattr(task_func, "apply"):
raise ValueError(f"Task function {task_func} does not have an apply method")
task = task_func.apply(args=args, kwargs=kwargs)
result = task.get()
return task.id, result
async def launch_task(
self, task_func: Callable[..., Any], *args: Any, **kwargs: Any
) -> Any:
logger.debug(f"Launching task {task_func} with args {args} and kwargs {kwargs}")
logger.debug(f"Using backend {self.backend}")
task = self.backend.launch_task(task_func, *args, **kwargs)
return await task if isinstance(task, Coroutine) else task
def get_task(self, task_id: Union[int, str]) -> Any:
return self.backend.get_task(task_id)

View file

@ -0,0 +1,22 @@
import contextlib
from typing import TYPE_CHECKING
if TYPE_CHECKING:
with contextlib.suppress(ImportError):
from celery import Celery # type: ignore
def get_celery_worker_status(app: "Celery"):
i = app.control.inspect()
availability = app.control.ping()
stats = i.stats()
registered_tasks = i.registered()
active_tasks = i.active()
scheduled_tasks = i.scheduled()
return {
"availability": availability,
"stats": stats,
"registered_tasks": registered_tasks,
"active_tasks": active_tasks,
"scheduled_tasks": scheduled_tasks,
}

View file

@ -6,24 +6,22 @@ from langflow.services.settings.constants import (
DEFAULT_SUPERUSER,
DEFAULT_SUPERUSER_PASSWORD,
)
from .getters import get_session, get_settings_manager
from .getters import get_session, get_settings_service
from loguru import logger
def setup_superuser():
def setup_superuser(settings_service, session):
"""
Setup the superuser.
"""
# We will use the SUPERUSER and SUPERUSER_PASSWORD
# vars on settings_manager.auth_settings to create the superuser
# if it does not exist.
settings_manager = get_settings_manager()
if settings_manager.auth_settings.AUTO_LOGIN:
if settings_service.auth_settings.AUTO_LOGIN:
logger.debug("AUTO_LOGIN is set to True. Creating default superuser.")
session = next(get_session())
username = settings_manager.auth_settings.SUPERUSER
password = settings_manager.auth_settings.SUPERUSER_PASSWORD
username = settings_service.auth_settings.SUPERUSER
password = settings_service.auth_settings.SUPERUSER_PASSWORD
if username == DEFAULT_SUPERUSER and password == DEFAULT_SUPERUSER_PASSWORD:
logger.debug("Default superuser credentials detected.")
logger.debug("Creating default superuser.")
@ -50,21 +48,20 @@ def setup_superuser():
"Could not create superuser. Please create a superuser manually."
) from exc
# reset superuser credentials
settings_manager.auth_settings.reset_credentials()
settings_service.auth_settings.reset_credentials()
logger.debug("Superuser created successfully.")
def teardown_superuser():
def teardown_superuser(settings_service, session):
"""
Teardown the superuser.
"""
# If AUTO_LOGIN is True, we will remove the default superuser
# from the database.
settings_manager = get_settings_manager()
if settings_manager.auth_settings.AUTO_LOGIN:
if settings_service.auth_settings.AUTO_LOGIN:
logger.debug("AUTO_LOGIN is set to True. Removing default superuser.")
session = next(get_session())
username = settings_manager.auth_settings.SUPERUSER
username = settings_service.auth_settings.SUPERUSER
from langflow.services.database.models.user.user import User
user = session.query(User).filter(User.username == username).first()
@ -80,35 +77,38 @@ def teardown_services():
"""
Teardown all the services.
"""
teardown_superuser()
service_manager.teardown()
try:
teardown_superuser(get_settings_service(), next(get_session()))
service_manager.teardown()
except Exception as exc:
logger.exception(exc)
def initialize_settings_manager():
def initialize_settings_service():
"""
Initialize the settings manager.
"""
from langflow.services.settings import factory as settings_factory
service_manager.register_factory(settings_factory.SettingsManagerFactory())
service_manager.register_factory(settings_factory.SettingsServiceFactory())
def initialize_session_manager():
def initialize_session_service():
"""
Initialize the session manager.
"""
from langflow.services.session import factory as session_manager_factory # type: ignore
from langflow.services.session import factory as session_service_factory # type: ignore
from langflow.services.cache import factory as cache_factory
initialize_settings_manager()
initialize_settings_service()
service_manager.register_factory(
cache_factory.CacheManagerFactory(), dependencies=[ServiceType.SETTINGS_MANAGER]
cache_factory.CacheServiceFactory(), dependencies=[ServiceType.SETTINGS_SERVICE]
)
service_manager.register_factory(
session_manager_factory.SessionManagerFactory(),
dependencies=[ServiceType.CACHE_MANAGER],
session_service_factory.SessionServiceFactory(),
dependencies=[ServiceType.CACHE_SERVICE],
)
@ -121,23 +121,33 @@ def initialize_services():
from langflow.services.chat import factory as chat_factory
from langflow.services.settings import factory as settings_factory
from langflow.services.auth import factory as auth_factory
from langflow.services.task import factory as task_factory
from langflow.services.session import factory as session_service_factory # type: ignore
service_manager.register_factory(settings_factory.SettingsManagerFactory())
service_manager.register_factory(settings_factory.SettingsServiceFactory())
service_manager.register_factory(
auth_factory.AuthManagerFactory(), dependencies=[ServiceType.SETTINGS_MANAGER]
auth_factory.AuthServiceFactory(), dependencies=[ServiceType.SETTINGS_SERVICE]
)
service_manager.register_factory(
database_factory.DatabaseManagerFactory(),
dependencies=[ServiceType.SETTINGS_MANAGER],
database_factory.DatabaseServiceFactory(),
dependencies=[ServiceType.SETTINGS_SERVICE],
)
service_manager.register_factory(cache_factory.CacheManagerFactory())
service_manager.register_factory(chat_factory.ChatManagerFactory())
service_manager.register_factory(
cache_factory.CacheServiceFactory(), dependencies=[ServiceType.SETTINGS_SERVICE]
)
service_manager.register_factory(chat_factory.ChatServiceFactory())
service_manager.register_factory(task_factory.TaskServiceFactory())
service_manager.register_factory(
session_service_factory.SessionServiceFactory(),
dependencies=[ServiceType.CACHE_SERVICE],
)
# Test cache connection
service_manager.get(ServiceType.CACHE_MANAGER)
service_manager.get(ServiceType.CACHE_SERVICE)
# Test database connection
db_manager = service_manager.get(ServiceType.DATABASE_MANAGER)
service_manager.get(ServiceType.DATABASE_SERVICE)
# Setup the superuser
initialize_database()
if db_manager.ready:
setup_superuser()
session = next(get_session())
setup_superuser(service_manager.get(ServiceType.SETTINGS_SERVICE), session)

View file

@ -48,6 +48,8 @@ def python_function(text: str) -> str:
return text
"""
PYTHON_BASIC_TYPES = [str, bool, int, float, tuple, list, dict, set]
DIRECT_TYPES = [
"str",
"bool",

View file

@ -0,0 +1,62 @@
from langflow.core.celery_app import celery_app
from typing import Any, Dict, Optional
from typing import TYPE_CHECKING
from celery.exceptions import SoftTimeLimitExceeded # type: ignore
from langflow.processing.process import (
Result,
generate_result,
process_inputs,
)
from langflow.services.manager import initialize_session_service
from langflow.services.getters import get_session_service
if TYPE_CHECKING:
from langflow.graph.vertex.base import Vertex
@celery_app.task(acks_late=True)
def test_celery(word: str) -> str:
return f"test task return {word}"
@celery_app.task(bind=True, soft_time_limit=30, max_retries=3)
def build_vertex(self, vertex: "Vertex") -> "Vertex":
"""
Build a vertex
"""
try:
vertex.task_id = self.request.id
vertex.build()
return vertex
except SoftTimeLimitExceeded as e:
raise self.retry(
exc=SoftTimeLimitExceeded("Task took too long"), countdown=2
) from e
@celery_app.task(acks_late=True)
def process_graph_cached_task(
data_graph: Dict[str, Any],
inputs: Optional[dict] = None,
clear_cache=False,
session_id=None,
) -> Dict[str, Any]:
initialize_session_service()
session_service = get_session_service()
if clear_cache:
session_service.clear_session(session_id)
if session_id is None:
session_id = session_service.generate_key(
session_id=session_id, data_graph=data_graph
)
# Load the graph using SessionService
graph, artifacts = session_service.load_session(session_id, data_graph)
built_object = graph.build()
processed_inputs = process_inputs(inputs, artifacts)
result = generate_result(built_object, processed_inputs)
# langchain_object is now updated with the new memory
# we need to update the cache with the updated langchain_object
session_service.update_session(session_id, (graph, artifacts))
return Result(result=result, session_id=session_id).dict()

View file

@ -1,10 +1,16 @@
FROM node:14-alpine as frontend_build
ARG BACKEND
FROM node:20-alpine as frontend_build
ARG BACKEND_URL
WORKDIR /app
COPY . /app
COPY ./package.json ./package-lock.json ./tsconfig.json ./vite.config.ts ./index.html ./tailwind.config.js ./postcss.config.js ./prettier.config.js /app/
RUN npm install
COPY ./src /app/src
RUN npm run build
FROM nginx
COPY --from=frontend_build /app/build/ /usr/share/nginx/html
COPY /nginx.conf /etc/nginx/conf.d/default.conf
COPY /nginx.conf /etc/nginx/conf.d/default.conf
COPY start-nginx.sh /start-nginx.sh
RUN chmod +x /start-nginx.sh
ENV BACKEND_URL=$BACKEND_URL
CMD ["/start-nginx.sh"]

View file

@ -1,18 +1,20 @@
server {
gzip on;
gzip_comp_level 2;
gzip_min_length 1000;
gzip_types text/xml text/css;
gzip_http_version 1.1;
gzip_vary on;
gzip_disable "MSIE [4-6] \.";
gzip on;
gzip_comp_level 2;
gzip_min_length 1000;
gzip_types text/xml text/css;
gzip_http_version 1.1;
gzip_vary on;
gzip_disable "MSIE [4-6] \.";
listen 80;
listen 80;
location / {
root /usr/share/nginx/html;
index index.html index.htm;
try_files $uri $uri/ /index.html =404;
}
location / {
root /usr/share/nginx/html;
index index.html index.htm;
try_files $uri $uri/ /index.html =404;
}
include /etc/nginx/extra-conf.d/*.conf;
}

File diff suppressed because it is too large Load diff

View file

@ -73,7 +73,7 @@
"start": "vite",
"build": "vite build",
"serve": "vite preview",
"format": "npx prettier --write \"**/*.{js,jsx,ts,tsx,json,md}\"",
"format": "npx prettier --write \"./**/*.{js,jsx,ts,tsx,json,md}\"",
"type-check": "tsc --noEmit --pretty --project tsconfig.json && vite"
},
"eslintConfig": {

View file

@ -63,7 +63,7 @@ export default function AdminPage() {
function getUsers() {
setLoadingUsers(true);
getUsersPage(index, size)
getUsersPage(index - 1, size)
.then((users) => {
setTotalRowsCount(users["total_count"]);
userList.current = users["users"];

View file

@ -0,0 +1,8 @@
#!/bin/sh
# Replace the placeholder with the actual value
sed -i "s|__BACKEND_URL__|$BACKEND_URL|g" /etc/nginx/conf.d/default.conf
# Start nginx
exec nginx -g 'daemon off;'