🔧 fix(manager.py): replace logging module with loguru logger for consistent logging across the project
✨ feat(manager.py): add debug logs to check Celery availability and backend being used 🔧 fix(manager.py): move check_celery_availability function definition above its usage to improve code readability ✨ feat(manager.py): add debug logs to show task launch details and backend being used
This commit is contained in:
parent
5f3c602b29
commit
c5a6003ef5
1 changed files with 15 additions and 4 deletions
|
|
@ -1,19 +1,26 @@
|
|||
from typing import Any, Callable, Coroutine, Union
|
||||
import logging
|
||||
|
||||
from loguru import logger
|
||||
from langflow.services.base import Service
|
||||
from langflow.services.task.backends.anyio import AnyIOBackend
|
||||
from langflow.services.task.backends.base import TaskBackend
|
||||
from langflow.services.task.utils import get_celery_worker_status
|
||||
|
||||
try:
|
||||
|
||||
def check_celery_availability():
|
||||
from langflow.worker import celery_app
|
||||
|
||||
try:
|
||||
status = get_celery_worker_status(celery_app)
|
||||
logger.debug(f"Celery status: {status}")
|
||||
except Exception as e:
|
||||
logging.error(f"An error occurred: {e}")
|
||||
logger.error(f"An error occurred: {e}")
|
||||
status = {"availability": None}
|
||||
return status
|
||||
|
||||
|
||||
try:
|
||||
status = check_celery_availability()
|
||||
|
||||
USE_CELERY = status.get("availability") is not None
|
||||
except ImportError:
|
||||
USE_CELERY = False
|
||||
|
|
@ -30,7 +37,9 @@ class TaskService(Service):
|
|||
if USE_CELERY:
|
||||
from langflow.services.task.backends.celery import CeleryBackend
|
||||
|
||||
logger.debug("Using Celery backend")
|
||||
return CeleryBackend()
|
||||
logger.debug("Using AnyIO backend")
|
||||
return AnyIOBackend()
|
||||
|
||||
# In your TaskService class
|
||||
|
|
@ -51,6 +60,8 @@ class TaskService(Service):
|
|||
async def launch_task(
|
||||
self, task_func: Callable[..., Any], *args: Any, **kwargs: Any
|
||||
) -> Any:
|
||||
logger.debug(f"Launching task {task_func} with args {args} and kwargs {kwargs}")
|
||||
logger.debug(f"Using backend {self.backend}")
|
||||
task = self.backend.launch_task(task_func, *args, **kwargs)
|
||||
return await task if isinstance(task, Coroutine) else task
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue