Refactor process_graph_data function to use task_service.launch_task and handle task status

This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-12-20 14:17:12 -03:00
commit c58837d65d

View file

@ -49,20 +49,19 @@ async def process_graph_data(
):
task_result = None
task_status = None
task_id = None
if tweaks:
try:
graph_data = process_tweaks(graph_data, tweaks)
except Exception as exc:
logger.error(f"Error processing tweaks: {exc}")
if sync:
task_id, result = await task_service.launch_and_await_task(
process_graph_cached_task if task_service.use_celery else process_graph_cached,
result = await process_graph_cached(
graph_data,
inputs,
clear_cache,
session_id,
)
task_id = str(id(result))
if isinstance(result, dict) and "result" in result:
task_result = result["result"]
session_id = result["session_id"]
@ -78,9 +77,19 @@ async def process_graph_data(
if session_id is None:
# Generate a session ID
session_id = get_session_service().generate_key(session_id=session_id, data_graph=graph_data)
result = process_graph_cached(
data_graph=graph_data, inputs=inputs, clear_cache=clear_cache, session_id=session_id
task_id, task = await task_service.launch_task(
process_graph_cached_task if task_service.use_celery else process_graph_cached,
graph_data,
inputs,
clear_cache,
session_id,
)
task_status = task.status
if task.status == "FAILURE":
logger.error(f"Task {task_id} failed: {task.traceback}")
task_result = str(task._exception)
else:
task_result = task.result
if task_id:
task_response = TaskResponse(id=task_id, href=f"api/v1/task/{task_id}")