diff --git a/src/backend/base/langflow/api/utils.py b/src/backend/base/langflow/api/utils.py index 22efc62b8..f7c34661b 100644 --- a/src/backend/base/langflow/api/utils.py +++ b/src/backend/base/langflow/api/utils.py @@ -266,13 +266,13 @@ def parse_value(value: Any, input_type: str) -> Any: return value -def cascade_delete_flow(session: Session, flow: Flow) -> None: +async def cascade_delete_flow(session: AsyncSession, flow_id: uuid.UUID) -> None: try: - session.exec(delete(TransactionTable).where(TransactionTable.flow_id == flow.id)) - session.exec(delete(VertexBuildTable).where(VertexBuildTable.flow_id == flow.id)) - session.exec(delete(Flow).where(Flow.id == flow.id)) + await session.exec(delete(TransactionTable).where(TransactionTable.flow_id == flow_id)) + await session.exec(delete(VertexBuildTable).where(VertexBuildTable.flow_id == flow_id)) + await session.exec(delete(Flow).where(Flow.id == flow_id)) except Exception as e: - msg = f"Unable to cascade delete flow: ${flow.id}" + msg = f"Unable to cascade delete flow: ${flow_id}" raise RuntimeError(msg, e) from e diff --git a/src/backend/base/langflow/api/v1/files.py b/src/backend/base/langflow/api/v1/files.py index 8076ba96b..f04e77382 100644 --- a/src/backend/base/langflow/api/v1/files.py +++ b/src/backend/base/langflow/api/v1/files.py @@ -9,7 +9,7 @@ from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, UploadFile from fastapi.responses import StreamingResponse -from langflow.api.utils import CurrentActiveUser, DbSession +from langflow.api.utils import AsyncDbSession, CurrentActiveUser from langflow.api.v1.schemas import UploadFileResponse from langflow.services.database.models.flow import Flow from langflow.services.deps import get_settings_service, get_storage_service @@ -22,14 +22,14 @@ router = APIRouter(tags=["Files"], prefix="/files") # Create dep that gets the flow_id from the request # then finds it in the database and returns it while # using the current user as the owner -def get_flow_id( +async def get_flow_id( flow_id: UUID, current_user: CurrentActiveUser, - session: DbSession, + session: AsyncDbSession, ): flow_id_str = str(flow_id) # AttributeError: 'SelectOfScalar' object has no attribute 'first' - flow = session.get(Flow, flow_id_str) + flow = await session.get(Flow, flow_id_str) if not flow: raise HTTPException(status_code=404, detail="Flow not found") if flow.user_id != current_user.id: @@ -43,7 +43,7 @@ async def upload_file( file: UploadFile, flow_id: Annotated[UUID, Depends(get_flow_id)], current_user: CurrentActiveUser, - session: DbSession, + session: AsyncDbSession, storage_service: Annotated[StorageService, Depends(get_storage_service)], ) -> UploadFileResponse: try: @@ -58,7 +58,7 @@ async def upload_file( try: flow_id_str = str(flow_id) - flow = session.get(Flow, flow_id_str) + flow = await session.get(Flow, flow_id_str) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e diff --git a/src/backend/base/langflow/api/v1/flows.py b/src/backend/base/langflow/api/v1/flows.py index 8f51396f2..7a999dea9 100644 --- a/src/backend/base/langflow/api/v1/flows.py +++ b/src/backend/base/langflow/api/v1/flows.py @@ -14,9 +14,16 @@ from fastapi.encoders import jsonable_encoder from fastapi.responses import StreamingResponse from fastapi_pagination import Page, Params, add_pagination from fastapi_pagination.ext.sqlalchemy import paginate -from sqlmodel import Session, and_, col, select +from sqlmodel import and_, col, select +from sqlmodel.ext.asyncio.session import AsyncSession -from langflow.api.utils import CurrentActiveUser, DbSession, cascade_delete_flow, remove_api_keys, validate_is_component +from langflow.api.utils import ( + AsyncDbSession, + CurrentActiveUser, + cascade_delete_flow, + remove_api_keys, + validate_is_component, +) from langflow.api.v1.schemas import FlowListCreate from langflow.initial_setup.setup import STARTER_FOLDER_NAME from langflow.services.database.models.flow import Flow, FlowCreate, FlowRead, FlowUpdate @@ -25,7 +32,6 @@ from langflow.services.database.models.flow.utils import get_webhook_component_i from langflow.services.database.models.folder.constants import DEFAULT_FOLDER_NAME from langflow.services.database.models.folder.model import Folder from langflow.services.database.models.transactions.crud import get_transactions_by_flow_id -from langflow.services.database.models.user.model import User from langflow.services.database.models.vertex_builds.crud import get_vertex_builds_by_flow_id from langflow.services.deps import get_settings_service from langflow.services.settings.service import SettingsService @@ -34,26 +40,27 @@ from langflow.services.settings.service import SettingsService router = APIRouter(prefix="/flows", tags=["Flows"]) -@router.post("/", response_model=FlowRead, status_code=201) -async def create_flow( +async def _new_flow( *, - session: DbSession, + session: AsyncSession, flow: FlowCreate, - current_user: CurrentActiveUser, + user_id: UUID, ): try: """Create a new flow.""" if flow.user_id is None: - flow.user_id = current_user.id + flow.user_id = user_id # First check if the flow.name is unique # there might be flows with name like: "MyFlow", "MyFlow (1)", "MyFlow (2)" # so we need to check if the name is unique with `like` operator # if we find a flow with the same name, we add a number to the end of the name # based on the highest number found - if session.exec(select(Flow).where(Flow.name == flow.name).where(Flow.user_id == current_user.id)).first(): - flows = session.exec( - select(Flow).where(Flow.name.like(f"{flow.name} (%")).where(Flow.user_id == current_user.id) # type: ignore[attr-defined] + if (await session.exec(select(Flow).where(Flow.name == flow.name).where(Flow.user_id == user_id))).first(): + flows = ( + await session.exec( + select(Flow).where(Flow.name.like(f"{flow.name} (%")).where(Flow.user_id == user_id) # type: ignore[attr-defined] + ) ).all() if flows: extract_number = re.compile(r"\((\d+)\)$") @@ -69,17 +76,21 @@ async def create_flow( # Now check if the endpoint is unique if ( flow.endpoint_name - and session.exec( - select(Flow).where(Flow.endpoint_name == flow.endpoint_name).where(Flow.user_id == current_user.id) + and ( + await session.exec( + select(Flow).where(Flow.endpoint_name == flow.endpoint_name).where(Flow.user_id == user_id) + ) ).first() ): - flows = session.exec( - select(Flow) - .where(Flow.endpoint_name.like(f"{flow.endpoint_name}-%")) # type: ignore[union-attr] - .where(Flow.user_id == current_user.id) + flows = ( + await session.exec( + select(Flow) + .where(Flow.endpoint_name.like(f"{flow.endpoint_name}-%")) # type: ignore[union-attr] + .where(Flow.user_id == user_id) + ) ).all() if flows: - # The endpoitn name is like "my-endpoint","my-endpoint-1", "my-endpoint-2" + # The endpoint name is like "my-endpoint","my-endpoint-1", "my-endpoint-2" # so we need to get the highest number and add 1 # we need to get the last part of the endpoint name numbers = [int(flow.endpoint_name.split("-")[-1]) for flow in flows] @@ -92,19 +103,36 @@ async def create_flow( if db_flow.folder_id is None: # Make sure flows always have a folder - default_folder = session.exec( - select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME, Folder.user_id == current_user.id) + default_folder = ( + await session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME, Folder.user_id == user_id)) ).first() if default_folder: db_flow.folder_id = default_folder.id session.add(db_flow) - session.commit() - session.refresh(db_flow) except Exception as e: # If it is a validation error, return the error message if hasattr(e, "errors"): raise HTTPException(status_code=400, detail=str(e)) from e + if isinstance(e, HTTPException): + raise + raise HTTPException(status_code=500, detail=str(e)) from e + + return db_flow + + +@router.post("/", response_model=FlowRead, status_code=201) +async def create_flow( + *, + session: AsyncDbSession, + flow: FlowCreate, + current_user: CurrentActiveUser, +): + try: + db_flow = await _new_flow(session=session, flow=flow, user_id=current_user.id) + await session.commit() + await session.refresh(db_flow) + except Exception as e: if "UNIQUE constraint failed" in str(e): # Get the name of the column that failed columns = str(e).split("UNIQUE constraint failed: ")[1].split(".")[1].split("\n")[0] @@ -119,7 +147,6 @@ async def create_flow( if isinstance(e, HTTPException): raise raise HTTPException(status_code=500, detail=str(e)) from e - return db_flow @@ -127,7 +154,7 @@ async def create_flow( async def read_flows( *, current_user: CurrentActiveUser, - session: DbSession, + session: AsyncDbSession, remove_example_flows: bool = False, components_only: bool = False, get_all: bool = True, @@ -158,10 +185,10 @@ async def read_flows( try: auth_settings = get_settings_service().auth_settings - default_folder = session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME)).first() + default_folder = (await session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME))).first() default_folder_id = default_folder.id if default_folder else None - starter_folder = session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME)).first() + starter_folder = (await session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME))).first() starter_folder_id = starter_folder.id if starter_folder else None if not starter_folder and not default_folder: @@ -187,7 +214,7 @@ async def read_flows( stmt = stmt.where(Flow.is_component == True) # noqa: E712 if get_all: - flows = session.exec(stmt).all() + flows = (await session.exec(stmt)).all() flows = validate_is_component(flows) if components_only: flows = [flow for flow in flows if flow.is_component] @@ -201,16 +228,16 @@ async def read_flows( return flows stmt = stmt.where(Flow.folder_id == folder_id) - return paginate(session, stmt, params=params) + return await paginate(session, stmt, params=params) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e -def _read_flow( - session: Session, +async def _read_flow( + session: AsyncSession, flow_id: UUID, - current_user: User, + user_id: UUID, settings_service: SettingsService, ): """Read a flow.""" @@ -220,20 +247,20 @@ def _read_flow( # If auto login is enable user_id can be current_user.id or None # so write an OR stmt = stmt.where( - (Flow.user_id == current_user.id) | (Flow.user_id == None) # noqa: E711 + (Flow.user_id == user_id) | (Flow.user_id == None) # noqa: E711 ) - return session.exec(stmt).first() + return (await session.exec(stmt)).first() @router.get("/{flow_id}", response_model=FlowRead, status_code=200) async def read_flow( *, - session: DbSession, + session: AsyncDbSession, flow_id: UUID, current_user: CurrentActiveUser, ): """Read a flow.""" - if user_flow := _read_flow(session, flow_id, current_user, get_settings_service()): + if user_flow := await _read_flow(session, flow_id, current_user.id, get_settings_service()): return user_flow raise HTTPException(status_code=404, detail="Flow not found") @@ -241,7 +268,7 @@ async def read_flow( @router.patch("/{flow_id}", response_model=FlowRead, status_code=200) async def update_flow( *, - session: DbSession, + session: AsyncDbSession, flow_id: UUID, flow: FlowUpdate, current_user: CurrentActiveUser, @@ -249,10 +276,10 @@ async def update_flow( """Update a flow.""" settings_service = get_settings_service() try: - db_flow = _read_flow( + db_flow = await _read_flow( session=session, flow_id=flow_id, - current_user=current_user, + user_id=current_user.id, settings_service=settings_service, ) except Exception as e: @@ -272,12 +299,12 @@ async def update_flow( db_flow.updated_at = datetime.now(timezone.utc) if db_flow.folder_id is None: - default_folder = session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME)).first() + default_folder = (await session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME))).first() if default_folder: db_flow.folder_id = default_folder.id session.add(db_flow) - session.commit() - session.refresh(db_flow) + await session.commit() + await session.refresh(db_flow) except Exception as e: # If it is a validation error, return the error message if hasattr(e, "errors"): @@ -299,30 +326,30 @@ async def update_flow( @router.delete("/{flow_id}", status_code=200) -def delete_flow( +async def delete_flow( *, - session: DbSession, + session: AsyncDbSession, flow_id: UUID, current_user: CurrentActiveUser, ): """Delete a flow.""" - flow = _read_flow( + flow = await _read_flow( session=session, flow_id=flow_id, - current_user=current_user, + user_id=current_user.id, settings_service=get_settings_service(), ) if not flow: raise HTTPException(status_code=404, detail="Flow not found") - cascade_delete_flow(session, flow) - session.commit() + await cascade_delete_flow(session, flow.id) + await session.commit() return {"message": "Flow deleted successfully"} @router.post("/batch/", response_model=list[FlowRead], status_code=201) async def create_flows( *, - session: DbSession, + session: AsyncDbSession, flow_list: FlowListCreate, current_user: CurrentActiveUser, ): @@ -333,16 +360,16 @@ async def create_flows( db_flow = Flow.model_validate(flow, from_attributes=True) session.add(db_flow) db_flows.append(db_flow) - session.commit() + await session.commit() for db_flow in db_flows: - session.refresh(db_flow) + await session.refresh(db_flow) return db_flows @router.post("/upload/", response_model=list[FlowRead], status_code=201) async def upload_file( *, - session: DbSession, + session: AsyncDbSession, file: Annotated[UploadFile, File(...)], current_user: CurrentActiveUser, folder_id: UUID | None = None, @@ -357,9 +384,29 @@ async def upload_file( flow.user_id = current_user.id if folder_id: flow.folder_id = folder_id - response = await create_flow(session=session, flow=flow, current_user=current_user) + response = await _new_flow(session=session, flow=flow, user_id=current_user.id) response_list.append(response) + try: + await session.commit() + for db_flow in response_list: + await session.refresh(db_flow) + except Exception as e: + if "UNIQUE constraint failed" in str(e): + # Get the name of the column that failed + columns = str(e).split("UNIQUE constraint failed: ")[1].split(".")[1].split("\n")[0] + # UNIQUE constraint failed: flow.user_id, flow.name + # or UNIQUE constraint failed: flow.name + # if the column has id in it, we want the other column + column = columns.split(",")[1] if "id" in columns.split(",")[0] else columns.split(",")[0] + + raise HTTPException( + status_code=400, detail=f"{column.capitalize().replace('_', ' ')} must be unique" + ) from e + if isinstance(e, HTTPException): + raise + raise HTTPException(status_code=500, detail=str(e)) from e + return response_list @@ -367,7 +414,7 @@ async def upload_file( async def delete_multiple_flows( flow_ids: list[UUID], user: CurrentActiveUser, - db: DbSession, + db: AsyncDbSession, ): """Delete multiple flows by their IDs. @@ -381,19 +428,21 @@ async def delete_multiple_flows( """ try: - flows_to_delete = db.exec(select(Flow).where(col(Flow.id).in_(flow_ids)).where(Flow.user_id == user.id)).all() + flows_to_delete = ( + await db.exec(select(Flow).where(col(Flow.id).in_(flow_ids)).where(Flow.user_id == user.id)) + ).all() for flow in flows_to_delete: - transactions_to_delete = get_transactions_by_flow_id(db, flow.id) + transactions_to_delete = await get_transactions_by_flow_id(db, flow.id) for transaction in transactions_to_delete: - db.delete(transaction) + await db.delete(transaction) - builds_to_delete = get_vertex_builds_by_flow_id(db, flow.id) + builds_to_delete = await get_vertex_builds_by_flow_id(db, flow.id) for build in builds_to_delete: - db.delete(build) + await db.delete(build) - db.delete(flow) + await db.delete(flow) - db.commit() + await db.commit() return {"deleted": len(flows_to_delete)} except Exception as exc: raise HTTPException(status_code=500, detail=str(exc)) from exc @@ -403,10 +452,10 @@ async def delete_multiple_flows( async def download_multiple_file( flow_ids: list[UUID], user: CurrentActiveUser, - db: DbSession, + db: AsyncDbSession, ): """Download all flows as a zip file.""" - flows = db.exec(select(Flow).where(and_(Flow.user_id == user.id, Flow.id.in_(flow_ids)))).all() # type: ignore[attr-defined] + flows = (await db.exec(select(Flow).where(and_(Flow.user_id == user.id, Flow.id.in_(flow_ids))))).all() # type: ignore[attr-defined] if not flows: raise HTTPException(status_code=404, detail="No flows found.") @@ -444,7 +493,7 @@ async def download_multiple_file( @router.get("/basic_examples/", response_model=list[FlowRead], status_code=200) async def read_basic_examples( *, - session: DbSession, + session: AsyncDbSession, ): """Retrieve a list of basic example flows. @@ -456,13 +505,13 @@ async def read_basic_examples( """ try: # Get the starter folder - starter_folder = session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME)).first() + starter_folder = (await session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME))).first() if not starter_folder: return [] # Get all flows in the starter folder - return session.exec(select(Flow).where(Flow.folder_id == starter_folder.id)).all() + return (await session.exec(select(Flow).where(Flow.folder_id == starter_folder.id))).all() except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e diff --git a/src/backend/base/langflow/api/v1/folders.py b/src/backend/base/langflow/api/v1/folders.py index 91f67921c..910cfa19b 100644 --- a/src/backend/base/langflow/api/v1/folders.py +++ b/src/backend/base/langflow/api/v1/folders.py @@ -5,9 +5,10 @@ from fastapi import APIRouter, Depends, File, HTTPException, Response, UploadFil from fastapi_pagination import Params from fastapi_pagination.ext.sqlmodel import paginate from sqlalchemy import or_, update +from sqlalchemy.orm import selectinload from sqlmodel import select -from langflow.api.utils import CurrentActiveUser, DbSession, cascade_delete_flow, custom_params +from langflow.api.utils import AsyncDbSession, CurrentActiveUser, cascade_delete_flow, custom_params from langflow.api.v1.flows import create_flows from langflow.api.v1.schemas import FlowListCreate, FlowListReadWithFolderName from langflow.helpers.flow import generate_unique_flow_name @@ -30,7 +31,7 @@ router = APIRouter(prefix="/folders", tags=["Folders"]) @router.post("/", response_model=FolderRead, status_code=201) async def create_folder( *, - session: DbSession, + session: AsyncDbSession, folder: FolderCreate, current_user: CurrentActiveUser, ): @@ -42,10 +43,12 @@ async def create_folder( # so we need to check if the name is unique with `like` operator # if we find a flow with the same name, we add a number to the end of the name # based on the highest number found - if session.exec( - statement=select(Folder).where(Folder.name == new_folder.name).where(Folder.user_id == current_user.id) + if ( + await session.exec( + statement=select(Folder).where(Folder.name == new_folder.name).where(Folder.user_id == current_user.id) + ) ).first(): - folder_results = session.exec( + folder_results = await session.exec( select(Folder).where( Folder.name.like(f"{new_folder.name}%"), # type: ignore[attr-defined] Folder.user_id == current_user.id, @@ -60,20 +63,20 @@ async def create_folder( new_folder.name = f"{new_folder.name} (1)" session.add(new_folder) - session.commit() - session.refresh(new_folder) + await session.commit() + await session.refresh(new_folder) if folder.components_list: update_statement_components = ( update(Flow).where(Flow.id.in_(folder.components_list)).values(folder_id=new_folder.id) # type: ignore[attr-defined] ) - session.exec(update_statement_components) - session.commit() + await session.exec(update_statement_components) + await session.commit() if folder.flows_list: update_statement_flows = update(Flow).where(Flow.id.in_(folder.flows_list)).values(folder_id=new_folder.id) # type: ignore[attr-defined] - session.exec(update_statement_flows) - session.commit() + await session.exec(update_statement_flows) + await session.commit() except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @@ -84,13 +87,15 @@ async def create_folder( @router.get("/", response_model=list[FolderRead], status_code=200) async def read_folders( *, - session: DbSession, + session: AsyncDbSession, current_user: CurrentActiveUser, ): try: - folders = session.exec( - select(Folder).where( - or_(Folder.user_id == current_user.id, Folder.user_id == None) # noqa: E711 + folders = ( + await session.exec( + select(Folder).where( + or_(Folder.user_id == current_user.id, Folder.user_id == None) # noqa: E711 + ) ) ).all() folders = [folder for folder in folders if folder.name != STARTER_FOLDER_NAME] @@ -102,7 +107,7 @@ async def read_folders( @router.get("/{folder_id}", response_model=FolderWithPaginatedFlows | FolderReadWithFlows, status_code=200) async def read_folder( *, - session: DbSession, + session: AsyncDbSession, folder_id: str, current_user: CurrentActiveUser, params: Annotated[Params | None, Depends(custom_params)], @@ -111,7 +116,13 @@ async def read_folder( search: str = "", ): try: - folder = session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)).first() + folder = ( + await session.exec( + select(Folder) + .options(selectinload(Folder.flows)) + .where(Folder.id == folder_id, Folder.user_id == current_user.id) + ) + ).first() except Exception as e: if "No result found" in str(e): raise HTTPException(status_code=404, detail="Folder not found") from e @@ -132,29 +143,29 @@ async def read_folder( stmt = stmt.where(Flow.is_component == False) # noqa: E712 if search: stmt = stmt.where(Flow.name.like(f"%{search}%")) # type: ignore[attr-defined] - paginated_flows = paginate(session, stmt, params=params) + paginated_flows = await paginate(session, stmt, params=params) return FolderWithPaginatedFlows(folder=FolderRead.model_validate(folder), flows=paginated_flows) - flows_from_current_user_in_folder = [flow for flow in folder.flows if flow.user_id == current_user.id] - folder.flows = flows_from_current_user_in_folder - return folder # noqa: TRY300 - except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e + flows_from_current_user_in_folder = [flow for flow in folder.flows if flow.user_id == current_user.id] + folder.flows = flows_from_current_user_in_folder + return folder + @router.patch("/{folder_id}", response_model=FolderRead, status_code=200) async def update_folder( *, - session: DbSession, + session: AsyncDbSession, folder_id: str, folder: FolderUpdate, # Assuming FolderUpdate is a Pydantic model defining updatable fields current_user: CurrentActiveUser, ): try: - existing_folder = session.exec( - select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id) + existing_folder = ( + await session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)) ).first() except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @@ -166,8 +177,8 @@ async def update_folder( if folder.name and folder.name != existing_folder.name: existing_folder.name = folder.name session.add(existing_folder) - session.commit() - session.refresh(existing_folder) + await session.commit() + await session.refresh(existing_folder) return existing_folder folder_data = existing_folder.model_dump(exclude_unset=True) @@ -175,29 +186,29 @@ async def update_folder( if key not in {"components", "flows"}: setattr(existing_folder, key, value) session.add(existing_folder) - session.commit() - session.refresh(existing_folder) + await session.commit() + await session.refresh(existing_folder) concat_folder_components = folder.components + folder.flows - flows_ids = session.exec(select(Flow.id).where(Flow.folder_id == existing_folder.id)).all() + flows_ids = (await session.exec(select(Flow.id).where(Flow.folder_id == existing_folder.id))).all() excluded_flows = list(set(flows_ids) - set(concat_folder_components)) - my_collection_folder = session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME)).first() + my_collection_folder = (await session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME))).first() if my_collection_folder: update_statement_my_collection = ( update(Flow).where(Flow.id.in_(excluded_flows)).values(folder_id=my_collection_folder.id) # type: ignore[attr-defined] ) - session.exec(update_statement_my_collection) - session.commit() + await session.exec(update_statement_my_collection) + await session.commit() if concat_folder_components: update_statement_components = ( update(Flow).where(Flow.id.in_(concat_folder_components)).values(folder_id=existing_folder.id) # type: ignore[attr-defined] ) - session.exec(update_statement_components) - session.commit() + await session.exec(update_statement_components) + await session.commit() except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @@ -206,19 +217,23 @@ async def update_folder( @router.delete("/{folder_id}", status_code=204) -def delete_folder( +async def delete_folder( *, - session: DbSession, + session: AsyncDbSession, folder_id: str, current_user: CurrentActiveUser, ): try: - flows = session.exec(select(Flow).where(Flow.folder_id == folder_id, Flow.user_id == current_user.id)).all() + flows = ( + await session.exec(select(Flow).where(Flow.folder_id == folder_id, Flow.user_id == current_user.id)) + ).all() if len(flows) > 0: for flow in flows: - cascade_delete_flow(session, flow) + await cascade_delete_flow(session, flow.id) - folder = session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)).first() + folder = ( + await session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)) + ).first() except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @@ -226,8 +241,8 @@ def delete_folder( raise HTTPException(status_code=404, detail="Folder not found") try: - session.delete(folder) - session.commit() + await session.delete(folder) + await session.commit() return Response(status_code=status.HTTP_204_NO_CONTENT) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @@ -236,13 +251,15 @@ def delete_folder( @router.get("/download/{folder_id}", response_model=FlowListReadWithFolderName, status_code=200) async def download_file( *, - session: DbSession, + session: AsyncDbSession, folder_id: str, current_user: CurrentActiveUser, ): """Download all flows from folder.""" try: - folder = session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)).first() + folder = ( + await session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)) + ).first() except Exception as e: if "No result found" in str(e): raise HTTPException(status_code=404, detail="Folder not found") from e @@ -257,7 +274,7 @@ async def download_file( @router.post("/upload/", response_model=list[FlowRead], status_code=201) async def upload_file( *, - session: DbSession, + session: AsyncDbSession, file: Annotated[UploadFile, File(...)], current_user: CurrentActiveUser, ): @@ -268,7 +285,7 @@ async def upload_file( if not data: raise HTTPException(status_code=400, detail="No flows found in the file") - folder_name = generate_unique_folder_name(data["folder_name"], current_user.id, session) + folder_name = await generate_unique_folder_name(data["folder_name"], current_user.id, session) data["folder_name"] = folder_name @@ -278,8 +295,8 @@ async def upload_file( new_folder.id = None new_folder.user_id = current_user.id session.add(new_folder) - session.commit() - session.refresh(new_folder) + await session.commit() + await session.refresh(new_folder) del data["folder_name"] del data["folder_description"] @@ -290,9 +307,9 @@ async def upload_file( raise HTTPException(status_code=400, detail="No flows found in the data") # Now we set the user_id for all flows for flow in flow_list.flows: - flow_name = generate_unique_flow_name(flow.name, current_user.id, session) + flow_name = await generate_unique_flow_name(flow.name, current_user.id, session) flow.name = flow_name flow.user_id = current_user.id flow.folder_id = new_folder.id - return create_flows(session=session, flow_list=flow_list, current_user=current_user) + return await create_flows(session=session, flow_list=flow_list, current_user=current_user) diff --git a/src/backend/base/langflow/api/v1/monitor.py b/src/backend/base/langflow/api/v1/monitor.py index c2a01e454..cf7e28a55 100644 --- a/src/backend/base/langflow/api/v1/monitor.py +++ b/src/backend/base/langflow/api/v1/monitor.py @@ -5,7 +5,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy import delete from sqlmodel import col, select -from langflow.api.utils import DbSession +from langflow.api.utils import AsyncDbSession, DbSession from langflow.schema.message import MessageResponse from langflow.services.auth.utils import get_current_active_user from langflow.services.database.models.message.model import MessageRead, MessageTable, MessageUpdate @@ -21,16 +21,16 @@ router = APIRouter(prefix="/monitor", tags=["Monitor"]) @router.get("/builds") -async def get_vertex_builds(flow_id: Annotated[UUID, Query()], session: DbSession) -> VertexBuildMapModel: +async def get_vertex_builds(flow_id: Annotated[UUID, Query()], session: AsyncDbSession) -> VertexBuildMapModel: try: - vertex_builds = get_vertex_builds_by_flow_id(session, flow_id) + vertex_builds = await get_vertex_builds_by_flow_id(session, flow_id) return VertexBuildMapModel.from_list_of_dicts(vertex_builds) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @router.delete("/builds", status_code=204) -async def delete_vertex_builds(flow_id: Annotated[UUID, Query()], session: DbSession) -> None: +def delete_vertex_builds(flow_id: Annotated[UUID, Query()], session: DbSession) -> None: try: delete_vertex_builds_by_flow_id(session, flow_id) except Exception as e: @@ -39,7 +39,7 @@ async def delete_vertex_builds(flow_id: Annotated[UUID, Query()], session: DbSes @router.get("/messages") async def get_messages( - session: DbSession, + session: AsyncDbSession, flow_id: Annotated[str | None, Query()] = None, session_id: Annotated[str | None, Query()] = None, sender: Annotated[str | None, Query()] = None, @@ -59,17 +59,17 @@ async def get_messages( if order_by: col = getattr(MessageTable, order_by).asc() stmt = stmt.order_by(col) - messages = session.exec(stmt) + messages = await session.exec(stmt) return [MessageResponse.model_validate(d, from_attributes=True) for d in messages] except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @router.delete("/messages", status_code=204, dependencies=[Depends(get_current_active_user)]) -async def delete_messages(message_ids: list[UUID], session: DbSession) -> None: +async def delete_messages(message_ids: list[UUID], session: AsyncDbSession) -> None: try: - session.exec(delete(MessageTable).where(MessageTable.id.in_(message_ids))) # type: ignore[attr-defined] - session.commit() + await session.exec(delete(MessageTable).where(MessageTable.id.in_(message_ids))) # type: ignore[attr-defined] + await session.commit() except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @@ -78,10 +78,10 @@ async def delete_messages(message_ids: list[UUID], session: DbSession) -> None: async def update_message( message_id: UUID, message: MessageUpdate, - session: DbSession, + session: AsyncDbSession, ): try: - db_message = session.get(MessageTable, message_id) + db_message = await session.get(MessageTable, message_id) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @@ -93,8 +93,8 @@ async def update_message( message_dict["edit"] = True db_message.sqlmodel_update(message_dict) session.add(db_message) - session.commit() - session.refresh(db_message) + await session.commit() + await session.refresh(db_message) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @@ -108,12 +108,12 @@ async def update_message( async def update_session_id( old_session_id: str, new_session_id: Annotated[str, Query(..., description="The new session ID to update to")], - session: DbSession, + session: AsyncDbSession, ) -> list[MessageResponse]: try: # Get all messages with the old session ID stmt = select(MessageTable).where(MessageTable.session_id == old_session_id) - messages = session.exec(stmt).all() + messages = (await session.exec(stmt)).all() except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @@ -127,10 +127,10 @@ async def update_session_id( session.add_all(messages) - session.commit() + await session.commit() message_responses = [] for message in messages: - session.refresh(message) + await session.refresh(message) message_responses.append(MessageResponse.model_validate(message, from_attributes=True)) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @@ -141,15 +141,15 @@ async def update_session_id( @router.delete("/messages/session/{session_id}", status_code=204) async def delete_messages_session( session_id: str, - session: DbSession, + session: AsyncDbSession, ): try: - session.exec( + await session.exec( delete(MessageTable) .where(col(MessageTable.session_id) == session_id) .execution_options(synchronize_session="fetch") ) - session.commit() + await session.commit() except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @@ -159,10 +159,10 @@ async def delete_messages_session( @router.get("/transactions") async def get_transactions( flow_id: Annotated[UUID, Query()], - session: DbSession, + session: AsyncDbSession, ) -> list[TransactionReadResponse]: try: - transactions = get_transactions_by_flow_id(session, flow_id) + transactions = await get_transactions_by_flow_id(session, flow_id) return [ TransactionReadResponse( transaction_id=t.id, diff --git a/src/backend/base/langflow/helpers/flow.py b/src/backend/base/langflow/helpers/flow.py index 70535c4a6..0417a6831 100644 --- a/src/backend/base/langflow/helpers/flow.py +++ b/src/backend/base/langflow/helpers/flow.py @@ -283,15 +283,17 @@ def get_flow_by_id_or_endpoint_name(flow_id_or_name: str, user_id: UUID | None = return FlowRead.model_validate(flow, from_attributes=True) -def generate_unique_flow_name(flow_name, user_id, session): +async def generate_unique_flow_name(flow_name, user_id, session): original_name = flow_name n = 1 while True: # Check if a flow with the given name exists - existing_flow = session.exec( - select(Flow).where( - Flow.name == flow_name, - Flow.user_id == user_id, + existing_flow = ( + await session.exec( + select(Flow).where( + Flow.name == flow_name, + Flow.user_id == user_id, + ) ) ).first() diff --git a/src/backend/base/langflow/helpers/folders.py b/src/backend/base/langflow/helpers/folders.py index c066c4e24..be13b0d39 100644 --- a/src/backend/base/langflow/helpers/folders.py +++ b/src/backend/base/langflow/helpers/folders.py @@ -3,15 +3,17 @@ from sqlalchemy import select from langflow.services.database.models.folder.model import Folder -def generate_unique_folder_name(folder_name, user_id, session): +async def generate_unique_folder_name(folder_name, user_id, session): original_name = folder_name n = 1 while True: # Check if a folder with the given name exists - existing_folder = session.exec( - select(Folder).where( - Folder.name == folder_name, - Folder.user_id == user_id, + existing_folder = ( + await session.exec( + select(Folder).where( + Folder.name == folder_name, + Folder.user_id == user_id, + ) ) ).first() diff --git a/src/backend/base/langflow/services/database/models/transactions/crud.py b/src/backend/base/langflow/services/database/models/transactions/crud.py index a4dfe946b..006e6d9bb 100644 --- a/src/backend/base/langflow/services/database/models/transactions/crud.py +++ b/src/backend/base/langflow/services/database/models/transactions/crud.py @@ -2,11 +2,14 @@ from uuid import UUID from sqlalchemy.exc import IntegrityError from sqlmodel import Session, col, select +from sqlmodel.ext.asyncio.session import AsyncSession from langflow.services.database.models.transactions.model import TransactionBase, TransactionTable -def get_transactions_by_flow_id(db: Session, flow_id: UUID, limit: int | None = 1000) -> list[TransactionTable]: +async def get_transactions_by_flow_id( + db: AsyncSession, flow_id: UUID, limit: int | None = 1000 +) -> list[TransactionTable]: stmt = ( select(TransactionTable) .where(TransactionTable.flow_id == flow_id) @@ -14,7 +17,7 @@ def get_transactions_by_flow_id(db: Session, flow_id: UUID, limit: int | None = .limit(limit) ) - transactions = db.exec(stmt) + transactions = await db.exec(stmt) return list(transactions) diff --git a/src/backend/base/langflow/services/database/models/vertex_builds/crud.py b/src/backend/base/langflow/services/database/models/vertex_builds/crud.py index a4cf5c2bf..286640465 100644 --- a/src/backend/base/langflow/services/database/models/vertex_builds/crud.py +++ b/src/backend/base/langflow/services/database/models/vertex_builds/crud.py @@ -2,11 +2,14 @@ from uuid import UUID from sqlalchemy.exc import IntegrityError from sqlmodel import Session, col, delete, select +from sqlmodel.ext.asyncio.session import AsyncSession from langflow.services.database.models.vertex_builds.model import VertexBuildBase, VertexBuildTable -def get_vertex_builds_by_flow_id(db: Session, flow_id: UUID, limit: int | None = 1000) -> list[VertexBuildTable]: +async def get_vertex_builds_by_flow_id( + db: AsyncSession, flow_id: UUID, limit: int | None = 1000 +) -> list[VertexBuildTable]: stmt = ( select(VertexBuildTable) .where(VertexBuildTable.flow_id == flow_id) @@ -14,7 +17,7 @@ def get_vertex_builds_by_flow_id(db: Session, flow_id: UUID, limit: int | None = .limit(limit) ) - builds = db.exec(stmt) + builds = await db.exec(stmt) return list(builds)