🐛 fix(celery.py): fix type hinting error for AsyncResult import

🔒 chore(celery.py): add check for delay method existence in launch_task to prevent errors
🔒 chore(celery.py): change get_task return type hint to Any to match actual return type
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-09-05 15:32:54 -03:00
commit e50704a49f

View file

@ -1,5 +1,5 @@
from typing import Any, Callable
from celery.result import AsyncResult
from celery.result import AsyncResult # type: ignore
from langflow.services.task.backends.base import TaskBackend
from langflow.worker import celery_app
@ -11,8 +11,10 @@ class CeleryBackend(TaskBackend):
def launch_task(
self, task_func: Callable[..., Any], *args: Any, **kwargs: Any
) -> str:
task = task_func.apply_async(args=args, kwargs=kwargs)
if not hasattr(task_func, "delay"):
raise ValueError(f"Task function {task_func} does not have a delay method")
task = task_func.delay(*args, **kwargs)
return task.id
def get_task(self, task_id: str) -> AsyncResult:
def get_task(self, task_id: str) -> Any:
return AsyncResult(task_id, app=self.celery_app)