From e26d61fcfecbf89ac85442a61ab07079f7c4e9fa Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 1 Sep 2023 17:22:30 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20fix(endpoints.py):=20import=20mi?= =?UTF-8?q?ssing=20get=5Ftask=5Fmanager=20function=20to=20fix=20NameError?= =?UTF-8?q?=20=E2=9C=A8=20feat(endpoints.py):=20add=20support=20for=20laun?= =?UTF-8?q?ching=20tasks=20using=20task=20manager=20to=20improve=20task=20?= =?UTF-8?q?management=20=F0=9F=90=9B=20fix(endpoints.py):=20change=20proce?= =?UTF-8?q?ss=5Fgraph=5Fcached=5Ftask=20function=20call=20to=20use=20task?= =?UTF-8?q?=5Fmanager.launch=5Ftask=20to=20fix=20AttributeError=20?= =?UTF-8?q?=F0=9F=90=9B=20fix(endpoints.py):=20change=20process=5Fflow=20f?= =?UTF-8?q?unction=20to=20use=20task=5Fmanager.get=5Ftask=20to=20fix=20Att?= =?UTF-8?q?ributeError=20=F0=9F=90=9B=20fix(endpoints.py):=20change=20get?= =?UTF-8?q?=5Ftask=5Fstatus=20function=20to=20use=20task=5Fmanager.get=5Ft?= =?UTF-8?q?ask=20to=20fix=20AttributeError=20=F0=9F=90=9B=20fix(process.py?= =?UTF-8?q?):=20change=20process=5Fgraph=5Fcached=20function=20to=20be=20a?= =?UTF-8?q?sync=20to=20fix=20TypeError?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/langflow/api/v1/endpoints.py | 25 +++++++++++++++------- src/backend/langflow/processing/process.py | 2 +- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/src/backend/langflow/api/v1/endpoints.py b/src/backend/langflow/api/v1/endpoints.py index 4e8d3a582..988130ba7 100644 --- a/src/backend/langflow/api/v1/endpoints.py +++ b/src/backend/langflow/api/v1/endpoints.py @@ -5,9 +5,9 @@ from langflow.services.auth.utils import api_key_security, get_current_active_us from langflow.services.cache.utils import save_uploaded_file from langflow.services.database.models.flow import Flow -from langflow.processing.process import process_tweaks +from langflow.processing.process import process_graph_cached, process_tweaks from langflow.services.database.models.user.user import User -from langflow.services.utils import get_settings_manager +from langflow.services.utils import get_settings_manager, get_task_manager from langflow.utils.logger import logger from fastapi import APIRouter, Depends, HTTPException, UploadFile, Body, status import sqlalchemy as sa @@ -33,7 +33,6 @@ from langflow.services.utils import get_session from langflow.worker import process_graph_cached_task from sqlmodel import Session -from celery.result import AsyncResult # build router router = APIRouter(tags=["Base"]) @@ -129,11 +128,18 @@ async def process_flow( graph_data = process_tweaks(graph_data, tweaks) except Exception as exc: logger.error(f"Error processing tweaks: {exc}") - - task: "AsyncResult" = process_graph_cached_task.delay( - graph_data, inputs, clear_cache, session_id + task_manager = get_task_manager() + task_id = task_manager.launch_task( + process_graph_cached_task + if task_manager.use_celery + else process_graph_cached, + graph_data, + inputs, + clear_cache, + session_id, ) - return ProcessResponse(result=task.state, id=task.id) + task = task_manager.get_task(task_id) + return ProcessResponse(result=task.status, id=task_id) except sa.exc.StatementError as exc: # StatementError('(builtins.ValueError) badly formed hexadecimal UUID string') if "badly formed hexadecimal UUID string" in str(exc): @@ -158,7 +164,10 @@ async def process_flow( @router.get("/task/{task_id}/status", response_model=TaskStatusResponse) async def get_task_status(task_id: str): - task = AsyncResult(task_id) + task_manager = get_task_manager() + task = task_manager.get_task(task_id) + if task is None: + raise HTTPException(status_code=404, detail="Task not found") return TaskStatusResponse( status=task.status, result=task.result if task.ready() else None ) diff --git a/src/backend/langflow/processing/process.py b/src/backend/langflow/processing/process.py index 014e1db0b..e887c3a0b 100644 --- a/src/backend/langflow/processing/process.py +++ b/src/backend/langflow/processing/process.py @@ -173,7 +173,7 @@ def generate_result(langchain_object: Union[Chain, VectorStore], inputs: dict): # return result, session_id -def process_graph_cached( +async def process_graph_cached( data_graph: Dict[str, Any], inputs: Optional[dict] = None, clear_cache=False,