diff --git a/src/backend/langflow/worker.py b/src/backend/langflow/worker.py index 8e07c0e55..be4a7e318 100644 --- a/src/backend/langflow/worker.py +++ b/src/backend/langflow/worker.py @@ -30,8 +30,9 @@ def build_vertex(self, vertex: "Vertex") -> "Vertex": raise self.retry(exc=SoftTimeLimitExceeded("Task took too long"), countdown=2) from e -@celery_app.task(acks_late=True) -def process_graph_cached_task( +@celery_app.task(bind=True, acks_late=True) +async def process_graph_cached_task( + self, data_graph: Dict[str, Any], inputs: Optional[dict] = None, clear_cache=False, @@ -48,7 +49,7 @@ def process_graph_cached_task( session_id = session_service.generate_key(session_id=session_id, data_graph=data_graph) # Use async_to_sync to handle the asynchronous part of the session service - session_data = async_to_sync(session_service.load_session)(session_id, data_graph) + session_data = await session_service.load_session(session_id, data_graph) logger.warning(f"session_data: {session_data}") graph, artifacts = session_data if session_data else (None, None) @@ -56,7 +57,7 @@ def process_graph_cached_task( raise ValueError("Graph not found in the session") # Use async_to_sync for the asynchronous build method - built_object = async_to_sync(graph.build)() + built_object = await graph.build() logger.info(f"Built object: {built_object}")