diff --git a/src/backend/langflow/services/task/__init__.py b/src/backend/langflow/services/task/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/backend/langflow/services/task/factory.py b/src/backend/langflow/services/task/factory.py new file mode 100644 index 000000000..b21cb38cc --- /dev/null +++ b/src/backend/langflow/services/task/factory.py @@ -0,0 +1,11 @@ +from langflow.services.task.manager import TaskManager +from langflow.services.factory import ServiceFactory + + +class TaskManagerFactory(ServiceFactory): + def __init__(self): + super().__init__(TaskManager) + + def create(self): + # Here you would have logic to create and configure a TaskManager + return TaskManager() diff --git a/src/backend/langflow/services/task/manager.py b/src/backend/langflow/services/task/manager.py new file mode 100644 index 000000000..6fc9dfa16 --- /dev/null +++ b/src/backend/langflow/services/task/manager.py @@ -0,0 +1,70 @@ +import asyncio +from typing import Any, Callable, Union +import logging + +from langflow.services.base import Service +from langflow.services.task.utils import AsyncIOTaskResult, get_celery_worker_status + +try: + from celery.result import AsyncResult + from langflow.worker import celery_app + + try: + status = get_celery_worker_status(celery_app) + except Exception as e: + logging.error(f"An error occurred: {e}") + status = {"availability": None} + USE_CELERY = status.get("availability") is not None +except ImportError: + USE_CELERY = False + + +class TaskManager(Service): + STATUS_PENDING = "PENDING" + STATUS_FINISHED = "FINISHED" + STATUS_UNKNOWN = "UNKNOWN" + name = "task_manager" + + def __init__(self): + self.tasks = {} # For storing asyncio tasks + self.celery_results = {} # For storing Celery AsyncResult instances + if USE_CELERY: + from langflow.worker import celery_app + + self.celery_app = celery_app + + else: + self.celery_app = None # To store the celery app if available + self.use_celery = USE_CELERY + + def launch_task( + self, + task_func: Callable[..., Any], + *args: Any, + **kwargs: Any, + ) -> Union[int, str]: + if USE_CELERY: + task = task_func.apply_async(args=args, kwargs=kwargs) + self.celery_results[task.id] = task + return task.id + else: + task = asyncio.create_task(task_func(*args, **kwargs)) + task_id = str(id(task)) + self.tasks[task_id] = AsyncIOTaskResult(task) + + def set_result(future): + try: + self.tasks[task_id] = AsyncIOTaskResult(future) + except Exception as e: + logging.error(f"An error occurred: {e}") + + task.add_done_callback(set_result) + return task_id + + # Update the get_task_status function in TaskManager class + def get_task( + self, task_id: Union[int, str] + ) -> Union[AsyncResult, AsyncIOTaskResult]: + if self.use_celery: + return AsyncResult(task_id, app=self.celery_app) + return self.tasks.get(task_id) diff --git a/src/backend/langflow/services/task/utils.py b/src/backend/langflow/services/task/utils.py new file mode 100644 index 000000000..7736c256a --- /dev/null +++ b/src/backend/langflow/services/task/utils.py @@ -0,0 +1,35 @@ +from asyncio import Task + + +class AsyncIOTaskResult: + def __init__(self, task: Task): + self._task = task + + @property + def status(self) -> str: + if self._task.done(): + return "FAILURE" if self._task.exception() is not None else "SUCCESS" + return "PENDING" + + @property + def result(self) -> any: + return self._task.result() if self._task.done() else None + + def ready(self) -> bool: + return self._task.done() + + +def get_celery_worker_status(app): + i = app.control.inspect() + availability = i.ping() + stats = i.stats() + registered_tasks = i.registered() + active_tasks = i.active() + scheduled_tasks = i.scheduled() + return { + "availability": availability, + "stats": stats, + "registered_tasks": registered_tasks, + "active_tasks": active_tasks, + "scheduled_tasks": scheduled_tasks, + }