Update package versions, workflows, LLMChain and Graph sorting (#1674)

* Update package versions in pyproject.toml and poetry.lock files

* Update poetry caching action and workflows

* Update poetry caching action and workflows

* Refactor LLMChainComponent build method in LLMChain.py

* Update poetry install command in Makefile

* Refactor Makefile to remove redundant install_backend targets

* Fix codespell issues in project

* Update package versions and dependencies

* Fix import order in chat_io.spec.ts, headerComponent/index.tsx, and chatMessage/index.tsx

* Update ruff command in Makefile and fix poetry cache reuse in Dockerfile

* Refactor ServiceManager class in manager.py to handle default service factories

* Fix typo in DOWNLOAD_WEBHOOK_URL variable assignment

* Refactor cache_service tests in test_cache_manager.py

* Add pytest-profiling

* Update Makefile to run unit tests with parallel execution

* Refactor ServiceManager class in manager.py to handle default service factories

* Refactor node_name condition in Graph class to use "Listen" instead of "GetNotified"

* Refactor file paths in tests/conftest.py for better readability and maintainability

* Sort vertices in each layer by dependency in Graph class

* Refactor variable declaration in SessionService class to use type hinting

* Refactor make tests command in python_test.yml workflow

* Refactor file paths in tests/conftest.py for better readability and maintainability

* Refactor imports in tests/conftest.py to include sqlmodel.Session and related dependencies

* Refactor file paths in tests/conftest.py to include available files in error message

* Refactor file paths in tests/conftest.py to include available files in error message

* Refactor file paths in tests/conftest.py to fix typo in BasicChatwithPromptAndHistory.json
This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-04-11 12:20:56 -03:00 committed by GitHub
commit e582535bb0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 396 additions and 286 deletions

View file

@ -5,6 +5,7 @@ Revises: 1a110b568907
Create Date: 2024-04-10 19:17:22.820455
"""
from typing import Sequence, Union
import sqlalchemy as sa

View file

@ -1,9 +1,10 @@
from typing import Optional
from langchain.chains import LLMChain
from langchain.chains.llm import LLMChain
from langflow.field_typing import BaseLanguageModel, BaseMemory, BasePromptTemplate, Text
from langflow.field_typing import BaseLanguageModel, BaseMemory, Text
from langflow.interface.custom.custom_component import CustomComponent
from langchain_core.prompts import PromptTemplate
class LLMChainComponent(CustomComponent):
@ -19,10 +20,11 @@ class LLMChainComponent(CustomComponent):
def build(
self,
prompt: BasePromptTemplate,
template: Text,
llm: BaseLanguageModel,
memory: Optional[BaseMemory] = None,
) -> Text:
prompt = PromptTemplate.from_template(template)
runnable = LLMChain(prompt=prompt, llm=llm, memory=memory)
result_dict = runnable.invoke({})
output_key = runnable.output_key

View file

@ -918,7 +918,7 @@ class Graph:
return ChatVertex
elif node_name in ["ShouldRunNext"]:
return RoutingVertex
elif node_name in ["SharedState", "Notify", "GetNotified"]:
elif node_name in ["SharedState", "Notify", "Listen"]:
return StateVertex
elif node_base_type in lazy_load_vertex_dict.VERTEX_TYPE_MAP:
return lazy_load_vertex_dict.VERTEX_TYPE_MAP[node_base_type]
@ -1130,6 +1130,34 @@ class Graph:
return vertices_layers
def sort_layer_by_dependency(self, vertices_layers: List[List[str]]) -> List[List[str]]:
"""Sorts the vertices in each layer by dependency, ensuring no vertex depends on a subsequent vertex."""
sorted_layers = []
for layer in vertices_layers:
sorted_layer = self._sort_single_layer_by_dependency(layer)
sorted_layers.append(sorted_layer)
return sorted_layers
def _sort_single_layer_by_dependency(self, layer: List[str]) -> List[str]:
"""Sorts a single layer by dependency using a stable sorting method."""
# Build a map of each vertex to its index in the layer for quick lookup.
index_map = {vertex: index for index, vertex in enumerate(layer)}
# Create a sorted copy of the layer based on dependency order.
sorted_layer = sorted(layer, key=lambda vertex: self._max_dependency_index(vertex, index_map), reverse=True)
return sorted_layer
def _max_dependency_index(self, vertex_id: str, index_map: Dict[str, int]) -> int:
"""Finds the highest index a given vertex's dependencies occupy in the same layer."""
vertex = self.get_vertex(vertex_id)
max_index = -1
for successor in vertex.successors: # Assuming vertex.successors is a list of successor vertex identifiers.
if successor.id in index_map:
max_index = max(max_index, index_map[successor.id])
return max_index
def sort_vertices(
self,
stop_component_id: Optional[str] = None,
@ -1151,6 +1179,9 @@ class Graph:
vertices_layers = self.layered_topological_sort(vertices)
vertices_layers = self.sort_by_avg_build_time(vertices_layers)
# vertices_layers = self.sort_chat_inputs_first(vertices_layers)
# Now we should sort each layer in a way that we make sure
# vertex V does not depend on vertex V+1
vertices_layers = self.sort_layer_by_dependency(vertices_layers)
self.increment_run_count()
self._sorted_vertices_layers = vertices_layers
first_layer = vertices_layers[0]

View file

@ -5,17 +5,6 @@ from loguru import logger
from langflow.graph import Graph
async def build_sorted_vertices(data_graph, flow_id: str) -> Tuple[Graph, Dict]:
"""
Build langchain object from data_graph.
"""
logger.debug("Building langchain object")
graph = Graph.from_payload(data_graph, flow_id=flow_id)
return graph, {}
def get_memory_key(langchain_object):
"""
Given a LangChain object, this function retrieves the current memory key from the object's memory attribute.

View file

@ -1,7 +1,7 @@
from contextlib import contextmanager
from typing import TYPE_CHECKING, Generator
from langflow.services import ServiceType, service_manager
from langflow.services.schema import ServiceType
if TYPE_CHECKING:
from sqlmodel import Session
@ -21,7 +21,7 @@ if TYPE_CHECKING:
from langflow.services.variable.service import VariableService
def get_service(service_type: ServiceType):
def get_service(service_type: ServiceType, default=None):
"""
Retrieves the service instance for the given service type.
@ -32,7 +32,13 @@ def get_service(service_type: ServiceType):
Any: The service instance.
"""
return service_manager.get(service_type) # type: ignore
from langflow.services.manager import service_manager
if not service_manager.factories:
#! This is a workaround to ensure that the service manager is initialized
#! Not optimal, but it works for now
service_manager.register_factories()
return service_manager.get(service_type, default) # type: ignore
def get_state_service() -> "StateService":
@ -42,7 +48,9 @@ def get_state_service() -> "StateService":
Returns:
The StateService instance.
"""
return service_manager.get(ServiceType.STATE_SERVICE) # type: ignore
from langflow.services.state.factory import StateServiceFactory
return get_service(ServiceType.STATE_SERVICE, StateServiceFactory()) # type: ignore
def get_socket_service() -> "SocketIOService":
@ -52,7 +60,7 @@ def get_socket_service() -> "SocketIOService":
Returns:
SocketIOService: The SocketIOService instance.
"""
return service_manager.get(ServiceType.SOCKETIO_SERVICE) # type: ignore
return get_service(ServiceType.SOCKETIO_SERVICE) # type: ignore
def get_storage_service() -> "StorageService":
@ -62,7 +70,9 @@ def get_storage_service() -> "StorageService":
Returns:
The storage service instance.
"""
return service_manager.get(ServiceType.STORAGE_SERVICE) # type: ignore
from langflow.services.storage.factory import StorageServiceFactory
return get_service(ServiceType.STORAGE_SERVICE, default=StorageServiceFactory()) # type: ignore
def get_variable_service() -> "VariableService":
@ -73,7 +83,9 @@ def get_variable_service() -> "VariableService":
The VariableService instance.
"""
return service_manager.get(ServiceType.VARIABLE_SERVICE) # type: ignore
from langflow.services.variable.factory import VariableServiceFactory
return get_service(ServiceType.VARIABLE_SERVICE, VariableServiceFactory()) # type: ignore
def get_plugins_service() -> "PluginService":
@ -83,7 +95,7 @@ def get_plugins_service() -> "PluginService":
Returns:
PluginService: The PluginService instance.
"""
return service_manager.get(ServiceType.PLUGIN_SERVICE) # type: ignore
return get_service(ServiceType.PLUGIN_SERVICE) # type: ignore
def get_settings_service() -> "SettingsService":
@ -98,14 +110,9 @@ def get_settings_service() -> "SettingsService":
Raises:
ValueError: If the service cannot be retrieved or initialized.
"""
try:
return service_manager.get(ServiceType.SETTINGS_SERVICE) # type: ignore
except ValueError:
# initialize settings service
from langflow.services.manager import initialize_settings_service
from langflow.services.settings.factory import SettingsServiceFactory
initialize_settings_service()
return service_manager.get(ServiceType.SETTINGS_SERVICE) # type: ignore
return get_service(ServiceType.SETTINGS_SERVICE, SettingsServiceFactory()) # type: ignore
def get_db_service() -> "DatabaseService":
@ -116,7 +123,9 @@ def get_db_service() -> "DatabaseService":
The DatabaseService instance.
"""
return service_manager.get(ServiceType.DATABASE_SERVICE) # type: ignore
from langflow.services.database.factory import DatabaseServiceFactory
return get_service(ServiceType.DATABASE_SERVICE, DatabaseServiceFactory()) # type: ignore
def get_session() -> Generator["Session", None, None]:
@ -165,7 +174,9 @@ def get_cache_service() -> "CacheService":
Returns:
The cache service instance.
"""
return service_manager.get(ServiceType.CACHE_SERVICE) # type: ignore
from langflow.services.cache.factory import CacheServiceFactory
return get_service(ServiceType.CACHE_SERVICE, CacheServiceFactory()) # type: ignore
def get_session_service() -> "SessionService":
@ -175,7 +186,9 @@ def get_session_service() -> "SessionService":
Returns:
The session service instance.
"""
return service_manager.get(ServiceType.SESSION_SERVICE) # type: ignore
from langflow.services.session.factory import SessionServiceFactory
return get_service(ServiceType.SESSION_SERVICE, SessionServiceFactory()) # type: ignore
def get_monitor_service() -> "MonitorService":
@ -185,7 +198,9 @@ def get_monitor_service() -> "MonitorService":
Returns:
MonitorService: The MonitorService instance.
"""
return service_manager.get(ServiceType.MONITOR_SERVICE) # type: ignore
from langflow.services.monitor.factory import MonitorServiceFactory
return get_service(ServiceType.MONITOR_SERVICE, MonitorServiceFactory()) # type: ignore
def get_task_service() -> "TaskService":
@ -196,7 +211,9 @@ def get_task_service() -> "TaskService":
The TaskService instance.
"""
return service_manager.get(ServiceType.TASK_SERVICE) # type: ignore
from langflow.services.task.factory import TaskServiceFactory
return get_service(ServiceType.TASK_SERVICE, TaskServiceFactory()) # type: ignore
def get_chat_service() -> "ChatService":
@ -206,7 +223,7 @@ def get_chat_service() -> "ChatService":
Returns:
ChatService: The chat service instance.
"""
return service_manager.get(ServiceType.CHAT_SERVICE) # type: ignore
return get_service(ServiceType.CHAT_SERVICE) # type: ignore
def get_store_service() -> "StoreService":
@ -216,4 +233,4 @@ def get_store_service() -> "StoreService":
Returns:
StoreService: The StoreService instance.
"""
return service_manager.get(ServiceType.STORE_SERVICE) # type: ignore
return get_service(ServiceType.STORE_SERVICE) # type: ignore

View file

@ -1,13 +1,20 @@
from typing import TYPE_CHECKING, Dict
import importlib
import inspect
from typing import TYPE_CHECKING, Dict, Optional
from loguru import logger
if TYPE_CHECKING:
from langflow.services.base import Service
from langflow.services.factory import ServiceFactory
from langflow.services.schema import ServiceType
class NoFactoryRegisteredError(Exception):
pass
class ServiceManager:
"""
Manages the creation of different services.
@ -16,6 +23,15 @@ class ServiceManager:
def __init__(self):
self.services: Dict[str, "Service"] = {}
self.factories = {}
self.register_factories()
def register_factories(self):
for factory in self.get_factories():
try:
self.register_factory(factory)
except Exception as exc:
logger.exception(exc)
logger.error(f"Error initializing {factory}: {exc}")
def register_factory(
self,
@ -28,24 +44,28 @@ class ServiceManager:
service_name = service_factory.service_class.name
self.factories[service_name] = service_factory
def get(self, service_name: "ServiceType") -> "Service":
def get(self, service_name: "ServiceType", default: Optional["ServiceFactory"] = None) -> "Service":
"""
Get (or create) a service by its name.
"""
if service_name not in self.services:
self._create_service(service_name)
self._create_service(service_name, default)
return self.services[service_name]
def _create_service(self, service_name: "ServiceType"):
def _create_service(self, service_name: "ServiceType", default: Optional["ServiceFactory"] = None):
"""
Create a new service given its name, handling dependencies.
"""
logger.debug(f"Create service {service_name}")
self._validate_service_creation(service_name)
self._validate_service_creation(service_name, default)
# Create dependencies first
factory = self.factories.get(service_name)
if factory is None and default is not None:
self.register_factory(default)
factory = default
for dependency in factory.dependencies:
if dependency not in self.services:
self._create_service(dependency)
@ -57,12 +77,12 @@ class ServiceManager:
self.services[service_name] = self.factories[service_name].create(**dependent_services)
self.services[service_name].set_ready()
def _validate_service_creation(self, service_name: "ServiceType"):
def _validate_service_creation(self, service_name: "ServiceType", default: Optional["ServiceFactory"] = None):
"""
Validate whether the service can be created.
"""
if service_name not in self.factories:
raise ValueError(f"No factory registered for the service class '{service_name.name}'")
if service_name not in self.factories and default is None:
raise NoFactoryRegisteredError(f"No factory registered for the service class '{service_name.name}'")
def update(self, service_name: "ServiceType"):
"""
@ -88,6 +108,34 @@ class ServiceManager:
self.services = {}
self.factories = {}
@staticmethod
def get_factories():
from langflow.services.factory import ServiceFactory
from langflow.services.schema import ServiceType
service_names = [ServiceType(service_type).value.replace("_service", "") for service_type in ServiceType]
base_module = "langflow.services"
factories = []
for name in service_names:
try:
module_name = f"{base_module}.{name}.factory"
module = importlib.import_module(module_name)
# Find all classes in the module that are subclasses of ServiceFactory
for name, obj in inspect.getmembers(module, inspect.isclass):
if issubclass(obj, ServiceFactory) and obj is not ServiceFactory:
factories.append(obj())
break
except Exception as exc:
logger.exception(exc)
raise RuntimeError(
f"Could not initialize services. Please check your settings. Error in {name}."
) from exc
return factories
service_manager = ServiceManager()
@ -106,9 +154,7 @@ def initialize_session_service():
Initialize the session manager.
"""
from langflow.services.cache import factory as cache_factory
from langflow.services.session import (
factory as session_service_factory,
) # type: ignore
from langflow.services.session import factory as session_service_factory # type: ignore
initialize_settings_service()

View file

@ -19,5 +19,5 @@ class ServiceType(str, Enum):
VARIABLE_SERVICE = "variable_service"
STORAGE_SERVICE = "storage_service"
MONITOR_SERVICE = "monitor_service"
SOCKETIO_SERVICE = "socket_service"
# SOCKETIO_SERVICE = "socket_service"
STATE_SERVICE = "state_service"

View file

@ -1,6 +1,5 @@
from typing import Coroutine, Optional
from langflow.interface.run import build_sorted_vertices
from langflow.services.base import Service
from langflow.services.cache.base import CacheService
from langflow.services.session.utils import compute_dict_hash, session_id_generator
@ -25,8 +24,10 @@ class SessionService(Service):
if data_graph is None:
return (None, None)
# If not cached, build the graph and cache it
graph, artifacts = await build_sorted_vertices(data_graph, flow_id)
from langflow.graph.graph.base import Graph
graph = Graph.from_payload(data_graph, flow_id=flow_id)
artifacts: dict = {}
await self.cache_service.set(key, (graph, artifacts))
return graph, artifacts

View file

@ -100,9 +100,9 @@ class Settings(BaseSettings):
STORE: Optional[bool] = True
STORE_URL: Optional[str] = "https://api.langflow.store"
DOWNLOAD_WEBHOOK_URL: Optional[
str
] = "https://api.langflow.store/flows/trigger/ec611a61-8460-4438-b187-a4f65e5559d4"
DOWNLOAD_WEBHOOK_URL: Optional[str] = (
"https://api.langflow.store/flows/trigger/ec611a61-8460-4438-b187-a4f65e5559d4"
)
LIKE_WEBHOOK_URL: Optional[str] = "https://api.langflow.store/flows/trigger/64275852-ec00-45c1-984e-3bff814732da"
STORAGE_TYPE: str = "local"

View file

@ -1,41 +1,13 @@
import importlib
import inspect
from loguru import logger
from sqlmodel import Session, select
from langflow.services.auth.utils import create_super_user, verify_password
from langflow.services.cache.factory import CacheServiceFactory
from langflow.services.database.utils import initialize_database
from langflow.services.factory import ServiceFactory
from langflow.services.manager import service_manager
from langflow.services.schema import ServiceType
from langflow.services.settings.constants import DEFAULT_SUPERUSER, DEFAULT_SUPERUSER_PASSWORD
from langflow.services.socket.utils import set_socketio_server
from .deps import get_db_service, get_session, get_settings_service
def get_factories():
service_names = [ServiceType(service_type).value.replace("_service", "") for service_type in ServiceType]
base_module = "langflow.services"
factories = []
for name in service_names:
try:
module_name = f"{base_module}.{name}.factory"
module = importlib.import_module(module_name)
# Find all classes in the module that are subclasses of ServiceFactory
for name, obj in inspect.getmembers(module, inspect.isclass):
if issubclass(obj, ServiceFactory) and obj is not ServiceFactory:
factories.append(obj())
break
except Exception as exc:
logger.exception(exc)
raise RuntimeError(f"Could not initialize services. Please check your settings. Error in {name}.") from exc
return factories
from .deps import get_db_service, get_service, get_session, get_settings_service
def get_or_create_super_user(session: Session, username, password, is_default):
@ -145,6 +117,8 @@ def teardown_services():
except Exception as exc:
logger.exception(exc)
try:
from langflow.services.manager import service_manager
service_manager.teardown()
except Exception as exc:
logger.exception(exc)
@ -156,7 +130,7 @@ def initialize_settings_service():
"""
from langflow.services.settings import factory as settings_factory
service_manager.register_factory(settings_factory.SettingsServiceFactory())
get_service(ServiceType.SETTINGS_SERVICE, settings_factory.SettingsServiceFactory())
def initialize_session_service():
@ -168,11 +142,13 @@ def initialize_session_service():
initialize_settings_service()
service_manager.register_factory(
get_service(
ServiceType.CACHE_SERVICE,
cache_factory.CacheServiceFactory(),
)
service_manager.register_factory(
get_service(
ServiceType.SESSION_SERVICE,
session_service_factory.SessionServiceFactory(),
)
@ -181,27 +157,17 @@ def initialize_services(fix_migration: bool = False, socketio_server=None):
"""
Initialize all the services needed.
"""
for factory in get_factories():
try:
service_manager.register_factory(factory)
except Exception as exc:
logger.exception(exc)
logger.error(f"Error initializing {factory}: {exc}")
# Test cache connection
service_manager.get(ServiceType.CACHE_SERVICE)
get_service(ServiceType.CACHE_SERVICE, default=CacheServiceFactory())
# Setup the superuser
try:
initialize_database(fix_migration=fix_migration)
except Exception as exc:
logger.error(exc)
raise exc
setup_superuser(service_manager.get(ServiceType.SETTINGS_SERVICE), next(get_session()))
setup_superuser(get_service(ServiceType.SETTINGS_SERVICE), next(get_session()))
try:
get_db_service().migrate_flows_if_auto_login()
except Exception as exc:
logger.error(f"Error migrating flows: {exc}")
raise RuntimeError("Error migrating flows") from exc
# Initialize the SocketIO service
set_socketio_server(socketio_server)

View file

@ -62,30 +62,6 @@ cryptography = "^42.0.5"
asyncer = "^0.0.5"
[tool.poetry.group.dev.dependencies]
pytest-asyncio = "^0.23.1"
types-redis = "^4.6.0.5"
ipykernel = "^6.26.0"
mypy = "^1.7.1"
ruff = "^0.3.5"
httpx = "*"
pytest = "^8.1.1"
types-requests = "^2.31.0"
requests = "^2.31.0"
pytest-cov = "^5.0.0"
pandas-stubs = "^2.2.1.230412"
types-pillow = "^10.2.0.0"
types-pyyaml = "^6.0.12.8"
types-python-jose = "^3.3.4.8"
types-passlib = "^1.7.7.13"
locust = "^2.24.1"
pytest-mock = "^3.14.0"
pytest-xdist = "^3.5.0"
types-pywin32 = "^306.0.0.4"
types-google-cloud-ndb = "^2.3.0.0"
pytest-sugar = "^1.0.0"
[tool.poetry.extras]
deploy = ["celery", "redis", "flower"]
local = ["llama-cpp-python", "sentence-transformers", "ctransformers"]