feat(task): add TaskManagerFactory class to create and configure a TaskManager

🐛 fix(task): update get_task_status function in TaskManager class to return the correct task status
🔧 chore(task): add AsyncIOTaskResult class to handle asyncio tasks and get their status and result
🔧 chore(task): add get_celery_worker_status function to get the status of the Celery worker
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-09-01 17:20:40 -03:00
commit 8faad70917
4 changed files with 116 additions and 0 deletions

View file

@ -0,0 +1,11 @@
from langflow.services.task.manager import TaskManager
from langflow.services.factory import ServiceFactory
class TaskManagerFactory(ServiceFactory):
def __init__(self):
super().__init__(TaskManager)
def create(self):
# Here you would have logic to create and configure a TaskManager
return TaskManager()

View file

@ -0,0 +1,70 @@
import asyncio
from typing import Any, Callable, Union
import logging
from langflow.services.base import Service
from langflow.services.task.utils import AsyncIOTaskResult, get_celery_worker_status
try:
from celery.result import AsyncResult
from langflow.worker import celery_app
try:
status = get_celery_worker_status(celery_app)
except Exception as e:
logging.error(f"An error occurred: {e}")
status = {"availability": None}
USE_CELERY = status.get("availability") is not None
except ImportError:
USE_CELERY = False
class TaskManager(Service):
STATUS_PENDING = "PENDING"
STATUS_FINISHED = "FINISHED"
STATUS_UNKNOWN = "UNKNOWN"
name = "task_manager"
def __init__(self):
self.tasks = {} # For storing asyncio tasks
self.celery_results = {} # For storing Celery AsyncResult instances
if USE_CELERY:
from langflow.worker import celery_app
self.celery_app = celery_app
else:
self.celery_app = None # To store the celery app if available
self.use_celery = USE_CELERY
def launch_task(
self,
task_func: Callable[..., Any],
*args: Any,
**kwargs: Any,
) -> Union[int, str]:
if USE_CELERY:
task = task_func.apply_async(args=args, kwargs=kwargs)
self.celery_results[task.id] = task
return task.id
else:
task = asyncio.create_task(task_func(*args, **kwargs))
task_id = str(id(task))
self.tasks[task_id] = AsyncIOTaskResult(task)
def set_result(future):
try:
self.tasks[task_id] = AsyncIOTaskResult(future)
except Exception as e:
logging.error(f"An error occurred: {e}")
task.add_done_callback(set_result)
return task_id
# Update the get_task_status function in TaskManager class
def get_task(
self, task_id: Union[int, str]
) -> Union[AsyncResult, AsyncIOTaskResult]:
if self.use_celery:
return AsyncResult(task_id, app=self.celery_app)
return self.tasks.get(task_id)

View file

@ -0,0 +1,35 @@
from asyncio import Task
class AsyncIOTaskResult:
def __init__(self, task: Task):
self._task = task
@property
def status(self) -> str:
if self._task.done():
return "FAILURE" if self._task.exception() is not None else "SUCCESS"
return "PENDING"
@property
def result(self) -> any:
return self._task.result() if self._task.done() else None
def ready(self) -> bool:
return self._task.done()
def get_celery_worker_status(app):
i = app.control.inspect()
availability = i.ping()
stats = i.stats()
registered_tasks = i.registered()
active_tasks = i.active()
scheduled_tasks = i.scheduled()
return {
"availability": availability,
"stats": stats,
"registered_tasks": registered_tasks,
"active_tasks": active_tasks,
"scheduled_tasks": scheduled_tasks,
}