From 2dd43d3f994423120c5990e7d54f81066b928bcf Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 17 Aug 2023 10:15:49 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20fix(chat.py):=20add=20support=20?= =?UTF-8?q?for=20running=20celery=20tasks=20for=20building=20vertices=20an?= =?UTF-8?q?d=20fallback=20to=20local=20build=20if=20celery=20task=20fails?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/langflow/api/v1/chat.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index 2cec3b898..d28dfc79c 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -123,7 +123,10 @@ async def stream_build(flow_id: str): "log": f"Building node {vertex.vertex_type}", } yield str(StreamData(event="log", data=log_dict)) - vertex.build() + if vertex.is_task: + vertex = try_running_celery_task(vertex) + else: + vertex.build() params = vertex._built_object_repr() valid = True logger.debug(f"Building node {str(vertex.vertex_type)}") @@ -179,3 +182,20 @@ async def stream_build(flow_id: str): except Exception as exc: logger.error(f"Error streaming build: {exc}") raise HTTPException(status_code=500, detail=str(exc)) + + +def try_running_celery_task(vertex): + # Try running the task in celery + # and set the task_id to the local vertex + # if it fails, run the task locally + try: + from langflow.worker import build_vertex + + task = build_vertex.delay(vertex) + vertex.task_id = task.id + except Exception as exc: + logger.exception(exc) + logger.error("Error running task in celery, running locally") + vertex.task_id = None + vertex.build() + return vertex