diff --git a/src/backend/langflow/services/task/manager.py b/src/backend/langflow/services/task/manager.py index 4083978df..ab74d916e 100644 --- a/src/backend/langflow/services/task/manager.py +++ b/src/backend/langflow/services/task/manager.py @@ -1,19 +1,26 @@ from typing import Any, Callable, Coroutine, Union -import logging - +from loguru import logger from langflow.services.base import Service from langflow.services.task.backends.anyio import AnyIOBackend from langflow.services.task.backends.base import TaskBackend from langflow.services.task.utils import get_celery_worker_status -try: + +def check_celery_availability(): from langflow.worker import celery_app try: status = get_celery_worker_status(celery_app) + logger.debug(f"Celery status: {status}") except Exception as e: - logging.error(f"An error occurred: {e}") + logger.error(f"An error occurred: {e}") status = {"availability": None} + return status + + +try: + status = check_celery_availability() + USE_CELERY = status.get("availability") is not None except ImportError: USE_CELERY = False @@ -30,7 +37,9 @@ class TaskService(Service): if USE_CELERY: from langflow.services.task.backends.celery import CeleryBackend + logger.debug("Using Celery backend") return CeleryBackend() + logger.debug("Using AnyIO backend") return AnyIOBackend() # In your TaskService class @@ -51,6 +60,8 @@ class TaskService(Service): async def launch_task( self, task_func: Callable[..., Any], *args: Any, **kwargs: Any ) -> Any: + logger.debug(f"Launching task {task_func} with args {args} and kwargs {kwargs}") + logger.debug(f"Using backend {self.backend}") task = self.backend.launch_task(task_func, *args, **kwargs) return await task if isinstance(task, Coroutine) else task