diff --git a/src/backend/langflow/services/task/backends/__init__.py b/src/backend/langflow/services/task/backends/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/backend/langflow/services/task/backends/anyio.py b/src/backend/langflow/services/task/backends/anyio.py new file mode 100644 index 000000000..68c449d4e --- /dev/null +++ b/src/backend/langflow/services/task/backends/anyio.py @@ -0,0 +1,50 @@ +from typing import Any, Callable, Tuple +import anyio +from langflow.services.task.backends.base import TaskBackend + + +class AnyIOTaskResult: + def __init__(self, scope): + self._scope = scope + self._status = "PENDING" + self._result = None + self._exception = None + + @property + def status(self) -> str: + if self._status == "DONE": + return "FAILURE" if self._exception is not None else "SUCCESS" + return self._status + + @property + def result(self) -> any: + return self._result + + def ready(self) -> bool: + return self._status == "DONE" + + async def run(self, func, *args, **kwargs): + try: + self._result = await func(*args, **kwargs) + except Exception as e: + self._exception = e + finally: + self._status = "DONE" + + +class AnyIOBackend(TaskBackend): + def __init__(self): + self.tasks = {} + + async def launch_task( + self, task_func: Callable[..., Any], *args: Any, **kwargs: Any + ) -> Tuple[str, AnyIOTaskResult]: # sourcery skip: remove-unnecessary-cast + async with anyio.create_task_group() as tg: + task_result = AnyIOTaskResult(tg) + await tg.spawn(task_result.run, task_func, *args, **kwargs) + task_id = str(id(task_result)) + self.tasks[task_id] = task_result + return task_id, task_result + + def get_task(self, task_id: int) -> AnyIOTaskResult: + return self.tasks.get(task_id) diff --git a/src/backend/langflow/services/task/backends/base.py b/src/backend/langflow/services/task/backends/base.py new file mode 100644 index 000000000..a05c760e7 --- /dev/null +++ b/src/backend/langflow/services/task/backends/base.py @@ -0,0 +1,14 @@ +from abc import ABC, abstractmethod +from typing import Any, Callable, Union + + +class TaskBackend(ABC): + @abstractmethod + def launch_task( + self, task_func: Callable[..., Any], *args: Any, **kwargs: Any + ) -> Union[int, str]: + pass + + @abstractmethod + def get_task(self, task_id: Union[int, str]) -> Any: + pass diff --git a/src/backend/langflow/services/task/backends/celery.py b/src/backend/langflow/services/task/backends/celery.py new file mode 100644 index 000000000..04e26159c --- /dev/null +++ b/src/backend/langflow/services/task/backends/celery.py @@ -0,0 +1,18 @@ +from typing import Any, Callable +from celery.result import AsyncResult +from langflow.services.task.backends.base import TaskBackend +from langflow.worker import celery_app + + +class CeleryBackend(TaskBackend): + def __init__(self): + self.celery_app = celery_app + + def launch_task( + self, task_func: Callable[..., Any], *args: Any, **kwargs: Any + ) -> str: + task = task_func.apply_async(args=args, kwargs=kwargs) + return task.id + + def get_task(self, task_id: str) -> AsyncResult: + return AsyncResult(task_id, app=self.celery_app)