diff --git a/src/backend/langflow/api/v1/endpoints.py b/src/backend/langflow/api/v1/endpoints.py index 999f751d5..692438679 100644 --- a/src/backend/langflow/api/v1/endpoints.py +++ b/src/backend/langflow/api/v1/endpoints.py @@ -1,6 +1,7 @@ from http import HTTPStatus from typing import Annotated, Optional, Union + from langflow.services.cache.utils import save_uploaded_file from langflow.services.database.models.flow import Flow from langflow.processing.process import process_tweaks @@ -13,6 +14,7 @@ from langflow.interface.custom.custom_component import CustomComponent from langflow.api.v1.schemas import ( ProcessResponse, + TaskStatusResponse, UploadFileResponse, CustomComponentCode, ) @@ -29,6 +31,8 @@ from langflow.services.utils import get_session from langflow.worker import process_graph_cached_task from sqlmodel import Session +from celery.result import AsyncResult + # build router router = APIRouter(tags=["Base"]) @@ -96,16 +100,25 @@ async def process_flow( graph_data = process_tweaks(graph_data, tweaks) except Exception as exc: logger.error(f"Error processing tweaks: {exc}") - response, session_id = process_graph_cached_task.delay( + + task: "AsyncResult" = process_graph_cached_task.delay( graph_data, inputs, clear_cache, session_id - ).get() - return ProcessResponse(result=response, session_id=session_id) + ) + return ProcessResponse(result=task.state, id=task.id) except Exception as e: # Log stack trace logger.exception(e) raise HTTPException(status_code=500, detail=str(e)) from e +@router.get("/task/{task_id}/status", response_model=TaskStatusResponse) +async def get_task_status(task_id: str): + task = AsyncResult(task_id) + return TaskStatusResponse( + status=task.status, result=task.result if task.ready() else None + ) + + @router.post( "/upload/{flow_id}", response_model=UploadFileResponse, diff --git a/src/backend/langflow/api/v1/schemas.py b/src/backend/langflow/api/v1/schemas.py index 65bf64dca..f1a1ac902 100644 --- a/src/backend/langflow/api/v1/schemas.py +++ b/src/backend/langflow/api/v1/schemas.py @@ -46,8 +46,18 @@ class UpdateTemplateRequest(BaseModel): class ProcessResponse(BaseModel): """Process response schema.""" - result: dict - session_id: Optional[str] = None + result: Any + id: Optional[str] = None + + +# TaskStatusResponse( +# status=task.status, result=task.result if task.ready() else None +# ) +class TaskStatusResponse(BaseModel): + """Task status response schema.""" + + status: str + result: Optional[Any] = None class ChatMessage(BaseModel):