🔧 fix(endpoints.py): import missing dependencies to resolve NameError
✨ feat(endpoints.py): add new endpoint to get task status by task ID 🔧 fix(schemas.py): update ProcessResponse schema to use 'id' instead of 'session_id' for consistency ✨ feat(schemas.py): add new schema for TaskStatusResponse
This commit is contained in:
parent
298e499f72
commit
3d84aace37
2 changed files with 28 additions and 5 deletions
|
|
@ -1,6 +1,7 @@
|
|||
from http import HTTPStatus
|
||||
from typing import Annotated, Optional, Union
|
||||
|
||||
|
||||
from langflow.services.cache.utils import save_uploaded_file
|
||||
from langflow.services.database.models.flow import Flow
|
||||
from langflow.processing.process import process_tweaks
|
||||
|
|
@ -13,6 +14,7 @@ from langflow.interface.custom.custom_component import CustomComponent
|
|||
|
||||
from langflow.api.v1.schemas import (
|
||||
ProcessResponse,
|
||||
TaskStatusResponse,
|
||||
UploadFileResponse,
|
||||
CustomComponentCode,
|
||||
)
|
||||
|
|
@ -29,6 +31,8 @@ from langflow.services.utils import get_session
|
|||
from langflow.worker import process_graph_cached_task
|
||||
from sqlmodel import Session
|
||||
|
||||
from celery.result import AsyncResult
|
||||
|
||||
# build router
|
||||
router = APIRouter(tags=["Base"])
|
||||
|
||||
|
|
@ -96,16 +100,25 @@ async def process_flow(
|
|||
graph_data = process_tweaks(graph_data, tweaks)
|
||||
except Exception as exc:
|
||||
logger.error(f"Error processing tweaks: {exc}")
|
||||
response, session_id = process_graph_cached_task.delay(
|
||||
|
||||
task: "AsyncResult" = process_graph_cached_task.delay(
|
||||
graph_data, inputs, clear_cache, session_id
|
||||
).get()
|
||||
return ProcessResponse(result=response, session_id=session_id)
|
||||
)
|
||||
return ProcessResponse(result=task.state, id=task.id)
|
||||
except Exception as e:
|
||||
# Log stack trace
|
||||
logger.exception(e)
|
||||
raise HTTPException(status_code=500, detail=str(e)) from e
|
||||
|
||||
|
||||
@router.get("/task/{task_id}/status", response_model=TaskStatusResponse)
|
||||
async def get_task_status(task_id: str):
|
||||
task = AsyncResult(task_id)
|
||||
return TaskStatusResponse(
|
||||
status=task.status, result=task.result if task.ready() else None
|
||||
)
|
||||
|
||||
|
||||
@router.post(
|
||||
"/upload/{flow_id}",
|
||||
response_model=UploadFileResponse,
|
||||
|
|
|
|||
|
|
@ -46,8 +46,18 @@ class UpdateTemplateRequest(BaseModel):
|
|||
class ProcessResponse(BaseModel):
|
||||
"""Process response schema."""
|
||||
|
||||
result: dict
|
||||
session_id: Optional[str] = None
|
||||
result: Any
|
||||
id: Optional[str] = None
|
||||
|
||||
|
||||
# TaskStatusResponse(
|
||||
# status=task.status, result=task.result if task.ready() else None
|
||||
# )
|
||||
class TaskStatusResponse(BaseModel):
|
||||
"""Task status response schema."""
|
||||
|
||||
status: str
|
||||
result: Optional[Any] = None
|
||||
|
||||
|
||||
class ChatMessage(BaseModel):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue