diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index 2cec3b898..d28dfc79c 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -123,7 +123,10 @@ async def stream_build(flow_id: str): "log": f"Building node {vertex.vertex_type}", } yield str(StreamData(event="log", data=log_dict)) - vertex.build() + if vertex.is_task: + vertex = try_running_celery_task(vertex) + else: + vertex.build() params = vertex._built_object_repr() valid = True logger.debug(f"Building node {str(vertex.vertex_type)}") @@ -179,3 +182,20 @@ async def stream_build(flow_id: str): except Exception as exc: logger.error(f"Error streaming build: {exc}") raise HTTPException(status_code=500, detail=str(exc)) + + +def try_running_celery_task(vertex): + # Try running the task in celery + # and set the task_id to the local vertex + # if it fails, run the task locally + try: + from langflow.worker import build_vertex + + task = build_vertex.delay(vertex) + vertex.task_id = task.id + except Exception as exc: + logger.exception(exc) + logger.error("Error running task in celery, running locally") + vertex.task_id = None + vertex.build() + return vertex