From 46ba84b95de08c8f5ad4b7cbf0e76cdaf86756e4 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 28 Nov 2023 15:17:44 -0300 Subject: [PATCH] Fix async handling in TaskService --- src/backend/langflow/services/task/service.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/backend/langflow/services/task/service.py b/src/backend/langflow/services/task/service.py index 6730b1d91..745543aae 100644 --- a/src/backend/langflow/services/task/service.py +++ b/src/backend/langflow/services/task/service.py @@ -1,10 +1,11 @@ from typing import Any, Callable, Coroutine, Union -from langflow.utils.logger import configure -from loguru import logger + from langflow.services.base import Service from langflow.services.task.backends.anyio import AnyIOBackend from langflow.services.task.backends.base import TaskBackend from langflow.services.task.utils import get_celery_worker_status +from langflow.utils.logger import configure +from loguru import logger def check_celery_availability(): @@ -60,7 +61,11 @@ class TaskService(Service): if not hasattr(task_func, "apply"): raise ValueError(f"Task function {task_func} does not have an apply method") task = task_func.apply(args=args, kwargs=kwargs) + result = task.get() + # if result is coroutine + if isinstance(result, Coroutine): + result = await result return task.id, result async def launch_task(self, task_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: @@ -71,3 +76,6 @@ class TaskService(Service): def get_task(self, task_id: Union[int, str]) -> Any: return self.backend.get_task(task_id) + + def get_task(self, task_id: Union[int, str]) -> Any: + return self.backend.get_task(task_id)