🔧 chore(celery_app.py): refactor celery_app to use a separate function make_celery for improved modularity and readability
🔧 chore(celeryconfig.py): update broker_url and result_backend to use LANGFLOW_REDIS_HOST and LANGFLOW_REDIS_PORT environment variables if available, fallback to default values if not
This commit is contained in:
parent
1a51e90c43
commit
0833b00fdc
2 changed files with 17 additions and 7 deletions
|
|
@ -1,7 +1,11 @@
|
|||
from celery import Celery
|
||||
|
||||
celery_app = Celery(
|
||||
"langflow", broker="redis://queue:6379/0", backend="redis://queue:6379/0"
|
||||
)
|
||||
# command: celery -A langflow.worker.celery_app worker --loglevel=INFO
|
||||
celery_app.conf.task_routes = {"langflow.worker.tasks.*": {"queue": "langflow"}}
|
||||
|
||||
def make_celery(app_name: str):
|
||||
celery_app = Celery(app_name)
|
||||
celery_app.config_from_object("langflow.core.celeryconfig")
|
||||
celery_app.conf.task_routes = {"langflow.worker.tasks.*": {"queue": "langflow"}}
|
||||
return celery_app
|
||||
|
||||
|
||||
celery_app = make_celery("langflow")
|
||||
|
|
|
|||
|
|
@ -1,7 +1,13 @@
|
|||
# celeryconfig.py
|
||||
import os
|
||||
|
||||
broker_url = os.environ.get("BROKER_URL", "redis://localhost:6379/0")
|
||||
result_backend = os.environ.get("RESULT_BACKEND", "redis://localhost:6379/0")
|
||||
langflow_redis_host = os.environ.get("LANGFLOW_REDIS_HOST")
|
||||
langflow_redis_port = os.environ.get("LANGFLOW_REDIS_PORT")
|
||||
if langflow_redis_host and langflow_redis_port:
|
||||
broker_url = f"redis://{langflow_redis_host}:{langflow_redis_port}/0"
|
||||
result_backend = f"redis://{langflow_redis_host}:{langflow_redis_port}/0"
|
||||
else:
|
||||
broker_url = os.environ.get("BROKER_URL", "redis://localhost:6379/0")
|
||||
result_backend = os.environ.get("RESULT_BACKEND", "redis://localhost:6379/0")
|
||||
# tasks should be json or pickle
|
||||
accept_content = ["json", "pickle"]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue