Change all Services names from Manager to Service

This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-09-15 17:41:39 -03:00
commit 651595932c
60 changed files with 454 additions and 414 deletions

View file

@ -1,7 +1,7 @@
from importlib import metadata
# Deactivate cache manager for now
# from langflow.services.cache import cache_manager
# from langflow.services.cache import cache_service
from langflow.processing.process import load_flow_from_json
from langflow.interface.custom.custom_component import CustomComponent
@ -12,4 +12,4 @@ except metadata.PackageNotFoundError:
__version__ = ""
del metadata # optional, avoids polluting the results of dir(__package__)
__all__ = ["load_flow_from_json", "cache_manager", "CustomComponent"]
__all__ = ["load_flow_from_json", "cache_service", "CustomComponent"]

View file

@ -2,8 +2,8 @@ import sys
import time
import httpx
from langflow.services.database.utils import session_getter
from langflow.services.manager import initialize_services, initialize_settings_manager
from langflow.services.utils import get_db_manager, get_settings_manager
from langflow.services.manager import initialize_services, initialize_settings_service
from langflow.services.utils import get_db_service, get_settings_service
from multiprocess import Process, cpu_count # type: ignore
import platform
@ -63,20 +63,20 @@ def update_settings(
"""Update the settings from a config file."""
# Check for database_url in the environment variables
initialize_settings_manager()
settings_manager = get_settings_manager()
initialize_settings_service()
settings_service = get_settings_service()
if config:
logger.debug(f"Loading settings from {config}")
settings_manager.settings.update_from_yaml(config, dev=dev)
settings_service.settings.update_from_yaml(config, dev=dev)
if remove_api_keys:
logger.debug(f"Setting remove_api_keys to {remove_api_keys}")
settings_manager.settings.update_settings(REMOVE_API_KEYS=remove_api_keys)
settings_service.settings.update_settings(REMOVE_API_KEYS=remove_api_keys)
if cache:
logger.debug(f"Setting cache to {cache}")
settings_manager.settings.update_settings(CACHE=cache)
settings_service.settings.update_settings(CACHE=cache)
if components_path:
logger.debug(f"Adding component path {components_path}")
settings_manager.settings.update_settings(COMPONENTS_PATH=components_path)
settings_service.settings.update_settings(COMPONENTS_PATH=components_path)
def serve_on_jcloud():
@ -352,8 +352,8 @@ def superuser(
),
):
initialize_services()
db_manager = get_db_manager()
with session_getter(db_manager) as session:
db_service = get_db_service()
with session_getter(db_service) as session:
from langflow.services.auth.utils import create_super_user
if create_super_user(db=session, username=username, password=password):
@ -374,10 +374,10 @@ def superuser(
@app.command()
def migration(test: bool = typer.Option(False, help="Run migrations in test mode.")):
initialize_services()
db_manager = get_db_manager()
db_service = get_db_service()
if not test:
db_manager.run_migrations()
results = db_manager.run_migrations_test()
db_service.run_migrations()
results = db_service.run_migrations_test()
display_results(results)

View file

@ -13,12 +13,12 @@ from langflow.api.v1.schemas import BuildStatus, BuiltResponse, InitResponse, St
from langflow.graph.graph.base import Graph
from langflow.services.auth.utils import get_current_active_user, get_current_user
from langflow.services.utils import get_cache_manager, get_session
from langflow.services.utils import get_cache_service, get_session
from loguru import logger
from langflow.services.utils import get_chat_manager
from langflow.services.utils import get_chat_service
from sqlmodel import Session
from langflow.services.chat.manager import ChatManager
from langflow.services.cache.manager import BaseCacheManager
from langflow.services.chat.manager import ChatService
from langflow.services.cache.manager import BaseCacheService
router = APIRouter(tags=["Chat"])
@ -30,7 +30,7 @@ async def chat(
websocket: WebSocket,
token: str = Query(...),
db: Session = Depends(get_session),
chat_manager: "ChatManager" = Depends(get_chat_manager),
chat_service: "ChatService" = Depends(get_chat_service),
):
"""Websocket endpoint for chat."""
try:
@ -45,8 +45,8 @@ async def chat(
code=status.WS_1008_POLICY_VIOLATION, reason="Unauthorized"
)
if client_id in chat_manager.cache_manager:
await chat_manager.handle_websocket(client_id, websocket)
if client_id in chat_service.cache_service:
await chat_service.handle_websocket(client_id, websocket)
else:
# We accept the connection but close it immediately
# if the flow is not built yet
@ -71,8 +71,8 @@ async def init_build(
graph_data: dict,
flow_id: str,
current_user=Depends(get_current_active_user),
chat_manager: "ChatManager" = Depends(get_chat_manager),
cache_manager: "BaseCacheManager" = Depends(get_cache_manager),
chat_service: "ChatService" = Depends(get_chat_service),
cache_service: "BaseCacheService" = Depends(get_cache_service),
):
"""Initialize the build by storing graph data and returning a unique session ID."""
try:
@ -80,17 +80,17 @@ async def init_build(
raise ValueError("No ID provided")
# Check if already building
if (
flow_id in cache_manager
and isinstance(cache_manager[flow_id], dict)
and cache_manager[flow_id].get("status") == BuildStatus.IN_PROGRESS
flow_id in cache_service
and isinstance(cache_service[flow_id], dict)
and cache_service[flow_id].get("status") == BuildStatus.IN_PROGRESS
):
return InitResponse(flowId=flow_id)
# Delete from cache if already exists
if flow_id in chat_manager.cache_manager:
chat_manager.cache_manager.delete(flow_id)
if flow_id in chat_service.cache_service:
chat_service.cache_service.delete(flow_id)
logger.debug(f"Deleted flow {flow_id} from cache")
cache_manager[flow_id] = {
cache_service[flow_id] = {
"graph_data": graph_data,
"status": BuildStatus.STARTED,
"user_id": current_user.id,
@ -104,13 +104,13 @@ async def init_build(
@router.get("/build/{flow_id}/status", response_model=BuiltResponse)
async def build_status(
flow_id: str, cache_manager: "BaseCacheManager" = Depends(get_cache_manager)
flow_id: str, cache_service: "BaseCacheService" = Depends(get_cache_service)
):
"""Check the flow_id is in the cache_manager."""
"""Check the flow_id is in the cache_service."""
try:
built = (
flow_id in cache_manager
and cache_manager[flow_id]["status"] == BuildStatus.SUCCESS
flow_id in cache_service
and cache_service[flow_id]["status"] == BuildStatus.SUCCESS
)
return BuiltResponse(
@ -125,8 +125,8 @@ async def build_status(
@router.get("/build/stream/{flow_id}", response_class=StreamingResponse)
async def stream_build(
flow_id: str,
chat_manager: "ChatManager" = Depends(get_chat_manager),
cache_manager: "BaseCacheManager" = Depends(get_cache_manager),
chat_service: "ChatService" = Depends(get_chat_service),
cache_service: "BaseCacheService" = Depends(get_cache_service),
):
"""Stream the build process based on stored flow data."""
@ -134,18 +134,18 @@ async def stream_build(
final_response = {"end_of_stream": True}
artifacts = {}
try:
if flow_id not in cache_manager:
if flow_id not in cache_service:
error_message = "Invalid session ID"
yield str(StreamData(event="error", data={"error": error_message}))
return
if cache_manager[flow_id].get("status") == BuildStatus.IN_PROGRESS:
if cache_service[flow_id].get("status") == BuildStatus.IN_PROGRESS:
error_message = "Already building"
yield str(StreamData(event="error", data={"error": error_message}))
return
graph_data = cache_manager[flow_id].get("graph_data")
cache_manager[flow_id]["user_id"]
graph_data = cache_service[flow_id].get("graph_data")
cache_service[flow_id]["user_id"]
if not graph_data:
error_message = "No data provided"
@ -158,7 +158,7 @@ async def stream_build(
graph = Graph.from_payload(graph_data)
number_of_nodes = len(graph.nodes)
cache_manager[flow_id]["status"] = BuildStatus.IN_PROGRESS
cache_service[flow_id]["status"] = BuildStatus.IN_PROGRESS
for i, vertex in enumerate(graph.generator_build(), 1):
try:
@ -185,7 +185,7 @@ async def stream_build(
logger.exception(exc)
params = str(exc)
valid = False
cache_manager[flow_id]["status"] = BuildStatus.FAILURE
cache_service[flow_id]["status"] = BuildStatus.FAILURE
response = {
"valid": valid,
@ -209,14 +209,14 @@ async def stream_build(
"handle_keys": [],
}
yield str(StreamData(event="message", data=input_keys_response))
chat_manager.set_cache(flow_id, langchain_object)
chat_service.set_cache(flow_id, langchain_object)
# We need to reset the chat history
chat_manager.chat_history.empty_history(flow_id)
cache_manager[flow_id]["status"] = BuildStatus.SUCCESS
chat_service.chat_history.empty_history(flow_id)
cache_service[flow_id]["status"] = BuildStatus.SUCCESS
except Exception as exc:
logger.exception(exc)
logger.error("Error while building the flow: %s", exc)
cache_manager[flow_id]["status"] = BuildStatus.FAILURE
cache_service[flow_id]["status"] = BuildStatus.FAILURE
yield str(StreamData(event="error", data={"error": str(exc)}))
finally:
yield str(StreamData(event="message", data=final_response))

View file

@ -1,5 +1,5 @@
from http import HTTPStatus
from typing import Annotated, Any, Optional, Union
from typing import TYPE_CHECKING, Annotated, Any, Optional, Union
from langflow.services.auth.utils import api_key_security, get_current_active_user
@ -7,7 +7,7 @@ from langflow.services.cache.utils import save_uploaded_file
from langflow.services.database.models.flow import Flow
from langflow.processing.process import process_graph_cached, process_tweaks
from langflow.services.database.models.user.user import User
from langflow.services.utils import get_settings_manager, get_task_manager
from langflow.services.utils import get_settings_service, get_task_service
from loguru import logger
from fastapi import APIRouter, Depends, HTTPException, UploadFile, Body, status
import sqlalchemy as sa
@ -33,7 +33,8 @@ from langflow.services.utils import get_session
from langflow.worker import process_graph_cached_task
from sqlmodel import Session
from langflow.services.task.manager import TaskManager
if TYPE_CHECKING:
from langflow.services.task.manager import TaskService
# build router
router = APIRouter(tags=["Base"])
@ -41,21 +42,21 @@ router = APIRouter(tags=["Base"])
@router.get("/all", dependencies=[Depends(get_current_active_user)])
def get_all(
settings_manager=Depends(get_settings_manager),
settings_service=Depends(get_settings_service),
):
logger.debug("Building langchain types dict")
native_components = build_langchain_types_dict()
# custom_components is a list of dicts
# need to merge all the keys into one dict
custom_components_from_file: dict[str, Any] = {}
if settings_manager.settings.COMPONENTS_PATH:
if settings_service.settings.COMPONENTS_PATH:
logger.info(
f"Building custom components from {settings_manager.settings.COMPONENTS_PATH}"
f"Building custom components from {settings_service.settings.COMPONENTS_PATH}"
)
custom_component_dicts = []
processed_paths = []
for path in settings_manager.settings.COMPONENTS_PATH:
for path in settings_service.settings.COMPONENTS_PATH:
if str(path) in processed_paths:
continue
custom_component_dict = build_langchain_custom_component_list_from_path(
@ -99,7 +100,7 @@ async def process_flow(
tweaks: Optional[dict] = None,
clear_cache: Annotated[bool, Body(embed=True)] = False, # noqa: F821
session_id: Annotated[Union[None, str], Body(embed=True)] = None, # noqa: F821
task_manager: "TaskManager" = Depends(get_task_manager),
task_service: "TaskService" = Depends(get_task_service),
api_key_user: User = Depends(api_key_security),
sync: Annotated[bool, Body(embed=True)] = True, # noqa: F821
):
@ -133,9 +134,9 @@ async def process_flow(
except Exception as exc:
logger.error(f"Error processing tweaks: {exc}")
if sync:
task_id, result = await task_manager.launch_and_await_task(
task_id, result = await task_service.launch_and_await_task(
process_graph_cached_task
if task_manager.use_celery
if task_service.use_celery
else process_graph_cached,
graph_data,
inputs,
@ -145,9 +146,9 @@ async def process_flow(
task_result = result.result
session_id = result.session_id
else:
task_id, task = await task_manager.launch_task(
task_id, task = await task_service.launch_task(
process_graph_cached_task
if task_manager.use_celery
if task_service.use_celery
else process_graph_cached,
graph_data,
inputs,
@ -180,8 +181,8 @@ async def process_flow(
@router.get("/task/{task_id}/status", response_model=TaskStatusResponse)
async def get_task_status(task_id: str):
task_manager = get_task_manager()
task = task_manager.get_task(task_id)
task_service = get_task_service()
task = task_service.get_task(task_id)
if task is None:
raise HTTPException(status_code=404, detail="Task not found")
return TaskStatusResponse(

View file

@ -13,7 +13,7 @@ from langflow.services.database.models.flow import (
)
from langflow.services.database.models.user.user import User
from langflow.services.utils import get_session
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
import orjson
from sqlmodel import Session
from fastapi import APIRouter, Depends, HTTPException
@ -83,7 +83,7 @@ def update_flow(
flow_id: UUID,
flow: FlowUpdate,
current_user: User = Depends(get_current_active_user),
settings_manager=Depends(get_settings_manager),
settings_service=Depends(get_settings_service),
):
"""Update a flow."""
@ -91,7 +91,7 @@ def update_flow(
if not db_flow:
raise HTTPException(status_code=404, detail="Flow not found")
flow_data = flow.dict(exclude_unset=True)
if settings_manager.settings.REMOVE_API_KEYS:
if settings_service.settings.REMOVE_API_KEYS:
flow_data = remove_api_keys(flow_data)
for key, value in flow_data.items():
if value is not None:

View file

@ -12,7 +12,7 @@ from langflow.services.auth.utils import (
get_current_active_user,
)
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
router = APIRouter(tags=["Login"])
@ -35,9 +35,9 @@ async def login_to_get_access_token(
@router.get("/auto_login")
async def auto_login(
db: Session = Depends(get_session), settings_manager=Depends(get_settings_manager)
db: Session = Depends(get_session), settings_service=Depends(get_settings_service)
):
if settings_manager.auth_settings.AUTO_LOGIN:
if settings_service.auth_settings.AUTO_LOGIN:
return create_user_longterm_token(db)
raise HTTPException(

View file

@ -0,0 +1,39 @@
from typing import Optional, Text
from langflow.api.v1.schemas import ChatMessage
from langflow.services.utils import get_chat_service
from langflow import CustomComponent
from anyio.from_thread import start_blocking_portal
from loguru import logger
class ChatOutput(CustomComponent):
display_name = "Chat Output"
description = "Used to send a message to the chat."
field_config = {
"code": {
"show": False,
}
}
def build_config(self):
return {"message": {"input_types": ["Text"]}}
def build(self, message: Optional[Text], is_ai: bool = False) -> Text:
if not message:
return ""
try:
chat_service = get_chat_service()
chat_message = ChatMessage(message=message, is_bot=is_ai)
# send_message is a coroutine
# run in a thread safe manner
with start_blocking_portal() as portal:
portal.call(chat_service.send_message, chat_message)
chat_service.chat_history.add_message(
chat_service.cache_service.current_client_id, chat_message
)
except Exception as exc:
logger.exception(exc)
logger.debug(f"Error sending message to chat: {exc}")
self.repr_value = message
return message

View file

@ -5,7 +5,7 @@ from langchain.agents import types
from langflow.custom.customs import get_custom_nodes
from langflow.interface.agents.custom import CUSTOM_AGENTS
from langflow.interface.base import LangChainTypeCreator
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
from langflow.template.frontend_node.agents import AgentFrontendNode
from loguru import logger
@ -54,7 +54,7 @@ class AgentCreator(LangChainTypeCreator):
# Now this is a generator
def to_list(self) -> List[str]:
names = []
settings_manager = get_settings_manager()
settings_service = get_settings_service()
for _, agent in self.type_to_loader_dict.items():
agent_name = (
agent.function_name()
@ -62,8 +62,8 @@ class AgentCreator(LangChainTypeCreator):
else agent.__name__
)
if (
agent_name in settings_manager.settings.AGENTS
or settings_manager.settings.DEV
agent_name in settings_service.settings.AGENTS
or settings_service.settings.DEV
):
names.append(agent_name)
return names

View file

@ -2,7 +2,7 @@ from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Type, Union
from langchain.chains.base import Chain
from langchain.agents import AgentExecutor
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
from pydantic import BaseModel
from langflow.template.field.base import TemplateField
@ -27,11 +27,11 @@ class LangChainTypeCreator(BaseModel, ABC):
@property
def docs_map(self) -> Dict[str, str]:
"""A dict with the name of the component as key and the documentation link as value."""
settings_manager = get_settings_manager()
settings_service = get_settings_service()
if self.name_docs_dict is None:
try:
type_settings = getattr(
settings_manager.settings, self.type_name.upper()
settings_service.settings, self.type_name.upper()
)
self.name_docs_dict = {
name: value_dict["documentation"]

View file

@ -3,7 +3,7 @@ from typing import Any, Dict, List, Optional, Type
from langflow.custom.customs import get_custom_nodes
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.importing.utils import import_class
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
from langflow.template.frontend_node.chains import ChainFrontendNode
from loguru import logger
@ -31,7 +31,7 @@ class ChainCreator(LangChainTypeCreator):
@property
def type_to_loader_dict(self) -> Dict:
if self.type_dict is None:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
self.type_dict: dict[str, Any] = {
chain_name: import_class(f"langchain.chains.{chain_name}")
for chain_name in chains.__all__
@ -45,8 +45,8 @@ class ChainCreator(LangChainTypeCreator):
self.type_dict = {
name: chain
for name, chain in self.type_dict.items()
if name in settings_manager.settings.CHAINS
or settings_manager.settings.DEV
if name in settings_service.settings.CHAINS
or settings_service.settings.DEV
}
return self.type_dict

View file

@ -4,7 +4,7 @@ from fastapi import HTTPException
from langflow.interface.custom.constants import CUSTOM_COMPONENT_SUPPORTED_TYPES
from langflow.interface.custom.component import Component
from langflow.interface.custom.directory_reader import DirectoryReader
from langflow.services.utils import get_db_manager
from langflow.services.utils import get_db_service
from langflow.interface.custom.utils import extract_inner_type
from langflow.utils import validate
@ -179,8 +179,8 @@ class CustomComponent(Component, extra=Extra.allow):
from langflow.processing.process import build_sorted_vertices
from langflow.processing.process import process_tweaks
db_manager = get_db_manager()
with session_getter(db_manager) as session:
db_service = get_db_service()
with session_getter(db_service) as session:
graph_data = flow.data if (flow := session.get(Flow, flow_id)) else None
if not graph_data:
raise ValueError(f"Flow {flow_id} not found")
@ -193,8 +193,8 @@ class CustomComponent(Component, extra=Extra.allow):
raise ValueError("Session is invalid")
try:
get_session = get_session or session_getter
db_manager = get_db_manager()
with get_session(db_manager) as session:
db_service = get_db_service()
with get_session(db_service) as session:
flows = session.query(Flow).filter(Flow.user_id == self.user_id).all()
return flows
except Exception as e:
@ -209,8 +209,8 @@ class CustomComponent(Component, extra=Extra.allow):
get_session: Optional[Callable] = None,
) -> Flow:
get_session = get_session or session_getter
db_manager = get_db_manager()
with get_session(db_manager) as session:
db_service = get_db_service()
with get_session(db_service) as session:
if flow_id:
flow = session.query(Flow).get(flow_id)
elif flow_name:

View file

@ -1,7 +1,7 @@
from typing import Dict, List, Optional, Type
from langflow.interface.base import LangChainTypeCreator
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
from langflow.template.frontend_node.documentloaders import DocumentLoaderFrontNode
from langflow.interface.custom_lists import documentloaders_type_to_cls_dict
@ -31,12 +31,12 @@ class DocumentLoaderCreator(LangChainTypeCreator):
return None
def to_list(self) -> List[str]:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
return [
documentloader.__name__
for documentloader in self.type_to_loader_dict.values()
if documentloader.__name__ in settings_manager.settings.DOCUMENTLOADERS
or settings_manager.settings.DEV
if documentloader.__name__ in settings_service.settings.DOCUMENTLOADERS
or settings_service.settings.DEV
]

View file

@ -2,7 +2,7 @@ from typing import Dict, List, Optional, Type
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.custom_lists import embedding_type_to_cls_dict
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
from langflow.template.frontend_node.base import FrontendNode
from langflow.template.frontend_node.embeddings import EmbeddingFrontendNode
@ -33,12 +33,12 @@ class EmbeddingCreator(LangChainTypeCreator):
return None
def to_list(self) -> List[str]:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
return [
embedding.__name__
for embedding in self.type_to_loader_dict.values()
if embedding.__name__ in settings_manager.settings.EMBEDDINGS
or settings_manager.settings.DEV
if embedding.__name__ in settings_service.settings.EMBEDDINGS
or settings_service.settings.DEV
]

View file

@ -2,7 +2,7 @@ from typing import Dict, List, Optional, Type
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.custom_lists import llm_type_to_cls_dict
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
from langflow.template.frontend_node.llms import LLMFrontendNode
from loguru import logger
@ -34,12 +34,12 @@ class LLMCreator(LangChainTypeCreator):
return None
def to_list(self) -> List[str]:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
return [
llm.__name__
for llm in self.type_to_loader_dict.values()
if llm.__name__ in settings_manager.settings.LLMS
or settings_manager.settings.DEV
if llm.__name__ in settings_service.settings.LLMS
or settings_service.settings.DEV
]

View file

@ -2,7 +2,7 @@ from typing import Dict, List, Optional, Type
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.custom_lists import memory_type_to_cls_dict
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
from langflow.template.frontend_node.base import FrontendNode
from langflow.template.frontend_node.memories import MemoryFrontendNode
@ -49,12 +49,12 @@ class MemoryCreator(LangChainTypeCreator):
return None
def to_list(self) -> List[str]:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
return [
memory.__name__
for memory in self.type_to_loader_dict.values()
if memory.__name__ in settings_manager.settings.MEMORIES
or settings_manager.settings.DEV
if memory.__name__ in settings_service.settings.MEMORIES
or settings_service.settings.DEV
]

View file

@ -4,7 +4,7 @@ from langchain import output_parsers
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.importing.utils import import_class
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
from langflow.template.frontend_node.output_parsers import OutputParserFrontendNode
from loguru import logger
@ -24,7 +24,7 @@ class OutputParserCreator(LangChainTypeCreator):
@property
def type_to_loader_dict(self) -> Dict:
if self.type_dict is None:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
self.type_dict = {
output_parser_name: import_class(
f"langchain.output_parsers.{output_parser_name}"
@ -35,8 +35,8 @@ class OutputParserCreator(LangChainTypeCreator):
self.type_dict = {
name: output_parser
for name, output_parser in self.type_dict.items()
if name in settings_manager.settings.OUTPUT_PARSERS
or settings_manager.settings.DEV
if name in settings_service.settings.OUTPUT_PARSERS
or settings_service.settings.DEV
}
return self.type_dict

View file

@ -5,7 +5,7 @@ from langchain import prompts
from langflow.custom.customs import get_custom_nodes
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.importing.utils import import_class
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
from langflow.template.frontend_node.prompts import PromptFrontendNode
from loguru import logger
@ -21,7 +21,7 @@ class PromptCreator(LangChainTypeCreator):
@property
def type_to_loader_dict(self) -> Dict:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
if self.type_dict is None:
self.type_dict = {
prompt_name: import_class(f"langchain.prompts.{prompt_name}")
@ -36,8 +36,8 @@ class PromptCreator(LangChainTypeCreator):
self.type_dict = {
name: prompt
for name, prompt in self.type_dict.items()
if name in settings_manager.settings.PROMPTS
or settings_manager.settings.DEV
if name in settings_service.settings.PROMPTS
or settings_service.settings.DEV
}
return self.type_dict

View file

@ -4,7 +4,7 @@ from langchain import retrievers
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.importing.utils import import_class
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
from langflow.template.frontend_node.retrievers import RetrieverFrontendNode
from loguru import logger
@ -49,12 +49,12 @@ class RetrieverCreator(LangChainTypeCreator):
return None
def to_list(self) -> List[str]:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
return [
retriever
for retriever in self.type_to_loader_dict.keys()
if retriever in settings_manager.settings.RETRIEVERS
or settings_manager.settings.DEV
if retriever in settings_service.settings.RETRIEVERS
or settings_service.settings.DEV
]

View file

@ -1,7 +1,7 @@
from typing import Dict, List, Optional, Type
from langflow.interface.base import LangChainTypeCreator
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
from langflow.template.frontend_node.textsplitters import TextSplittersFrontendNode
from langflow.interface.custom_lists import textsplitter_type_to_cls_dict
@ -31,12 +31,12 @@ class TextSplitterCreator(LangChainTypeCreator):
return None
def to_list(self) -> List[str]:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
return [
textsplitter.__name__
for textsplitter in self.type_to_loader_dict.values()
if textsplitter.__name__ in settings_manager.settings.TEXTSPLITTERS
or settings_manager.settings.DEV
if textsplitter.__name__ in settings_service.settings.TEXTSPLITTERS
or settings_service.settings.DEV
]

View file

@ -4,7 +4,7 @@ from langchain.agents import agent_toolkits
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.importing.utils import import_class, import_module
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
from loguru import logger
from langflow.utils.util import build_template_from_class
@ -30,7 +30,7 @@ class ToolkitCreator(LangChainTypeCreator):
@property
def type_to_loader_dict(self) -> Dict:
if self.type_dict is None:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
self.type_dict = {
toolkit_name: import_class(
f"langchain.agents.agent_toolkits.{toolkit_name}"
@ -38,7 +38,7 @@ class ToolkitCreator(LangChainTypeCreator):
# if toolkit_name is not lower case it is a class
for toolkit_name in agent_toolkits.__all__
if not toolkit_name.islower()
and toolkit_name in settings_manager.settings.TOOLKITS
and toolkit_name in settings_service.settings.TOOLKITS
}
return self.type_dict

View file

@ -15,7 +15,7 @@ from langflow.interface.tools.constants import (
OTHER_TOOLS,
)
from langflow.interface.tools.util import get_tool_params
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
from langflow.template.field.base import TemplateField
from langflow.template.template.base import Template
@ -67,7 +67,7 @@ class ToolCreator(LangChainTypeCreator):
@property
def type_to_loader_dict(self) -> Dict:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
if self.tools_dict is None:
all_tools = {}
@ -77,8 +77,8 @@ class ToolCreator(LangChainTypeCreator):
tool_name = tool_params.get("name") or tool
if (
tool_name in settings_manager.settings.TOOLS
or settings_manager.settings.DEV
tool_name in settings_service.settings.TOOLS
or settings_service.settings.DEV
):
if tool_name == "JsonSpec":
tool_params["path"] = tool_params.pop("dict_") # type: ignore

View file

@ -5,7 +5,7 @@ from langchain import SQLDatabase, utilities
from langflow.custom.customs import get_custom_nodes
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.importing.utils import import_class
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
from langflow.template.frontend_node.utilities import UtilitiesFrontendNode
from loguru import logger
@ -27,7 +27,7 @@ class UtilityCreator(LangChainTypeCreator):
from the langchain.chains module and filtering them according to the settings.utilities list.
"""
if self.type_dict is None:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
self.type_dict = {
utility_name: import_class(f"langchain.utilities.{utility_name}")
for utility_name in utilities.__all__
@ -37,8 +37,8 @@ class UtilityCreator(LangChainTypeCreator):
self.type_dict = {
name: utility
for name, utility in self.type_dict.items()
if name in settings_manager.settings.UTILITIES
or settings_manager.settings.DEV
if name in settings_service.settings.UTILITIES
or settings_service.settings.DEV
}
return self.type_dict

View file

@ -10,7 +10,7 @@ from langchain.base_language import BaseLanguageModel
from PIL.Image import Image
from loguru import logger
from langflow.services.chat.config import ChatConfig
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
def load_file_into_dict(file_path: str) -> dict:
@ -64,11 +64,11 @@ def extract_input_variables_from_prompt(prompt: str) -> list[str]:
def setup_llm_caching():
"""Setup LLM caching."""
settings_manager = get_settings_manager()
settings_service = get_settings_service()
try:
set_langchain_cache(settings_manager.settings)
set_langchain_cache(settings_service.settings)
except ImportError:
logger.warning(f"Could not import {settings_manager.settings.CACHE_TYPE}. ")
logger.warning(f"Could not import {settings_service.settings.CACHE_TYPE}. ")
except Exception as exc:
logger.warning(f"Could not setup LLM caching. Error: {exc}")

View file

@ -4,7 +4,7 @@ from langchain import vectorstores
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.importing.utils import import_class
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
from langflow.template.frontend_node.vectorstores import VectorStoreFrontendNode
from loguru import logger
@ -44,12 +44,12 @@ class VectorstoreCreator(LangChainTypeCreator):
return None
def to_list(self) -> List[str]:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
return [
vectorstore
for vectorstore in self.type_to_loader_dict.keys()
if vectorstore in settings_manager.settings.VECTORSTORES
or settings_manager.settings.DEV
if vectorstore in settings_service.settings.VECTORSTORES
or settings_service.settings.DEV
]

View file

@ -7,7 +7,7 @@ from langflow.interface.run import (
get_memory_key,
update_memory_keys,
)
from langflow.services.utils import get_session_manager
from langflow.services.utils import get_session_service
from loguru import logger
from langflow.graph import Graph
from langchain.chains.base import Chain
@ -158,20 +158,20 @@ async def process_graph_cached(
clear_cache=False,
session_id=None,
) -> Result:
session_manager = get_session_manager()
session_service = get_session_service()
if clear_cache:
session_manager.clear_session(session_id)
session_service.clear_session(session_id)
if session_id is None:
session_id = session_manager.generate_key(
session_id = session_service.generate_key(
session_id=session_id, data_graph=data_graph
)
# Load the graph using SessionManager
langchain_object, artifacts = session_manager.load_session(session_id, data_graph)
# Load the graph using SessionService
langchain_object, artifacts = session_service.load_session(session_id, data_graph)
processed_inputs = process_inputs(inputs, artifacts)
result = generate_result(langchain_object, processed_inputs)
# langchain_object is now updated with the new memory
# we need to update the cache with the updated langchain_object
session_manager.update_session(session_id, (langchain_object, artifacts))
session_service.update_session(session_id, (langchain_object, artifacts))
return Result(result, session_id)

View file

@ -1,4 +1,4 @@
from .manager import service_manager
from .manager import service_service
from .schema import ServiceType
__all__ = ["service_manager", "ServiceType"]
__all__ = ["service_service", "ServiceType"]

View file

@ -1,12 +1,12 @@
from langflow.services.factory import ServiceFactory
from langflow.services.auth.service import AuthManager
from langflow.services.auth.service import AuthService
class AuthManagerFactory(ServiceFactory):
name = "auth_manager"
class AuthServiceFactory(ServiceFactory):
name = "auth_service"
def __init__(self):
super().__init__(AuthManager)
super().__init__(AuthService)
def create(self, settings_manager):
return AuthManager(settings_manager)
def create(self, settings_service):
return AuthService(settings_service)

View file

@ -2,11 +2,11 @@ from langflow.services.base import Service
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from langflow.services.settings.manager import SettingsManager
from langflow.services.settings.manager import SettingsService
class AuthManager(Service):
name = "auth_manager"
class AuthService(Service):
name = "auth_service"
def __init__(self, settings_manager: "SettingsManager"):
self.settings_manager = settings_manager
def __init__(self, settings_service: "SettingsService"):
self.settings_service = settings_service

View file

@ -12,7 +12,7 @@ from langflow.services.database.models.user.crud import (
get_user_by_username,
update_user_last_login_at,
)
from langflow.services.utils import get_session, get_settings_manager
from langflow.services.utils import get_session, get_settings_service
from sqlmodel import Session
oauth2_login = OAuth2PasswordBearer(tokenUrl="api/v1/login")
@ -33,18 +33,18 @@ async def api_key_security(
header_param: str = Security(api_key_header),
db: Session = Depends(get_session),
) -> Optional[User]:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
result: Optional[Union[ApiKey, User]] = None
if settings_manager.auth_settings.AUTO_LOGIN:
if settings_service.auth_settings.AUTO_LOGIN:
# Get the first user
if not settings_manager.auth_settings.FIRST_SUPERUSER:
if not settings_service.auth_settings.FIRST_SUPERUSER:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Missing first superuser credentials",
)
result = get_user_by_username(
db, settings_manager.auth_settings.FIRST_SUPERUSER
db, settings_service.auth_settings.FIRST_SUPERUSER
)
elif not query_param and not header_param:
@ -74,7 +74,7 @@ async def get_current_user(
token: Annotated[str, Depends(oauth2_login)],
db: Session = Depends(get_session),
) -> User:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@ -85,14 +85,14 @@ async def get_current_user(
if isinstance(token, Coroutine):
token = await token
if settings_manager.auth_settings.SECRET_KEY is None:
if settings_service.auth_settings.SECRET_KEY is None:
raise credentials_exception
try:
payload = jwt.decode(
token,
settings_manager.auth_settings.SECRET_KEY,
algorithms=[settings_manager.auth_settings.ALGORITHM],
settings_service.auth_settings.SECRET_KEY,
algorithms=[settings_service.auth_settings.ALGORITHM],
)
user_id: UUID = payload.get("sub") # type: ignore
token_type: str = payload.get("type") # type: ignore
@ -132,19 +132,19 @@ def get_current_active_superuser(
def verify_password(plain_password, hashed_password):
settings_manager = get_settings_manager()
return settings_manager.auth_settings.pwd_context.verify(
settings_service = get_settings_service()
return settings_service.auth_settings.pwd_context.verify(
plain_password, hashed_password
)
def get_password_hash(password):
settings_manager = get_settings_manager()
return settings_manager.auth_settings.pwd_context.hash(password)
settings_service = get_settings_service()
return settings_service.auth_settings.pwd_context.hash(password)
def create_token(data: dict, expires_delta: timedelta):
settings_manager = get_settings_manager()
settings_service = get_settings_service()
to_encode = data.copy()
expire = datetime.now(timezone.utc) + expires_delta
@ -152,8 +152,8 @@ def create_token(data: dict, expires_delta: timedelta):
return jwt.encode(
to_encode,
settings_manager.auth_settings.SECRET_KEY,
algorithm=settings_manager.auth_settings.ALGORITHM,
settings_service.auth_settings.SECRET_KEY,
algorithm=settings_service.auth_settings.ALGORITHM,
)
@ -181,9 +181,9 @@ def create_super_user(
def create_user_longterm_token(db: Session = Depends(get_session)) -> dict:
settings_manager = get_settings_manager()
username = settings_manager.auth_settings.FIRST_SUPERUSER
password = settings_manager.auth_settings.FIRST_SUPERUSER_PASSWORD
settings_service = get_settings_service()
username = settings_service.auth_settings.FIRST_SUPERUSER
password = settings_service.auth_settings.FIRST_SUPERUSER_PASSWORD
if not username or not password:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
@ -227,10 +227,10 @@ def get_user_id_from_token(token: str) -> UUID:
def create_user_tokens(
user_id: UUID, db: Session = Depends(get_session), update_last_login: bool = False
) -> dict:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
access_token_expires = timedelta(
minutes=settings_manager.auth_settings.ACCESS_TOKEN_EXPIRE_MINUTES
minutes=settings_service.auth_settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
access_token = create_token(
data={"sub": str(user_id)},
@ -238,7 +238,7 @@ def create_user_tokens(
)
refresh_token_expires = timedelta(
minutes=settings_manager.auth_settings.REFRESH_TOKEN_EXPIRE_MINUTES
minutes=settings_service.auth_settings.REFRESH_TOKEN_EXPIRE_MINUTES
)
refresh_token = create_token(
data={"sub": str(user_id), "type": "rf"},
@ -257,13 +257,13 @@ def create_user_tokens(
def create_refresh_token(refresh_token: str, db: Session = Depends(get_session)):
settings_manager = get_settings_manager()
settings_service = get_settings_service()
try:
payload = jwt.decode(
refresh_token,
settings_manager.auth_settings.SECRET_KEY,
algorithms=[settings_manager.auth_settings.ALGORITHM],
settings_service.auth_settings.SECRET_KEY,
algorithms=[settings_service.auth_settings.ALGORITHM],
)
user_id: UUID = payload.get("sub") # type: ignore
token_type: str = payload.get("type") # type: ignore

View file

@ -1,12 +1,12 @@
import abc
class BaseCacheManager(abc.ABC):
class BaseCacheService(abc.ABC):
"""
Abstract base class for a cache.
"""
name = "cache_manager"
name = "cache_service"
@abc.abstractmethod
def get(self, key):

View file

@ -1,27 +1,27 @@
from langflow.services.cache.manager import InMemoryCache, RedisCache, BaseCacheManager
from langflow.services.cache.manager import InMemoryCache, RedisCache, BaseCacheService
from langflow.services.factory import ServiceFactory
from langflow.utils.logger import logger
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from langflow.services.settings.manager import SettingsManager
from langflow.services.settings.manager import SettingsService
class CacheManagerFactory(ServiceFactory):
class CacheServiceFactory(ServiceFactory):
def __init__(self):
super().__init__(BaseCacheManager)
super().__init__(BaseCacheService)
def create(self, settings_manager: "SettingsManager"):
# Here you would have logic to create and configure a CacheManager
def create(self, settings_service: "SettingsService"):
# Here you would have logic to create and configure a CacheService
# based on the settings_service
if settings_manager.settings.CACHE_TYPE == "redis":
if settings_service.settings.CACHE_TYPE == "redis":
logger.debug("Creating Redis cache")
redis_cache = RedisCache(
host=settings_manager.settings.REDIS_HOST,
port=settings_manager.settings.REDIS_PORT,
db=settings_manager.settings.REDIS_DB,
expiration_time=settings_manager.settings.REDIS_CACHE_EXPIRE,
host=settings_service.settings.REDIS_HOST,
port=settings_service.settings.REDIS_PORT,
db=settings_service.settings.REDIS_DB,
expiration_time=settings_service.settings.REDIS_CACHE_EXPIRE,
)
if redis_cache.is_connected():
logger.debug("Redis cache is connected")
@ -31,5 +31,5 @@ class CacheManagerFactory(ServiceFactory):
)
return InMemoryCache()
elif settings_manager.settings.CACHE_TYPE == "memory":
elif settings_service.settings.CACHE_TYPE == "memory":
return InMemoryCache()

View file

@ -3,12 +3,12 @@ import time
from collections import OrderedDict
from langflow.services.base import Service
from langflow.services.cache.base import BaseCacheManager
from langflow.services.cache.base import BaseCacheService
import pickle
class InMemoryCache(BaseCacheManager, Service):
class InMemoryCache(BaseCacheService, Service):
"""
A simple in-memory cache using an OrderedDict.
@ -176,7 +176,7 @@ class InMemoryCache(BaseCacheManager, Service):
return f"InMemoryCache(max_size={self.max_size}, expiration_time={self.expiration_time})"
class RedisCache(BaseCacheManager, Service):
class RedisCache(BaseCacheService, Service):
"""
A Redis-based cache implementation.

View file

@ -50,10 +50,10 @@ class AsyncSubject:
await observer()
class CacheManager(Subject, Service):
class CacheService(Subject, Service):
"""Manages cache for different clients and notifies observers on changes."""
name = "cache_manager"
name = "cache_service"
def __init__(self):
super().__init__()
@ -150,4 +150,4 @@ class CacheManager(Subject, Service):
return list(self.current_cache.values())[-1]
cache_manager = CacheManager()
cache_service = CacheService()

View file

@ -1,11 +1,11 @@
from langflow.services.chat.manager import ChatManager
from langflow.services.chat.manager import ChatService
from langflow.services.factory import ServiceFactory
class ChatManagerFactory(ServiceFactory):
class ChatServiceFactory(ServiceFactory):
def __init__(self):
super().__init__(ChatManager)
super().__init__(ChatService)
def create(self):
# Here you would have logic to create and configure a ChatManager
return ChatManager()
# Here you would have logic to create and configure a ChatService
return ChatService()

View file

@ -7,11 +7,11 @@ from langflow.services.chat.cache import Subject
from langflow.services.chat.utils import process_graph
from loguru import logger
from .cache import cache_manager
from .cache import cache_service
import asyncio
from typing import Any, Dict, List
from langflow.services import service_manager, ServiceType
from langflow.services import service_service, ServiceType
import orjson
@ -42,15 +42,15 @@ class ChatHistory(Subject):
self.history[client_id] = []
class ChatManager(Service):
name = "chat_manager"
class ChatService(Service):
name = "chat_service"
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
self.chat_history = ChatHistory()
self.chat_cache = cache_manager
self.chat_cache = cache_service
self.chat_cache.attach(self.update)
self.cache_manager = service_manager.get(ServiceType.CACHE_MANAGER)
self.cache_service = service_service.get(ServiceType.CACHE_MANAGER)
def on_chat_history_update(self):
"""Send the last chat message to the client."""
@ -179,8 +179,8 @@ class ChatManager(Service):
"result": langchain_object,
"type": type(langchain_object),
}
self.cache_manager.upsert(client_id, result_dict)
return client_id in self.cache_manager
self.cache_service.upsert(client_id, result_dict)
return client_id in self.cache_service
async def handle_websocket(self, client_id: str, websocket: WebSocket):
await self.connect(client_id, websocket)
@ -202,7 +202,7 @@ class ChatManager(Service):
continue
with self.chat_cache.set_client_id(client_id):
if langchain_object := self.cache_manager.get(client_id).get(
if langchain_object := self.cache_service.get(client_id).get(
"result"
):
await self.process_message(client_id, payload, langchain_object)

View file

@ -1,17 +1,17 @@
from typing import TYPE_CHECKING
from langflow.services.database.manager import DatabaseManager
from langflow.services.database.manager import DatabaseService
from langflow.services.factory import ServiceFactory
if TYPE_CHECKING:
from langflow.services.settings.manager import SettingsManager
from langflow.services.settings.manager import SettingsService
class DatabaseManagerFactory(ServiceFactory):
class DatabaseServiceFactory(ServiceFactory):
def __init__(self):
super().__init__(DatabaseManager)
super().__init__(DatabaseService)
def create(self, settings_manager: "SettingsManager"):
# Here you would have logic to create and configure a DatabaseManager
if not settings_manager.settings.DATABASE_URL:
def create(self, settings_service: "SettingsService"):
# Here you would have logic to create and configure a DatabaseService
if not settings_service.settings.DATABASE_URL:
raise ValueError("No database URL provided")
return DatabaseManager(settings_manager.settings.DATABASE_URL)
return DatabaseService(settings_service.settings.DATABASE_URL)

View file

@ -3,7 +3,7 @@ from typing import TYPE_CHECKING
from langflow.services.base import Service
from langflow.services.database.models.user.crud import get_user_by_username
from langflow.services.database.utils import Result, TableResults
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
from sqlalchemy import inspect
import sqlalchemy as sa
from sqlmodel import SQLModel, Session, create_engine
@ -16,8 +16,8 @@ if TYPE_CHECKING:
from sqlalchemy.engine import Engine
class DatabaseManager(Service):
name = "database_manager"
class DatabaseService(Service):
name = "database_service"
def __init__(self, database_url: str):
self.database_url = database_url
@ -30,10 +30,10 @@ class DatabaseManager(Service):
def _create_engine(self) -> "Engine":
"""Create the engine for the database."""
settings_manager = get_settings_manager()
settings_service = get_settings_service()
if (
settings_manager.settings.DATABASE_URL
and settings_manager.settings.DATABASE_URL.startswith("sqlite")
settings_service.settings.DATABASE_URL
and settings_service.settings.DATABASE_URL.startswith("sqlite")
):
connect_args = {"check_same_thread": False}
else:
@ -162,12 +162,12 @@ class DatabaseManager(Service):
def teardown(self):
logger.debug("Tearing down database")
try:
settings_manager = get_settings_manager()
settings_service = get_settings_service()
# remove the default superuser if auto_login is enabled
# using the FIRST_SUPERUSER to get the user
if settings_manager.auth_settings.AUTO_LOGIN:
if settings_service.auth_settings.AUTO_LOGIN:
logger.debug("Removing default superuser")
username = settings_manager.auth_settings.FIRST_SUPERUSER
username = settings_service.auth_settings.FIRST_SUPERUSER
with Session(self.engine) as session:
user = get_user_by_username(session, username)
session.delete(user)

View file

@ -6,21 +6,21 @@ from alembic.util.exc import CommandError
from sqlmodel import Session
if TYPE_CHECKING:
from langflow.services.database.manager import DatabaseManager
from langflow.services.database.manager import DatabaseService
def initialize_database():
logger.debug("Initializing database")
from langflow.services import service_manager, ServiceType
from langflow.services import service_service, ServiceType
database_manager = service_manager.get(ServiceType.DATABASE_MANAGER)
database_service = service_service.get(ServiceType.DATABASE_MANAGER)
try:
database_manager.check_schema_health()
database_service.check_schema_health()
except Exception as exc:
logger.error(f"Error checking schema health: {exc}")
raise RuntimeError("Error checking schema health") from exc
try:
database_manager.run_migrations()
database_service.run_migrations()
except CommandError as exc:
if "Can't locate revision identified by" not in str(exc):
raise exc
@ -30,23 +30,23 @@ def initialize_database():
logger.warning(
"Wrong revision in DB, deleting alembic_version table and running migrations again"
)
with session_getter(database_manager) as session:
with session_getter(database_service) as session:
session.execute("DROP TABLE alembic_version")
database_manager.run_migrations()
database_service.run_migrations()
except Exception as exc:
# if the exception involves tables already existing
# we can ignore it
if "already exists" not in str(exc):
logger.error(f"Error running migrations: {exc}")
raise RuntimeError("Error running migrations") from exc
database_manager.create_db_and_tables()
database_service.create_db_and_tables()
logger.debug("Database initialized")
@contextmanager
def session_getter(db_manager: "DatabaseManager"):
def session_getter(db_service: "DatabaseService"):
try:
session = Session(db_manager.engine)
session = Session(db_service.engine)
yield session
except Exception as e:
print("Session rollback because of exception:", e)

View file

@ -7,7 +7,7 @@ if TYPE_CHECKING:
from langflow.services.base import Service
class ServiceManager:
class ServiceService:
"""
Manages the creation of different services.
"""
@ -95,7 +95,7 @@ class ServiceManager:
self.dependencies = {}
service_manager = ServiceManager()
service_service = ServiceService()
def initialize_services():
@ -106,67 +106,67 @@ def initialize_services():
from langflow.services.cache import factory as cache_factory
from langflow.services.chat import factory as chat_factory
from langflow.services.settings import factory as settings_factory
from langflow.services.session import factory as session_manager_factory
from langflow.services.session import factory as session_service_factory
from langflow.services.auth import factory as auth_factory
from langflow.services.task import factory as task_factory
service_manager.register_factory(settings_factory.SettingsManagerFactory())
service_manager.register_factory(
database_factory.DatabaseManagerFactory(),
service_service.register_factory(settings_factory.SettingsServiceFactory())
service_service.register_factory(
database_factory.DatabaseServiceFactory(),
dependencies=[ServiceType.SETTINGS_MANAGER],
)
service_manager.register_factory(
cache_factory.CacheManagerFactory(), dependencies=[ServiceType.SETTINGS_MANAGER]
service_service.register_factory(
cache_factory.CacheServiceFactory(), dependencies=[ServiceType.SETTINGS_MANAGER]
)
service_manager.register_factory(
auth_factory.AuthManagerFactory(), dependencies=[ServiceType.SETTINGS_MANAGER]
service_service.register_factory(
auth_factory.AuthServiceFactory(), dependencies=[ServiceType.SETTINGS_MANAGER]
)
service_manager.register_factory(chat_factory.ChatManagerFactory())
service_manager.register_factory(
session_manager_factory.SessionManagerFactory(),
service_service.register_factory(chat_factory.ChatServiceFactory())
service_service.register_factory(
session_service_factory.SessionServiceFactory(),
dependencies=[ServiceType.CACHE_MANAGER],
)
service_manager.register_factory(
task_factory.TaskManagerFactory(),
service_service.register_factory(
task_factory.TaskServiceFactory(),
)
# Test cache connection
service_manager.get(ServiceType.CACHE_MANAGER)
service_service.get(ServiceType.CACHE_MANAGER)
# Test database connection
service_manager.get(ServiceType.DATABASE_MANAGER)
service_service.get(ServiceType.DATABASE_MANAGER)
# Test cache connection
service_manager.get(ServiceType.CACHE_MANAGER)
service_service.get(ServiceType.CACHE_MANAGER)
# Test database connection
service_manager.get(ServiceType.DATABASE_MANAGER)
service_service.get(ServiceType.DATABASE_MANAGER)
def initialize_settings_manager():
def initialize_settings_service():
"""
Initialize the settings manager.
"""
from langflow.services.settings import factory as settings_factory
service_manager.register_factory(settings_factory.SettingsManagerFactory())
service_service.register_factory(settings_factory.SettingsServiceFactory())
def initialize_session_manager():
def initialize_session_service():
"""
Initialize the session manager.
"""
from langflow.services.session import factory as session_manager_factory # type: ignore
from langflow.services.session import factory as session_service_factory # type: ignore
from langflow.services.cache import factory as cache_factory
initialize_settings_manager()
initialize_settings_service()
service_manager.register_factory(
cache_factory.CacheManagerFactory(), dependencies=[ServiceType.SETTINGS_MANAGER]
service_service.register_factory(
cache_factory.CacheServiceFactory(), dependencies=[ServiceType.SETTINGS_MANAGER]
)
service_manager.register_factory(
session_manager_factory.SessionManagerFactory(),
service_service.register_factory(
session_service_factory.SessionServiceFactory(),
dependencies=[ServiceType.CACHE_MANAGER],
)
@ -175,4 +175,4 @@ def teardown_services():
"""
Teardown all the services.
"""
service_manager.teardown()
service_service.teardown()

View file

@ -7,10 +7,10 @@ class ServiceType(str, Enum):
registered with the service manager.
"""
AUTH_MANAGER = "auth_manager"
CACHE_MANAGER = "cache_manager"
SETTINGS_MANAGER = "settings_manager"
DATABASE_MANAGER = "database_manager"
CHAT_MANAGER = "chat_manager"
SESSION_MANAGER = "session_manager"
TASK_MANAGER = "task_manager"
AUTH_MANAGER = "auth_service"
CACHE_MANAGER = "cache_service"
SETTINGS_MANAGER = "settings_service"
DATABASE_MANAGER = "database_service"
CHAT_MANAGER = "chat_service"
SESSION_MANAGER = "session_service"
TASK_MANAGER = "task_service"

View file

@ -1,14 +1,14 @@
from typing import TYPE_CHECKING
from langflow.services.session.manager import SessionManager
from langflow.services.session.manager import SessionService
from langflow.services.factory import ServiceFactory
if TYPE_CHECKING:
from langflow.services.cache.manager import BaseCacheManager
from langflow.services.cache.manager import BaseCacheService
class SessionManagerFactory(ServiceFactory):
class SessionServiceFactory(ServiceFactory):
def __init__(self):
super().__init__(SessionManager)
super().__init__(SessionService)
def create(self, cache_manager: "BaseCacheManager"):
return SessionManager(cache_manager)
def create(self, cache_service: "BaseCacheService"):
return SessionService(cache_service)

View file

@ -6,23 +6,23 @@ from langflow.services.cache.utils import compute_dict_hash
from langflow.services.session.utils import session_id_generator
if TYPE_CHECKING:
from langflow.services.cache.base import BaseCacheManager
from langflow.services.cache.base import BaseCacheService
class SessionManager(Service):
name = "session_manager"
class SessionService(Service):
name = "session_service"
def __init__(self, cache_manager):
self.cache_manager: "BaseCacheManager" = cache_manager
def __init__(self, cache_service):
self.cache_service: "BaseCacheService" = cache_service
def load_session(self, key, data_graph):
# Check if the data is cached
if key in self.cache_manager:
return self.cache_manager.get(key)
if key in self.cache_service:
return self.cache_service.get(key)
# If not cached, build the graph and cache it
graph, artifacts = build_sorted_vertices(data_graph)
self.cache_manager.set(key, (graph, artifacts))
self.cache_service.set(key, (graph, artifacts))
return graph, artifacts
@ -38,7 +38,7 @@ class SessionManager(Service):
return self.build_key(session_id, data_graph=data_graph)
def update_session(self, session_id, value):
self.cache_manager.set(session_id, value)
self.cache_service.set(session_id, value)
def clear_session(self, session_id):
self.cache_manager.delete(session_id)
self.cache_service.delete(session_id)

View file

@ -1,15 +1,15 @@
from pathlib import Path
from langflow.services.settings.manager import SettingsManager
from langflow.services.settings.manager import SettingsService
from langflow.services.factory import ServiceFactory
class SettingsManagerFactory(ServiceFactory):
class SettingsServiceFactory(ServiceFactory):
def __init__(self):
super().__init__(SettingsManager)
super().__init__(SettingsService)
def create(self):
# Here you would have logic to create and configure a SettingsManager
# Here you would have logic to create and configure a SettingsService
langflow_dir = Path(__file__).parent.parent.parent
return SettingsManager.load_settings_from_yaml(
return SettingsService.load_settings_from_yaml(
str(langflow_dir / "config.yaml")
)

View file

@ -6,8 +6,8 @@ import os
import yaml
class SettingsManager(Service):
name = "settings_manager"
class SettingsService(Service):
name = "settings_service"
def __init__(self, settings: Settings, auth_settings: AuthSettings):
super().__init__()
@ -15,7 +15,7 @@ class SettingsManager(Service):
self.auth_settings = auth_settings
@classmethod
def load_settings_from_yaml(cls, file_path: str) -> "SettingsManager":
def load_settings_from_yaml(cls, file_path: str) -> "SettingsService":
# Check if a string is a valid path or a file name
if "/" not in file_path:
# Get current path

View file

@ -1,11 +1,11 @@
from langflow.services.task.manager import TaskManager
from langflow.services.task.manager import TaskService
from langflow.services.factory import ServiceFactory
class TaskManagerFactory(ServiceFactory):
class TaskServiceFactory(ServiceFactory):
def __init__(self):
super().__init__(TaskManager)
super().__init__(TaskService)
def create(self):
# Here you would have logic to create and configure a TaskManager
return TaskManager()
# Here you would have logic to create and configure a TaskService
return TaskService()

View file

@ -19,8 +19,8 @@ except ImportError:
USE_CELERY = False
class TaskManager(Service):
name = "task_manager"
class TaskService(Service):
name = "task_service"
def __init__(self):
self.backend = self.get_backend()
@ -33,7 +33,7 @@ class TaskManager(Service):
return CeleryBackend()
return AnyIOBackend()
# In your TaskManager class
# In your TaskService class
async def launch_and_await_task(
self,
task_func: Callable[..., Any],

View file

@ -1,41 +1,41 @@
from langflow.services import ServiceType, service_manager
from langflow.services import ServiceType, service_service
from typing import TYPE_CHECKING, Generator
if TYPE_CHECKING:
from langflow.services.database.manager import DatabaseManager
from langflow.services.settings.manager import SettingsManager
from langflow.services.cache.manager import BaseCacheManager
from langflow.services.session.manager import SessionManager
from langflow.services.task.manager import TaskManager
from langflow.services.chat.manager import ChatManager
from langflow.services.database.manager import DatabaseService
from langflow.services.settings.manager import SettingsService
from langflow.services.cache.manager import BaseCacheService
from langflow.services.session.manager import SessionService
from langflow.services.task.manager import TaskService
from langflow.services.chat.manager import ChatService
from sqlmodel import Session
def get_settings_manager() -> "SettingsManager":
return service_manager.get(ServiceType.SETTINGS_MANAGER)
def get_settings_service() -> "SettingsService":
return service_service.get(ServiceType.SETTINGS_MANAGER)
def get_db_manager() -> "DatabaseManager":
return service_manager.get(ServiceType.DATABASE_MANAGER)
def get_db_service() -> "DatabaseService":
return service_service.get(ServiceType.DATABASE_MANAGER)
def get_session() -> Generator["Session", None, None]:
db_manager = service_manager.get(ServiceType.DATABASE_MANAGER)
yield from db_manager.get_session()
db_service = service_service.get(ServiceType.DATABASE_MANAGER)
yield from db_service.get_session()
def get_cache_manager() -> "BaseCacheManager":
return service_manager.get(ServiceType.CACHE_MANAGER)
def get_cache_service() -> "BaseCacheService":
return service_service.get(ServiceType.CACHE_MANAGER)
def get_session_manager() -> "SessionManager":
return service_manager.get(ServiceType.SESSION_MANAGER)
def get_session_service() -> "SessionService":
return service_service.get(ServiceType.SESSION_MANAGER)
def get_task_manager() -> "TaskManager":
return service_manager.get(ServiceType.TASK_MANAGER)
def get_task_service() -> "TaskService":
return service_service.get(ServiceType.TASK_MANAGER)
def get_chat_manager() -> "ChatManager":
return service_manager.get(ServiceType.CHAT_MANAGER)
def get_chat_service() -> "ChatService":
return service_service.get(ServiceType.CHAT_MANAGER)

View file

@ -30,7 +30,7 @@ def build_template_from_function(
variables = {"_type": _type}
for class_field_items, value in _class.__fields__.items():
if class_field_items in ["callback_manager"]:
if class_field_items in ["callback_service"]:
continue
variables[class_field_items] = {}
for name_, value_ in value.__repr_args__():
@ -84,7 +84,7 @@ def build_template_from_class(
if "__fields__" in _class.__dict__:
for class_field_items, value in _class.__fields__.items():
if class_field_items in ["callback_manager"]:
if class_field_items in ["callback_service"]:
continue
variables[class_field_items] = {}
for name_, value_ in value.__repr_args__():

View file

@ -6,8 +6,8 @@ from langflow.processing.process import (
generate_result,
process_inputs,
)
from langflow.services.manager import initialize_session_manager
from langflow.services.utils import get_session_manager
from langflow.services.manager import initialize_session_service
from langflow.services.utils import get_session_service
if TYPE_CHECKING:
from langflow.graph.vertex.base import Vertex
@ -40,16 +40,16 @@ def process_graph_cached_task(
clear_cache=False,
session_id=None,
) -> Tuple[Any, str]:
initialize_session_manager()
session_manager = get_session_manager()
initialize_session_service()
session_service = get_session_service()
if clear_cache:
session_manager.clear_session(session_id)
# Load the graph using SessionManager
langchain_object, artifacts = session_manager.load_session(session_id, data_graph)
session_service.clear_session(session_id)
# Load the graph using SessionService
langchain_object, artifacts = session_service.load_session(session_id, data_graph)
processed_inputs = process_inputs(inputs, artifacts)
result = generate_result(langchain_object, processed_inputs)
# langchain_object is now updated with the new memory
# we need to update the cache with the updated langchain_object
session_manager.update_session(session_id, (langchain_object, artifacts))
session_service.update_session(session_id, (langchain_object, artifacts))
return result, session_id

View file

@ -17,7 +17,7 @@ from sqlmodel.pool import StaticPool
from typer.testing import CliRunner
if TYPE_CHECKING:
from langflow.services.database.manager import DatabaseManager
from langflow.services.database.manager import DatabaseService
def pytest_configure():
@ -158,8 +158,8 @@ def session_getter_fixture(client):
SQLModel.metadata.create_all(engine)
@contextmanager
def blank_session_getter(db_manager: "DatabaseManager"):
with Session(db_manager.engine) as session:
def blank_session_getter(db_service: "DatabaseService"):
with Session(db_service.engine) as session:
yield session
yield blank_session_getter

View file

@ -31,15 +31,15 @@ def test_zero_shot_agent(client: TestClient, logged_in_headers):
}
# Additional assertions for other template variables
assert template["callback_manager"] == {
assert template["callback_service"] == {
"required": False,
"dynamic": False,
"placeholder": "",
"show": False,
"multiline": False,
"password": False,
"name": "callback_manager",
"type": "BaseCallbackManager",
"name": "callback_service",
"type": "BaseCallbackService",
"list": False,
"advanced": False,
"info": "",

View file

@ -2,81 +2,81 @@ from io import StringIO
import pandas as pd
import pytest
from langflow.services.chat.cache import CacheManager
from langflow.services.chat.cache import CacheService
from PIL import Image
@pytest.fixture
def cache_manager():
return CacheManager()
def cache_service():
return CacheService()
def test_cache_manager_attach_detach_notify(cache_manager):
def test_cache_service_attach_detach_notify(cache_service):
observer_called = False
def observer():
nonlocal observer_called
observer_called = True
cache_manager.attach(observer)
cache_manager.notify()
cache_service.attach(observer)
cache_service.notify()
assert observer_called
observer_called = False
cache_manager.detach(observer)
cache_manager.notify()
cache_service.detach(observer)
cache_service.notify()
assert not observer_called
def test_cache_manager_client_context(cache_manager):
with cache_manager.set_client_id("client1"):
cache_manager.add("foo", "bar", "string")
assert cache_manager.get("foo") == {
def test_cache_service_client_context(cache_service):
with cache_service.set_client_id("client1"):
cache_service.add("foo", "bar", "string")
assert cache_service.get("foo") == {
"obj": "bar",
"type": "string",
"extension": "str",
}
with cache_manager.set_client_id("client2"):
cache_manager.add("baz", "qux", "string")
assert cache_manager.get("baz") == {
with cache_service.set_client_id("client2"):
cache_service.add("baz", "qux", "string")
assert cache_service.get("baz") == {
"obj": "qux",
"type": "string",
"extension": "str",
}
with pytest.raises(KeyError):
cache_manager.get("foo")
cache_service.get("foo")
def test_cache_manager_add_pandas(cache_manager):
def test_cache_service_add_pandas(cache_service):
df = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
with cache_manager.set_client_id("client1"):
cache_manager.add_pandas("test_df", df)
cached_df = cache_manager.get("test_df")
with cache_service.set_client_id("client1"):
cache_service.add_pandas("test_df", df)
cached_df = cache_service.get("test_df")
assert cached_df["type"] == "pandas"
assert cached_df["extension"] == "csv"
read_df = pd.read_csv(StringIO(cached_df["obj"]), index_col=0)
pd.testing.assert_frame_equal(df, read_df)
def test_cache_manager_add_image(cache_manager):
def test_cache_service_add_image(cache_service):
img = Image.new("RGB", (50, 50), color="red")
with cache_manager.set_client_id("client1"):
cache_manager.add_image("test_image", img)
cached_img = cache_manager.get("test_image")
with cache_service.set_client_id("client1"):
cache_service.add_image("test_image", img)
cached_img = cache_service.get("test_image")
assert cached_img["type"] == "image"
assert cached_img["extension"] == "png"
assert isinstance(cached_img["obj"], Image.Image)
def test_cache_manager_get_last(cache_manager):
with cache_manager.set_client_id("client1"):
cache_manager.add("foo", "bar", "string")
cache_manager.add("baz", "qux", "string")
last_item = cache_manager.get_last()
def test_cache_service_get_last(cache_service):
with cache_service.set_client_id("client1"):
cache_service.add("foo", "bar", "string")
cache_service.add("baz", "qux", "string")
last_item = cache_service.get_last()
assert last_item == {"obj": "qux", "type": "string", "extension": "str"}

View file

@ -26,8 +26,8 @@ def test_components_path(runner, client, default_settings):
["run", "--components-path", str(temp_dir), *default_settings],
)
assert result.exit_code == 0, result.stdout
settings_manager = utils.get_settings_manager()
assert str(temp_dir) in settings_manager.settings.COMPONENTS_PATH
settings_service = utils.get_settings_service()
assert str(temp_dir) in settings_service.settings.COMPONENTS_PATH
def test_superuser(runner, client, session):

View file

@ -2,7 +2,7 @@ import uuid
from langflow.processing.process import Result
from langflow.services.auth.utils import get_password_hash
from langflow.services.database.models.api_key.api_key import ApiKey
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
import pytest
from fastapi.testclient import TestClient
from langflow.interface.tools.constants import CUSTOM_TOOLS
@ -111,8 +111,8 @@ def test_process_flow_invalid_api_key(client, flow, monkeypatch):
from langflow.api.v1 import endpoints
from langflow.services.database.models.api_key import crud
settings_manager = get_settings_manager()
settings_manager.auth_settings.AUTO_LOGIN = False
settings_service = get_settings_service()
settings_service.auth_settings.AUTO_LOGIN = False
async def mock_process_graph_cached(*args, **kwargs):
return Result(result={}, session_id="session_id_mock")
@ -170,8 +170,8 @@ def test_process_flow_without_autologin(client, flow, monkeypatch, created_api_k
from langflow.api.v1 import endpoints
from langflow.services.database.models.api_key import crud
settings_manager = get_settings_manager()
settings_manager.auth_settings.AUTO_LOGIN = False
settings_service = get_settings_service()
settings_service.auth_settings.AUTO_LOGIN = False
async def mock_process_graph_cached(*args, **kwargs):
return Result(result={}, session_id="session_id_mock")
@ -208,8 +208,8 @@ def test_process_flow_fails_autologin_off(client, flow, monkeypatch):
from langflow.api.v1 import endpoints
from langflow.services.database.models.api_key import crud
settings_manager = get_settings_manager()
settings_manager.auth_settings.AUTO_LOGIN = False
settings_service = get_settings_service()
settings_service.auth_settings.AUTO_LOGIN = False
async def mock_process_graph_cached(*args, **kwargs):
return Result(result={}, session_id="session_id_mock")

View file

@ -1,14 +1,14 @@
from fastapi.testclient import TestClient
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
def test_llms_settings(client: TestClient, logged_in_headers):
settings_manager = get_settings_manager()
settings_service = get_settings_service()
response = client.get("api/v1/all", headers=logged_in_headers)
assert response.status_code == 200
json_response = response.json()
llms = json_response["llms"]
assert set(llms.keys()) == set(settings_manager.settings.LLMS)
assert set(llms.keys()) == set(settings_service.settings.LLMS)
# def test_hugging_face_hub(client: TestClient):

View file

@ -1,5 +1,5 @@
from langflow.processing.process import process_tweaks
from langflow.services.utils import get_session_manager
from langflow.services.utils import get_session_service
def test_no_tweaks():
@ -199,13 +199,13 @@ def test_tweak_not_in_template():
def test_load_langchain_object_with_cached_session(client, basic_graph_data):
# Provide a non-existent session_id
session_manager = get_session_manager()
session_service = get_session_service()
session_id1 = "non-existent-session-id"
langchain_object1, artifacts1 = session_manager.load_session(
langchain_object1, artifacts1 = session_service.load_session(
session_id1, basic_graph_data
)
# Use the new session_id to get the langchain_object again
langchain_object2, artifacts2 = session_manager.load_session(
langchain_object2, artifacts2 = session_service.load_session(
session_id1, basic_graph_data
)
@ -215,16 +215,16 @@ def test_load_langchain_object_with_cached_session(client, basic_graph_data):
def test_load_langchain_object_with_no_cached_session(client, basic_graph_data):
# Provide a non-existent session_id
session_manager = get_session_manager()
session_service = get_session_service()
session_id1 = "non-existent-session-id"
session_id = session_manager.build_key(session_id1, basic_graph_data)
langchain_object1, artifacts1 = session_manager.load_session(
session_id = session_service.build_key(session_id1, basic_graph_data)
langchain_object1, artifacts1 = session_service.load_session(
session_id, basic_graph_data
)
# Clear the cache
session_manager.clear_session(session_id)
session_service.clear_session(session_id)
# Use the new session_id to get the langchain_object again
langchain_object2, artifacts2 = session_manager.load_session(
langchain_object2, artifacts2 = session_service.load_session(
session_id, basic_graph_data
)
@ -235,13 +235,13 @@ def test_load_langchain_object_with_no_cached_session(client, basic_graph_data):
def test_load_langchain_object_without_session_id(client, basic_graph_data):
# Provide a non-existent session_id
session_manager = get_session_manager()
session_service = get_session_service()
session_id1 = None
langchain_object1, artifacts1 = session_manager.load_session(
langchain_object1, artifacts1 = session_service.load_session(
session_id1, basic_graph_data
)
# Use the new session_id to get the langchain_object again
langchain_object2, artifacts2 = session_manager.load_session(
langchain_object2, artifacts2 = session_service.load_session(
session_id1, basic_graph_data
)

View file

@ -1,14 +1,14 @@
from fastapi.testclient import TestClient
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
def test_prompts_settings(client: TestClient, logged_in_headers):
settings_manager = get_settings_manager()
settings_service = get_settings_service()
response = client.get("api/v1/all", headers=logged_in_headers)
assert response.status_code == 200
json_response = response.json()
prompts = json_response["prompts"]
assert set(prompts.keys()) == set(settings_manager.settings.PROMPTS)
assert set(prompts.keys()) == set(settings_service.settings.PROMPTS)
def test_prompt_template(client: TestClient, logged_in_headers):

View file

@ -2,15 +2,15 @@ from datetime import datetime
from langflow.services.auth.utils import create_super_user, get_password_hash
from langflow.services.database.models.user.user import User
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
import pytest
from langflow.services.database.models.user import UserUpdate
@pytest.fixture
def super_user(client, session):
settings_manager = get_settings_manager()
auth_settings = settings_manager.auth_settings
settings_service = get_settings_service()
auth_settings = settings_service.auth_settings
return create_super_user(
db=session,
username=auth_settings.FIRST_SUPERUSER,
@ -20,8 +20,8 @@ def super_user(client, session):
@pytest.fixture
def super_user_headers(client, super_user):
settings_manager = get_settings_manager()
auth_settings = settings_manager.auth_settings
settings_service = get_settings_service()
auth_settings = settings_service.auth_settings
login_data = {
"username": auth_settings.FIRST_SUPERUSER,
"password": auth_settings.FIRST_SUPERUSER_PASSWORD,

View file

@ -1,14 +1,14 @@
from fastapi.testclient import TestClient
from langflow.services.utils import get_settings_manager
from langflow.services.utils import get_settings_service
# check that all agents are in settings.agents
# are in json_response["agents"]
def test_vectorstores_settings(client: TestClient, logged_in_headers):
settings_manager = get_settings_manager()
settings_service = get_settings_service()
response = client.get("api/v1/all", headers=logged_in_headers)
assert response.status_code == 200
json_response = response.json()
vectorstores = json_response["vectorstores"]
settings_vecs = set(settings_manager.settings.VECTORSTORES)
settings_vecs = set(settings_service.settings.VECTORSTORES)
assert all(vs in vectorstores for vs in settings_vecs)

View file

@ -1,7 +1,7 @@
from fastapi import WebSocketDisconnect
from fastapi.testclient import TestClient
# from langflow.services.chat.manager import ChatManager
# from langflow.services.chat.manager import ChatService
import pytest
@ -28,7 +28,7 @@ def test_init_build(client, active_user, logged_in_headers):
def test_websocket_endpoint(client: TestClient, active_user, logged_in_headers):
# Assuming your websocket_endpoint uses chat_manager which caches data from stream_build
# Assuming your websocket_endpoint uses chat_service which caches data from stream_build
access_token = logged_in_headers["Authorization"].split(" ")[1]
with pytest.raises(WebSocketDisconnect):
with client.websocket_connect(
@ -40,12 +40,12 @@ def test_websocket_endpoint(client: TestClient, active_user, logged_in_headers):
def test_websocket_endpoint_after_build(client, basic_graph_data):
# Assuming your websocket_endpoint uses chat_manager which caches data from stream_build
# Assuming your websocket_endpoint uses chat_service which caches data from stream_build
client.post("api/v1/build/init", json=basic_graph_data)
client.get("api/v1/build/stream/websocket_test")
# There should be more to test here, but it depends on the inner workings of your websocket handler
# and how your chat_manager and other classes behave. The following is just an example structure.
# and how your chat_service and other classes behave. The following is just an example structure.
with pytest.raises(WebSocketDisconnect):
with client.websocket_connect("api/v1/chat/websocket_test") as websocket:
websocket.send_json({"input": "test"})