diff --git a/src/backend/base/langflow/processing/base.py b/src/backend/base/langflow/processing/base.py deleted file mode 100644 index 3ef0909db..000000000 --- a/src/backend/base/langflow/processing/base.py +++ /dev/null @@ -1,44 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -from loguru import logger - -from langflow.services.deps import get_plugins_service - -if TYPE_CHECKING: - from langchain_core.callbacks import BaseCallbackHandler - from langfuse.callback import CallbackHandler - - -def setup_callbacks(trace_id): - """Setup callbacks for langchain object.""" - callbacks = [] - plugin_service = get_plugins_service() - plugin_callbacks = plugin_service.get_callbacks(_id=trace_id) - if plugin_callbacks: - callbacks.extend(plugin_callbacks) - return callbacks - - -def get_langfuse_callback(trace_id): - from langflow.services.deps import get_plugins_service - - logger.debug("Initializing langfuse callback") - if langfuse := get_plugins_service().get("langfuse"): - logger.debug("Langfuse credentials found") - try: - trace = langfuse.trace(name="langflow-" + trace_id, id=trace_id) - return trace.getNewHandler() - except Exception: # noqa: BLE001 - logger.exception("Error initializing langfuse callback") - - return None - - -def flush_langfuse_callback_if_present(callbacks: list[BaseCallbackHandler | CallbackHandler]) -> None: - """If langfuse callback is present, run callback.langfuse.flush().""" - for callback in callbacks: - if hasattr(callback, "langfuse") and hasattr(callback.langfuse, "flush"): - callback.langfuse.flush() - break diff --git a/src/backend/base/langflow/services/deps.py b/src/backend/base/langflow/services/deps.py index 0a7291199..6b7fb4084 100644 --- a/src/backend/base/langflow/services/deps.py +++ b/src/backend/base/langflow/services/deps.py @@ -16,7 +16,6 @@ if TYPE_CHECKING: from langflow.services.cache.service import AsyncBaseCacheService, CacheService from langflow.services.chat.service import ChatService from langflow.services.database.service import DatabaseService - from langflow.services.plugins.service import PluginService from langflow.services.session.service import SessionService from langflow.services.settings.service import SettingsService from langflow.services.socket.service import SocketIOService @@ -115,15 +114,6 @@ def get_variable_service() -> VariableService: return get_service(ServiceType.VARIABLE_SERVICE, VariableServiceFactory()) -def get_plugins_service() -> PluginService: - """Get the PluginService instance from the service manager. - - Returns: - PluginService: The PluginService instance. - """ - return get_service(ServiceType.PLUGIN_SERVICE) # type: ignore[attr-defined] - - def get_settings_service() -> SettingsService: """Retrieves the SettingsService instance. diff --git a/src/backend/base/langflow/services/plugins/__init__.py b/src/backend/base/langflow/services/plugins/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/backend/base/langflow/services/plugins/base.py b/src/backend/base/langflow/services/plugins/base.py deleted file mode 100644 index 5f20db136..000000000 --- a/src/backend/base/langflow/services/plugins/base.py +++ /dev/null @@ -1,17 +0,0 @@ -from typing import Any - - -class BasePlugin: - def initialize(self) -> None: - pass - - def teardown(self) -> None: - pass - - def get(self) -> Any: - pass - - -class CallbackPlugin(BasePlugin): - def get_callback(self, _id=None): - pass diff --git a/src/backend/base/langflow/services/plugins/factory.py b/src/backend/base/langflow/services/plugins/factory.py deleted file mode 100644 index e6048d550..000000000 --- a/src/backend/base/langflow/services/plugins/factory.py +++ /dev/null @@ -1,12 +0,0 @@ -from __future__ import annotations - -from langflow.services.factory import ServiceFactory -from langflow.services.plugins.service import PluginService - - -class PluginServiceFactory(ServiceFactory): - def __init__(self) -> None: - super().__init__(PluginService) - - def create(self): - return PluginService() diff --git a/src/backend/base/langflow/services/plugins/langfuse_plugin.py b/src/backend/base/langflow/services/plugins/langfuse_plugin.py deleted file mode 100644 index fe7389ac1..000000000 --- a/src/backend/base/langflow/services/plugins/langfuse_plugin.py +++ /dev/null @@ -1,86 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -from loguru import logger - -from langflow.services.deps import get_settings_service -from langflow.services.plugins.base import CallbackPlugin - -if TYPE_CHECKING: - from langfuse import Langfuse - - -class LangfuseInstance: - _instance: Langfuse | None = None - - @classmethod - def get(cls): - logger.debug("Getting Langfuse instance") - if cls._instance is None: - cls.create() - return cls._instance - - @classmethod - def create(cls) -> None: - try: - logger.debug("Creating Langfuse instance") - from langfuse import Langfuse - - settings_manager = get_settings_service() - - if settings_manager.settings.langfuse_public_key and settings_manager.settings.langfuse_secret_key: - logger.debug("Langfuse credentials found") - cls._instance = Langfuse( - public_key=settings_manager.settings.langfuse_public_key, - secret_key=settings_manager.settings.langfuse_secret_key, - host=settings_manager.settings.langfuse_host, - ) - else: - logger.debug("No Langfuse credentials found") - cls._instance = None - except ImportError: - logger.debug("Langfuse not installed") - cls._instance = None - - @classmethod - def update(cls) -> None: - logger.debug("Updating Langfuse instance") - cls._instance = None - cls.create() - - @classmethod - def teardown(cls) -> None: - logger.debug("Tearing down Langfuse instance") - if cls._instance is not None: - cls._instance.flush() - cls._instance = None - - -class LangfusePlugin(CallbackPlugin): - def initialize(self) -> None: - LangfuseInstance.create() - - def teardown(self) -> None: - LangfuseInstance.teardown() - - def get(self): - return LangfuseInstance.get() - - def get_callback(self, _id: str | None = None): - if _id is None: - _id = "default" - - logger.debug("Initializing langfuse callback") - - try: - langfuse_instance = self.get() - if langfuse_instance is not None and hasattr(langfuse_instance, "trace"): - trace = langfuse_instance.trace(name="langflow-" + _id, id=_id) - if trace: - return trace.getNewHandler() - - except Exception: # noqa: BLE001 - logger.exception("Error initializing langfuse callback") - - return None diff --git a/src/backend/base/langflow/services/plugins/service.py b/src/backend/base/langflow/services/plugins/service.py deleted file mode 100644 index c623cfdd6..000000000 --- a/src/backend/base/langflow/services/plugins/service.py +++ /dev/null @@ -1,64 +0,0 @@ -from __future__ import annotations - -import importlib -import inspect -from pathlib import Path - -from loguru import logger - -from langflow.services.base import Service -from langflow.services.plugins.base import BasePlugin, CallbackPlugin - - -class PluginService(Service): - name = "plugin_service" - - def __init__(self) -> None: - self.plugins: dict[str, BasePlugin] = {} - self.plugin_dir = Path(__file__).parent - self.plugins_base_module = "langflow.services.plugins" - self.load_plugins() - - def load_plugins(self) -> None: - base_files = ["base.py", "service.py", "factory.py", "__init__.py"] - for module in self.plugin_dir.iterdir(): - if module.suffix == ".py" and module.name not in base_files: - plugin_name = module.stem - module_path = f"{self.plugins_base_module}.{plugin_name}" - try: - mod = importlib.import_module(module_path) - for attr_name in dir(mod): - attr = getattr(mod, attr_name) - if ( - inspect.isclass(attr) - and issubclass(attr, BasePlugin) - and attr not in {CallbackPlugin, BasePlugin} - ): - self.register_plugin(plugin_name, attr()) - except Exception: # noqa: BLE001 - logger.exception(f"Error loading plugin {plugin_name}") - - def register_plugin(self, plugin_name, plugin_instance) -> None: - self.plugins[plugin_name] = plugin_instance - plugin_instance.initialize() - - def get_plugin(self, plugin_name) -> BasePlugin | None: - return self.plugins.get(plugin_name) - - def get(self, plugin_name): - if plugin := self.get_plugin(plugin_name): - return plugin.get() - return None - - async def teardown(self) -> None: - for plugin in self.plugins.values(): - plugin.teardown() - - def get_callbacks(self, _id=None): - callbacks = [] - for plugin in self.plugins.values(): - if isinstance(plugin, CallbackPlugin): - callback = plugin.get_callback(_id=_id) - if callback: - callbacks.append(callback) - return callbacks diff --git a/src/backend/base/langflow/services/schema.py b/src/backend/base/langflow/services/schema.py index 482e3a7dc..8227d0f69 100644 --- a/src/backend/base/langflow/services/schema.py +++ b/src/backend/base/langflow/services/schema.py @@ -12,7 +12,6 @@ class ServiceType(str, Enum): CHAT_SERVICE = "chat_service" SESSION_SERVICE = "session_service" TASK_SERVICE = "task_service" - PLUGINS_SERVICE = "plugins_service" STORE_SERVICE = "store_service" VARIABLE_SERVICE = "variable_service" STORAGE_SERVICE = "storage_service"