From 3d84aace37eef716d7a21474245bc0ceb4fc8d03 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sun, 20 Aug 2023 13:24:47 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A7=20fix(endpoints.py):=20import=20mi?= =?UTF-8?q?ssing=20dependencies=20to=20resolve=20NameError=20=E2=9C=A8=20f?= =?UTF-8?q?eat(endpoints.py):=20add=20new=20endpoint=20to=20get=20task=20s?= =?UTF-8?q?tatus=20by=20task=20ID=20=F0=9F=94=A7=20fix(schemas.py):=20upda?= =?UTF-8?q?te=20ProcessResponse=20schema=20to=20use=20'id'=20instead=20of?= =?UTF-8?q?=20'session=5Fid'=20for=20consistency=20=E2=9C=A8=20feat(schema?= =?UTF-8?q?s.py):=20add=20new=20schema=20for=20TaskStatusResponse?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/langflow/api/v1/endpoints.py | 19 ++++++++++++++++--- src/backend/langflow/api/v1/schemas.py | 14 ++++++++++++-- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/src/backend/langflow/api/v1/endpoints.py b/src/backend/langflow/api/v1/endpoints.py index 999f751d5..692438679 100644 --- a/src/backend/langflow/api/v1/endpoints.py +++ b/src/backend/langflow/api/v1/endpoints.py @@ -1,6 +1,7 @@ from http import HTTPStatus from typing import Annotated, Optional, Union + from langflow.services.cache.utils import save_uploaded_file from langflow.services.database.models.flow import Flow from langflow.processing.process import process_tweaks @@ -13,6 +14,7 @@ from langflow.interface.custom.custom_component import CustomComponent from langflow.api.v1.schemas import ( ProcessResponse, + TaskStatusResponse, UploadFileResponse, CustomComponentCode, ) @@ -29,6 +31,8 @@ from langflow.services.utils import get_session from langflow.worker import process_graph_cached_task from sqlmodel import Session +from celery.result import AsyncResult + # build router router = APIRouter(tags=["Base"]) @@ -96,16 +100,25 @@ async def process_flow( graph_data = process_tweaks(graph_data, tweaks) except Exception as exc: logger.error(f"Error processing tweaks: {exc}") - response, session_id = process_graph_cached_task.delay( + + task: "AsyncResult" = process_graph_cached_task.delay( graph_data, inputs, clear_cache, session_id - ).get() - return ProcessResponse(result=response, session_id=session_id) + ) + return ProcessResponse(result=task.state, id=task.id) except Exception as e: # Log stack trace logger.exception(e) raise HTTPException(status_code=500, detail=str(e)) from e +@router.get("/task/{task_id}/status", response_model=TaskStatusResponse) +async def get_task_status(task_id: str): + task = AsyncResult(task_id) + return TaskStatusResponse( + status=task.status, result=task.result if task.ready() else None + ) + + @router.post( "/upload/{flow_id}", response_model=UploadFileResponse, diff --git a/src/backend/langflow/api/v1/schemas.py b/src/backend/langflow/api/v1/schemas.py index 65bf64dca..f1a1ac902 100644 --- a/src/backend/langflow/api/v1/schemas.py +++ b/src/backend/langflow/api/v1/schemas.py @@ -46,8 +46,18 @@ class UpdateTemplateRequest(BaseModel): class ProcessResponse(BaseModel): """Process response schema.""" - result: dict - session_id: Optional[str] = None + result: Any + id: Optional[str] = None + + +# TaskStatusResponse( +# status=task.status, result=task.result if task.ready() else None +# ) +class TaskStatusResponse(BaseModel): + """Task status response schema.""" + + status: str + result: Optional[Any] = None class ChatMessage(BaseModel):