Refactor process_graph_cached_task to use

async/await
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-11-28 15:17:52 -03:00
commit 0d8a400557

View file

@ -30,8 +30,9 @@ def build_vertex(self, vertex: "Vertex") -> "Vertex":
raise self.retry(exc=SoftTimeLimitExceeded("Task took too long"), countdown=2) from e
@celery_app.task(acks_late=True)
def process_graph_cached_task(
@celery_app.task(bind=True, acks_late=True)
async def process_graph_cached_task(
self,
data_graph: Dict[str, Any],
inputs: Optional[dict] = None,
clear_cache=False,
@ -48,7 +49,7 @@ def process_graph_cached_task(
session_id = session_service.generate_key(session_id=session_id, data_graph=data_graph)
# Use async_to_sync to handle the asynchronous part of the session service
session_data = async_to_sync(session_service.load_session)(session_id, data_graph)
session_data = await session_service.load_session(session_id, data_graph)
logger.warning(f"session_data: {session_data}")
graph, artifacts = session_data if session_data else (None, None)
@ -56,7 +57,7 @@ def process_graph_cached_task(
raise ValueError("Graph not found in the session")
# Use async_to_sync for the asynchronous build method
built_object = async_to_sync(graph.build)()
built_object = await graph.build()
logger.info(f"Built object: {built_object}")