Fix async handling in TaskService

This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-11-28 15:17:44 -03:00
commit 46ba84b95d

View file

@ -1,10 +1,11 @@
from typing import Any, Callable, Coroutine, Union
from langflow.utils.logger import configure
from loguru import logger
from langflow.services.base import Service
from langflow.services.task.backends.anyio import AnyIOBackend
from langflow.services.task.backends.base import TaskBackend
from langflow.services.task.utils import get_celery_worker_status
from langflow.utils.logger import configure
from loguru import logger
def check_celery_availability():
@ -60,7 +61,11 @@ class TaskService(Service):
if not hasattr(task_func, "apply"):
raise ValueError(f"Task function {task_func} does not have an apply method")
task = task_func.apply(args=args, kwargs=kwargs)
result = task.get()
# if result is coroutine
if isinstance(result, Coroutine):
result = await result
return task.id, result
async def launch_task(self, task_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
@ -71,3 +76,6 @@ class TaskService(Service):
def get_task(self, task_id: Union[int, str]) -> Any:
return self.backend.get_task(task_id)
def get_task(self, task_id: Union[int, str]) -> Any:
return self.backend.get_task(task_id)