fix: Use AsyncSession in some non-blocking APIs (#4484)

Fix sync/async DB usage in APIs:
* flows
* files
* folders
* monitor
This commit is contained in:
Christophe Bornet 2024-11-12 00:51:33 +01:00 committed by GitHub
commit 8bb0f65b47
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 238 additions and 162 deletions

View file

@ -266,13 +266,13 @@ def parse_value(value: Any, input_type: str) -> Any:
return value
def cascade_delete_flow(session: Session, flow: Flow) -> None:
async def cascade_delete_flow(session: AsyncSession, flow_id: uuid.UUID) -> None:
try:
session.exec(delete(TransactionTable).where(TransactionTable.flow_id == flow.id))
session.exec(delete(VertexBuildTable).where(VertexBuildTable.flow_id == flow.id))
session.exec(delete(Flow).where(Flow.id == flow.id))
await session.exec(delete(TransactionTable).where(TransactionTable.flow_id == flow_id))
await session.exec(delete(VertexBuildTable).where(VertexBuildTable.flow_id == flow_id))
await session.exec(delete(Flow).where(Flow.id == flow_id))
except Exception as e:
msg = f"Unable to cascade delete flow: ${flow.id}"
msg = f"Unable to cascade delete flow: ${flow_id}"
raise RuntimeError(msg, e) from e

View file

@ -9,7 +9,7 @@ from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, UploadFile
from fastapi.responses import StreamingResponse
from langflow.api.utils import CurrentActiveUser, DbSession
from langflow.api.utils import AsyncDbSession, CurrentActiveUser
from langflow.api.v1.schemas import UploadFileResponse
from langflow.services.database.models.flow import Flow
from langflow.services.deps import get_settings_service, get_storage_service
@ -22,14 +22,14 @@ router = APIRouter(tags=["Files"], prefix="/files")
# Create dep that gets the flow_id from the request
# then finds it in the database and returns it while
# using the current user as the owner
def get_flow_id(
async def get_flow_id(
flow_id: UUID,
current_user: CurrentActiveUser,
session: DbSession,
session: AsyncDbSession,
):
flow_id_str = str(flow_id)
# AttributeError: 'SelectOfScalar' object has no attribute 'first'
flow = session.get(Flow, flow_id_str)
flow = await session.get(Flow, flow_id_str)
if not flow:
raise HTTPException(status_code=404, detail="Flow not found")
if flow.user_id != current_user.id:
@ -43,7 +43,7 @@ async def upload_file(
file: UploadFile,
flow_id: Annotated[UUID, Depends(get_flow_id)],
current_user: CurrentActiveUser,
session: DbSession,
session: AsyncDbSession,
storage_service: Annotated[StorageService, Depends(get_storage_service)],
) -> UploadFileResponse:
try:
@ -58,7 +58,7 @@ async def upload_file(
try:
flow_id_str = str(flow_id)
flow = session.get(Flow, flow_id_str)
flow = await session.get(Flow, flow_id_str)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e

View file

@ -14,9 +14,16 @@ from fastapi.encoders import jsonable_encoder
from fastapi.responses import StreamingResponse
from fastapi_pagination import Page, Params, add_pagination
from fastapi_pagination.ext.sqlalchemy import paginate
from sqlmodel import Session, and_, col, select
from sqlmodel import and_, col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from langflow.api.utils import CurrentActiveUser, DbSession, cascade_delete_flow, remove_api_keys, validate_is_component
from langflow.api.utils import (
AsyncDbSession,
CurrentActiveUser,
cascade_delete_flow,
remove_api_keys,
validate_is_component,
)
from langflow.api.v1.schemas import FlowListCreate
from langflow.initial_setup.setup import STARTER_FOLDER_NAME
from langflow.services.database.models.flow import Flow, FlowCreate, FlowRead, FlowUpdate
@ -25,7 +32,6 @@ from langflow.services.database.models.flow.utils import get_webhook_component_i
from langflow.services.database.models.folder.constants import DEFAULT_FOLDER_NAME
from langflow.services.database.models.folder.model import Folder
from langflow.services.database.models.transactions.crud import get_transactions_by_flow_id
from langflow.services.database.models.user.model import User
from langflow.services.database.models.vertex_builds.crud import get_vertex_builds_by_flow_id
from langflow.services.deps import get_settings_service
from langflow.services.settings.service import SettingsService
@ -34,26 +40,27 @@ from langflow.services.settings.service import SettingsService
router = APIRouter(prefix="/flows", tags=["Flows"])
@router.post("/", response_model=FlowRead, status_code=201)
async def create_flow(
async def _new_flow(
*,
session: DbSession,
session: AsyncSession,
flow: FlowCreate,
current_user: CurrentActiveUser,
user_id: UUID,
):
try:
"""Create a new flow."""
if flow.user_id is None:
flow.user_id = current_user.id
flow.user_id = user_id
# First check if the flow.name is unique
# there might be flows with name like: "MyFlow", "MyFlow (1)", "MyFlow (2)"
# so we need to check if the name is unique with `like` operator
# if we find a flow with the same name, we add a number to the end of the name
# based on the highest number found
if session.exec(select(Flow).where(Flow.name == flow.name).where(Flow.user_id == current_user.id)).first():
flows = session.exec(
select(Flow).where(Flow.name.like(f"{flow.name} (%")).where(Flow.user_id == current_user.id) # type: ignore[attr-defined]
if (await session.exec(select(Flow).where(Flow.name == flow.name).where(Flow.user_id == user_id))).first():
flows = (
await session.exec(
select(Flow).where(Flow.name.like(f"{flow.name} (%")).where(Flow.user_id == user_id) # type: ignore[attr-defined]
)
).all()
if flows:
extract_number = re.compile(r"\((\d+)\)$")
@ -69,17 +76,21 @@ async def create_flow(
# Now check if the endpoint is unique
if (
flow.endpoint_name
and session.exec(
select(Flow).where(Flow.endpoint_name == flow.endpoint_name).where(Flow.user_id == current_user.id)
and (
await session.exec(
select(Flow).where(Flow.endpoint_name == flow.endpoint_name).where(Flow.user_id == user_id)
)
).first()
):
flows = session.exec(
select(Flow)
.where(Flow.endpoint_name.like(f"{flow.endpoint_name}-%")) # type: ignore[union-attr]
.where(Flow.user_id == current_user.id)
flows = (
await session.exec(
select(Flow)
.where(Flow.endpoint_name.like(f"{flow.endpoint_name}-%")) # type: ignore[union-attr]
.where(Flow.user_id == user_id)
)
).all()
if flows:
# The endpoitn name is like "my-endpoint","my-endpoint-1", "my-endpoint-2"
# The endpoint name is like "my-endpoint","my-endpoint-1", "my-endpoint-2"
# so we need to get the highest number and add 1
# we need to get the last part of the endpoint name
numbers = [int(flow.endpoint_name.split("-")[-1]) for flow in flows]
@ -92,19 +103,36 @@ async def create_flow(
if db_flow.folder_id is None:
# Make sure flows always have a folder
default_folder = session.exec(
select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME, Folder.user_id == current_user.id)
default_folder = (
await session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME, Folder.user_id == user_id))
).first()
if default_folder:
db_flow.folder_id = default_folder.id
session.add(db_flow)
session.commit()
session.refresh(db_flow)
except Exception as e:
# If it is a validation error, return the error message
if hasattr(e, "errors"):
raise HTTPException(status_code=400, detail=str(e)) from e
if isinstance(e, HTTPException):
raise
raise HTTPException(status_code=500, detail=str(e)) from e
return db_flow
@router.post("/", response_model=FlowRead, status_code=201)
async def create_flow(
*,
session: AsyncDbSession,
flow: FlowCreate,
current_user: CurrentActiveUser,
):
try:
db_flow = await _new_flow(session=session, flow=flow, user_id=current_user.id)
await session.commit()
await session.refresh(db_flow)
except Exception as e:
if "UNIQUE constraint failed" in str(e):
# Get the name of the column that failed
columns = str(e).split("UNIQUE constraint failed: ")[1].split(".")[1].split("\n")[0]
@ -119,7 +147,6 @@ async def create_flow(
if isinstance(e, HTTPException):
raise
raise HTTPException(status_code=500, detail=str(e)) from e
return db_flow
@ -127,7 +154,7 @@ async def create_flow(
async def read_flows(
*,
current_user: CurrentActiveUser,
session: DbSession,
session: AsyncDbSession,
remove_example_flows: bool = False,
components_only: bool = False,
get_all: bool = True,
@ -158,10 +185,10 @@ async def read_flows(
try:
auth_settings = get_settings_service().auth_settings
default_folder = session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME)).first()
default_folder = (await session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME))).first()
default_folder_id = default_folder.id if default_folder else None
starter_folder = session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME)).first()
starter_folder = (await session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME))).first()
starter_folder_id = starter_folder.id if starter_folder else None
if not starter_folder and not default_folder:
@ -187,7 +214,7 @@ async def read_flows(
stmt = stmt.where(Flow.is_component == True) # noqa: E712
if get_all:
flows = session.exec(stmt).all()
flows = (await session.exec(stmt)).all()
flows = validate_is_component(flows)
if components_only:
flows = [flow for flow in flows if flow.is_component]
@ -201,16 +228,16 @@ async def read_flows(
return flows
stmt = stmt.where(Flow.folder_id == folder_id)
return paginate(session, stmt, params=params)
return await paginate(session, stmt, params=params)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
def _read_flow(
session: Session,
async def _read_flow(
session: AsyncSession,
flow_id: UUID,
current_user: User,
user_id: UUID,
settings_service: SettingsService,
):
"""Read a flow."""
@ -220,20 +247,20 @@ def _read_flow(
# If auto login is enable user_id can be current_user.id or None
# so write an OR
stmt = stmt.where(
(Flow.user_id == current_user.id) | (Flow.user_id == None) # noqa: E711
(Flow.user_id == user_id) | (Flow.user_id == None) # noqa: E711
)
return session.exec(stmt).first()
return (await session.exec(stmt)).first()
@router.get("/{flow_id}", response_model=FlowRead, status_code=200)
async def read_flow(
*,
session: DbSession,
session: AsyncDbSession,
flow_id: UUID,
current_user: CurrentActiveUser,
):
"""Read a flow."""
if user_flow := _read_flow(session, flow_id, current_user, get_settings_service()):
if user_flow := await _read_flow(session, flow_id, current_user.id, get_settings_service()):
return user_flow
raise HTTPException(status_code=404, detail="Flow not found")
@ -241,7 +268,7 @@ async def read_flow(
@router.patch("/{flow_id}", response_model=FlowRead, status_code=200)
async def update_flow(
*,
session: DbSession,
session: AsyncDbSession,
flow_id: UUID,
flow: FlowUpdate,
current_user: CurrentActiveUser,
@ -249,10 +276,10 @@ async def update_flow(
"""Update a flow."""
settings_service = get_settings_service()
try:
db_flow = _read_flow(
db_flow = await _read_flow(
session=session,
flow_id=flow_id,
current_user=current_user,
user_id=current_user.id,
settings_service=settings_service,
)
except Exception as e:
@ -272,12 +299,12 @@ async def update_flow(
db_flow.updated_at = datetime.now(timezone.utc)
if db_flow.folder_id is None:
default_folder = session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME)).first()
default_folder = (await session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME))).first()
if default_folder:
db_flow.folder_id = default_folder.id
session.add(db_flow)
session.commit()
session.refresh(db_flow)
await session.commit()
await session.refresh(db_flow)
except Exception as e:
# If it is a validation error, return the error message
if hasattr(e, "errors"):
@ -299,30 +326,30 @@ async def update_flow(
@router.delete("/{flow_id}", status_code=200)
def delete_flow(
async def delete_flow(
*,
session: DbSession,
session: AsyncDbSession,
flow_id: UUID,
current_user: CurrentActiveUser,
):
"""Delete a flow."""
flow = _read_flow(
flow = await _read_flow(
session=session,
flow_id=flow_id,
current_user=current_user,
user_id=current_user.id,
settings_service=get_settings_service(),
)
if not flow:
raise HTTPException(status_code=404, detail="Flow not found")
cascade_delete_flow(session, flow)
session.commit()
await cascade_delete_flow(session, flow.id)
await session.commit()
return {"message": "Flow deleted successfully"}
@router.post("/batch/", response_model=list[FlowRead], status_code=201)
async def create_flows(
*,
session: DbSession,
session: AsyncDbSession,
flow_list: FlowListCreate,
current_user: CurrentActiveUser,
):
@ -333,16 +360,16 @@ async def create_flows(
db_flow = Flow.model_validate(flow, from_attributes=True)
session.add(db_flow)
db_flows.append(db_flow)
session.commit()
await session.commit()
for db_flow in db_flows:
session.refresh(db_flow)
await session.refresh(db_flow)
return db_flows
@router.post("/upload/", response_model=list[FlowRead], status_code=201)
async def upload_file(
*,
session: DbSession,
session: AsyncDbSession,
file: Annotated[UploadFile, File(...)],
current_user: CurrentActiveUser,
folder_id: UUID | None = None,
@ -357,9 +384,29 @@ async def upload_file(
flow.user_id = current_user.id
if folder_id:
flow.folder_id = folder_id
response = await create_flow(session=session, flow=flow, current_user=current_user)
response = await _new_flow(session=session, flow=flow, user_id=current_user.id)
response_list.append(response)
try:
await session.commit()
for db_flow in response_list:
await session.refresh(db_flow)
except Exception as e:
if "UNIQUE constraint failed" in str(e):
# Get the name of the column that failed
columns = str(e).split("UNIQUE constraint failed: ")[1].split(".")[1].split("\n")[0]
# UNIQUE constraint failed: flow.user_id, flow.name
# or UNIQUE constraint failed: flow.name
# if the column has id in it, we want the other column
column = columns.split(",")[1] if "id" in columns.split(",")[0] else columns.split(",")[0]
raise HTTPException(
status_code=400, detail=f"{column.capitalize().replace('_', ' ')} must be unique"
) from e
if isinstance(e, HTTPException):
raise
raise HTTPException(status_code=500, detail=str(e)) from e
return response_list
@ -367,7 +414,7 @@ async def upload_file(
async def delete_multiple_flows(
flow_ids: list[UUID],
user: CurrentActiveUser,
db: DbSession,
db: AsyncDbSession,
):
"""Delete multiple flows by their IDs.
@ -381,19 +428,21 @@ async def delete_multiple_flows(
"""
try:
flows_to_delete = db.exec(select(Flow).where(col(Flow.id).in_(flow_ids)).where(Flow.user_id == user.id)).all()
flows_to_delete = (
await db.exec(select(Flow).where(col(Flow.id).in_(flow_ids)).where(Flow.user_id == user.id))
).all()
for flow in flows_to_delete:
transactions_to_delete = get_transactions_by_flow_id(db, flow.id)
transactions_to_delete = await get_transactions_by_flow_id(db, flow.id)
for transaction in transactions_to_delete:
db.delete(transaction)
await db.delete(transaction)
builds_to_delete = get_vertex_builds_by_flow_id(db, flow.id)
builds_to_delete = await get_vertex_builds_by_flow_id(db, flow.id)
for build in builds_to_delete:
db.delete(build)
await db.delete(build)
db.delete(flow)
await db.delete(flow)
db.commit()
await db.commit()
return {"deleted": len(flows_to_delete)}
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc)) from exc
@ -403,10 +452,10 @@ async def delete_multiple_flows(
async def download_multiple_file(
flow_ids: list[UUID],
user: CurrentActiveUser,
db: DbSession,
db: AsyncDbSession,
):
"""Download all flows as a zip file."""
flows = db.exec(select(Flow).where(and_(Flow.user_id == user.id, Flow.id.in_(flow_ids)))).all() # type: ignore[attr-defined]
flows = (await db.exec(select(Flow).where(and_(Flow.user_id == user.id, Flow.id.in_(flow_ids))))).all() # type: ignore[attr-defined]
if not flows:
raise HTTPException(status_code=404, detail="No flows found.")
@ -444,7 +493,7 @@ async def download_multiple_file(
@router.get("/basic_examples/", response_model=list[FlowRead], status_code=200)
async def read_basic_examples(
*,
session: DbSession,
session: AsyncDbSession,
):
"""Retrieve a list of basic example flows.
@ -456,13 +505,13 @@ async def read_basic_examples(
"""
try:
# Get the starter folder
starter_folder = session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME)).first()
starter_folder = (await session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME))).first()
if not starter_folder:
return []
# Get all flows in the starter folder
return session.exec(select(Flow).where(Flow.folder_id == starter_folder.id)).all()
return (await session.exec(select(Flow).where(Flow.folder_id == starter_folder.id))).all()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e

View file

@ -5,9 +5,10 @@ from fastapi import APIRouter, Depends, File, HTTPException, Response, UploadFil
from fastapi_pagination import Params
from fastapi_pagination.ext.sqlmodel import paginate
from sqlalchemy import or_, update
from sqlalchemy.orm import selectinload
from sqlmodel import select
from langflow.api.utils import CurrentActiveUser, DbSession, cascade_delete_flow, custom_params
from langflow.api.utils import AsyncDbSession, CurrentActiveUser, cascade_delete_flow, custom_params
from langflow.api.v1.flows import create_flows
from langflow.api.v1.schemas import FlowListCreate, FlowListReadWithFolderName
from langflow.helpers.flow import generate_unique_flow_name
@ -30,7 +31,7 @@ router = APIRouter(prefix="/folders", tags=["Folders"])
@router.post("/", response_model=FolderRead, status_code=201)
async def create_folder(
*,
session: DbSession,
session: AsyncDbSession,
folder: FolderCreate,
current_user: CurrentActiveUser,
):
@ -42,10 +43,12 @@ async def create_folder(
# so we need to check if the name is unique with `like` operator
# if we find a flow with the same name, we add a number to the end of the name
# based on the highest number found
if session.exec(
statement=select(Folder).where(Folder.name == new_folder.name).where(Folder.user_id == current_user.id)
if (
await session.exec(
statement=select(Folder).where(Folder.name == new_folder.name).where(Folder.user_id == current_user.id)
)
).first():
folder_results = session.exec(
folder_results = await session.exec(
select(Folder).where(
Folder.name.like(f"{new_folder.name}%"), # type: ignore[attr-defined]
Folder.user_id == current_user.id,
@ -60,20 +63,20 @@ async def create_folder(
new_folder.name = f"{new_folder.name} (1)"
session.add(new_folder)
session.commit()
session.refresh(new_folder)
await session.commit()
await session.refresh(new_folder)
if folder.components_list:
update_statement_components = (
update(Flow).where(Flow.id.in_(folder.components_list)).values(folder_id=new_folder.id) # type: ignore[attr-defined]
)
session.exec(update_statement_components)
session.commit()
await session.exec(update_statement_components)
await session.commit()
if folder.flows_list:
update_statement_flows = update(Flow).where(Flow.id.in_(folder.flows_list)).values(folder_id=new_folder.id) # type: ignore[attr-defined]
session.exec(update_statement_flows)
session.commit()
await session.exec(update_statement_flows)
await session.commit()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
@ -84,13 +87,15 @@ async def create_folder(
@router.get("/", response_model=list[FolderRead], status_code=200)
async def read_folders(
*,
session: DbSession,
session: AsyncDbSession,
current_user: CurrentActiveUser,
):
try:
folders = session.exec(
select(Folder).where(
or_(Folder.user_id == current_user.id, Folder.user_id == None) # noqa: E711
folders = (
await session.exec(
select(Folder).where(
or_(Folder.user_id == current_user.id, Folder.user_id == None) # noqa: E711
)
)
).all()
folders = [folder for folder in folders if folder.name != STARTER_FOLDER_NAME]
@ -102,7 +107,7 @@ async def read_folders(
@router.get("/{folder_id}", response_model=FolderWithPaginatedFlows | FolderReadWithFlows, status_code=200)
async def read_folder(
*,
session: DbSession,
session: AsyncDbSession,
folder_id: str,
current_user: CurrentActiveUser,
params: Annotated[Params | None, Depends(custom_params)],
@ -111,7 +116,13 @@ async def read_folder(
search: str = "",
):
try:
folder = session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)).first()
folder = (
await session.exec(
select(Folder)
.options(selectinload(Folder.flows))
.where(Folder.id == folder_id, Folder.user_id == current_user.id)
)
).first()
except Exception as e:
if "No result found" in str(e):
raise HTTPException(status_code=404, detail="Folder not found") from e
@ -132,29 +143,29 @@ async def read_folder(
stmt = stmt.where(Flow.is_component == False) # noqa: E712
if search:
stmt = stmt.where(Flow.name.like(f"%{search}%")) # type: ignore[attr-defined]
paginated_flows = paginate(session, stmt, params=params)
paginated_flows = await paginate(session, stmt, params=params)
return FolderWithPaginatedFlows(folder=FolderRead.model_validate(folder), flows=paginated_flows)
flows_from_current_user_in_folder = [flow for flow in folder.flows if flow.user_id == current_user.id]
folder.flows = flows_from_current_user_in_folder
return folder # noqa: TRY300
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
flows_from_current_user_in_folder = [flow for flow in folder.flows if flow.user_id == current_user.id]
folder.flows = flows_from_current_user_in_folder
return folder
@router.patch("/{folder_id}", response_model=FolderRead, status_code=200)
async def update_folder(
*,
session: DbSession,
session: AsyncDbSession,
folder_id: str,
folder: FolderUpdate, # Assuming FolderUpdate is a Pydantic model defining updatable fields
current_user: CurrentActiveUser,
):
try:
existing_folder = session.exec(
select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)
existing_folder = (
await session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id))
).first()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
@ -166,8 +177,8 @@ async def update_folder(
if folder.name and folder.name != existing_folder.name:
existing_folder.name = folder.name
session.add(existing_folder)
session.commit()
session.refresh(existing_folder)
await session.commit()
await session.refresh(existing_folder)
return existing_folder
folder_data = existing_folder.model_dump(exclude_unset=True)
@ -175,29 +186,29 @@ async def update_folder(
if key not in {"components", "flows"}:
setattr(existing_folder, key, value)
session.add(existing_folder)
session.commit()
session.refresh(existing_folder)
await session.commit()
await session.refresh(existing_folder)
concat_folder_components = folder.components + folder.flows
flows_ids = session.exec(select(Flow.id).where(Flow.folder_id == existing_folder.id)).all()
flows_ids = (await session.exec(select(Flow.id).where(Flow.folder_id == existing_folder.id))).all()
excluded_flows = list(set(flows_ids) - set(concat_folder_components))
my_collection_folder = session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME)).first()
my_collection_folder = (await session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME))).first()
if my_collection_folder:
update_statement_my_collection = (
update(Flow).where(Flow.id.in_(excluded_flows)).values(folder_id=my_collection_folder.id) # type: ignore[attr-defined]
)
session.exec(update_statement_my_collection)
session.commit()
await session.exec(update_statement_my_collection)
await session.commit()
if concat_folder_components:
update_statement_components = (
update(Flow).where(Flow.id.in_(concat_folder_components)).values(folder_id=existing_folder.id) # type: ignore[attr-defined]
)
session.exec(update_statement_components)
session.commit()
await session.exec(update_statement_components)
await session.commit()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
@ -206,19 +217,23 @@ async def update_folder(
@router.delete("/{folder_id}", status_code=204)
def delete_folder(
async def delete_folder(
*,
session: DbSession,
session: AsyncDbSession,
folder_id: str,
current_user: CurrentActiveUser,
):
try:
flows = session.exec(select(Flow).where(Flow.folder_id == folder_id, Flow.user_id == current_user.id)).all()
flows = (
await session.exec(select(Flow).where(Flow.folder_id == folder_id, Flow.user_id == current_user.id))
).all()
if len(flows) > 0:
for flow in flows:
cascade_delete_flow(session, flow)
await cascade_delete_flow(session, flow.id)
folder = session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)).first()
folder = (
await session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id))
).first()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
@ -226,8 +241,8 @@ def delete_folder(
raise HTTPException(status_code=404, detail="Folder not found")
try:
session.delete(folder)
session.commit()
await session.delete(folder)
await session.commit()
return Response(status_code=status.HTTP_204_NO_CONTENT)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
@ -236,13 +251,15 @@ def delete_folder(
@router.get("/download/{folder_id}", response_model=FlowListReadWithFolderName, status_code=200)
async def download_file(
*,
session: DbSession,
session: AsyncDbSession,
folder_id: str,
current_user: CurrentActiveUser,
):
"""Download all flows from folder."""
try:
folder = session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)).first()
folder = (
await session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id))
).first()
except Exception as e:
if "No result found" in str(e):
raise HTTPException(status_code=404, detail="Folder not found") from e
@ -257,7 +274,7 @@ async def download_file(
@router.post("/upload/", response_model=list[FlowRead], status_code=201)
async def upload_file(
*,
session: DbSession,
session: AsyncDbSession,
file: Annotated[UploadFile, File(...)],
current_user: CurrentActiveUser,
):
@ -268,7 +285,7 @@ async def upload_file(
if not data:
raise HTTPException(status_code=400, detail="No flows found in the file")
folder_name = generate_unique_folder_name(data["folder_name"], current_user.id, session)
folder_name = await generate_unique_folder_name(data["folder_name"], current_user.id, session)
data["folder_name"] = folder_name
@ -278,8 +295,8 @@ async def upload_file(
new_folder.id = None
new_folder.user_id = current_user.id
session.add(new_folder)
session.commit()
session.refresh(new_folder)
await session.commit()
await session.refresh(new_folder)
del data["folder_name"]
del data["folder_description"]
@ -290,9 +307,9 @@ async def upload_file(
raise HTTPException(status_code=400, detail="No flows found in the data")
# Now we set the user_id for all flows
for flow in flow_list.flows:
flow_name = generate_unique_flow_name(flow.name, current_user.id, session)
flow_name = await generate_unique_flow_name(flow.name, current_user.id, session)
flow.name = flow_name
flow.user_id = current_user.id
flow.folder_id = new_folder.id
return create_flows(session=session, flow_list=flow_list, current_user=current_user)
return await create_flows(session=session, flow_list=flow_list, current_user=current_user)

View file

@ -5,7 +5,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import delete
from sqlmodel import col, select
from langflow.api.utils import DbSession
from langflow.api.utils import AsyncDbSession, DbSession
from langflow.schema.message import MessageResponse
from langflow.services.auth.utils import get_current_active_user
from langflow.services.database.models.message.model import MessageRead, MessageTable, MessageUpdate
@ -21,16 +21,16 @@ router = APIRouter(prefix="/monitor", tags=["Monitor"])
@router.get("/builds")
async def get_vertex_builds(flow_id: Annotated[UUID, Query()], session: DbSession) -> VertexBuildMapModel:
async def get_vertex_builds(flow_id: Annotated[UUID, Query()], session: AsyncDbSession) -> VertexBuildMapModel:
try:
vertex_builds = get_vertex_builds_by_flow_id(session, flow_id)
vertex_builds = await get_vertex_builds_by_flow_id(session, flow_id)
return VertexBuildMapModel.from_list_of_dicts(vertex_builds)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
@router.delete("/builds", status_code=204)
async def delete_vertex_builds(flow_id: Annotated[UUID, Query()], session: DbSession) -> None:
def delete_vertex_builds(flow_id: Annotated[UUID, Query()], session: DbSession) -> None:
try:
delete_vertex_builds_by_flow_id(session, flow_id)
except Exception as e:
@ -39,7 +39,7 @@ async def delete_vertex_builds(flow_id: Annotated[UUID, Query()], session: DbSes
@router.get("/messages")
async def get_messages(
session: DbSession,
session: AsyncDbSession,
flow_id: Annotated[str | None, Query()] = None,
session_id: Annotated[str | None, Query()] = None,
sender: Annotated[str | None, Query()] = None,
@ -59,17 +59,17 @@ async def get_messages(
if order_by:
col = getattr(MessageTable, order_by).asc()
stmt = stmt.order_by(col)
messages = session.exec(stmt)
messages = await session.exec(stmt)
return [MessageResponse.model_validate(d, from_attributes=True) for d in messages]
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
@router.delete("/messages", status_code=204, dependencies=[Depends(get_current_active_user)])
async def delete_messages(message_ids: list[UUID], session: DbSession) -> None:
async def delete_messages(message_ids: list[UUID], session: AsyncDbSession) -> None:
try:
session.exec(delete(MessageTable).where(MessageTable.id.in_(message_ids))) # type: ignore[attr-defined]
session.commit()
await session.exec(delete(MessageTable).where(MessageTable.id.in_(message_ids))) # type: ignore[attr-defined]
await session.commit()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
@ -78,10 +78,10 @@ async def delete_messages(message_ids: list[UUID], session: DbSession) -> None:
async def update_message(
message_id: UUID,
message: MessageUpdate,
session: DbSession,
session: AsyncDbSession,
):
try:
db_message = session.get(MessageTable, message_id)
db_message = await session.get(MessageTable, message_id)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
@ -93,8 +93,8 @@ async def update_message(
message_dict["edit"] = True
db_message.sqlmodel_update(message_dict)
session.add(db_message)
session.commit()
session.refresh(db_message)
await session.commit()
await session.refresh(db_message)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
@ -108,12 +108,12 @@ async def update_message(
async def update_session_id(
old_session_id: str,
new_session_id: Annotated[str, Query(..., description="The new session ID to update to")],
session: DbSession,
session: AsyncDbSession,
) -> list[MessageResponse]:
try:
# Get all messages with the old session ID
stmt = select(MessageTable).where(MessageTable.session_id == old_session_id)
messages = session.exec(stmt).all()
messages = (await session.exec(stmt)).all()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
@ -127,10 +127,10 @@ async def update_session_id(
session.add_all(messages)
session.commit()
await session.commit()
message_responses = []
for message in messages:
session.refresh(message)
await session.refresh(message)
message_responses.append(MessageResponse.model_validate(message, from_attributes=True))
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
@ -141,15 +141,15 @@ async def update_session_id(
@router.delete("/messages/session/{session_id}", status_code=204)
async def delete_messages_session(
session_id: str,
session: DbSession,
session: AsyncDbSession,
):
try:
session.exec(
await session.exec(
delete(MessageTable)
.where(col(MessageTable.session_id) == session_id)
.execution_options(synchronize_session="fetch")
)
session.commit()
await session.commit()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
@ -159,10 +159,10 @@ async def delete_messages_session(
@router.get("/transactions")
async def get_transactions(
flow_id: Annotated[UUID, Query()],
session: DbSession,
session: AsyncDbSession,
) -> list[TransactionReadResponse]:
try:
transactions = get_transactions_by_flow_id(session, flow_id)
transactions = await get_transactions_by_flow_id(session, flow_id)
return [
TransactionReadResponse(
transaction_id=t.id,

View file

@ -283,15 +283,17 @@ def get_flow_by_id_or_endpoint_name(flow_id_or_name: str, user_id: UUID | None =
return FlowRead.model_validate(flow, from_attributes=True)
def generate_unique_flow_name(flow_name, user_id, session):
async def generate_unique_flow_name(flow_name, user_id, session):
original_name = flow_name
n = 1
while True:
# Check if a flow with the given name exists
existing_flow = session.exec(
select(Flow).where(
Flow.name == flow_name,
Flow.user_id == user_id,
existing_flow = (
await session.exec(
select(Flow).where(
Flow.name == flow_name,
Flow.user_id == user_id,
)
)
).first()

View file

@ -3,15 +3,17 @@ from sqlalchemy import select
from langflow.services.database.models.folder.model import Folder
def generate_unique_folder_name(folder_name, user_id, session):
async def generate_unique_folder_name(folder_name, user_id, session):
original_name = folder_name
n = 1
while True:
# Check if a folder with the given name exists
existing_folder = session.exec(
select(Folder).where(
Folder.name == folder_name,
Folder.user_id == user_id,
existing_folder = (
await session.exec(
select(Folder).where(
Folder.name == folder_name,
Folder.user_id == user_id,
)
)
).first()

View file

@ -2,11 +2,14 @@ from uuid import UUID
from sqlalchemy.exc import IntegrityError
from sqlmodel import Session, col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from langflow.services.database.models.transactions.model import TransactionBase, TransactionTable
def get_transactions_by_flow_id(db: Session, flow_id: UUID, limit: int | None = 1000) -> list[TransactionTable]:
async def get_transactions_by_flow_id(
db: AsyncSession, flow_id: UUID, limit: int | None = 1000
) -> list[TransactionTable]:
stmt = (
select(TransactionTable)
.where(TransactionTable.flow_id == flow_id)
@ -14,7 +17,7 @@ def get_transactions_by_flow_id(db: Session, flow_id: UUID, limit: int | None =
.limit(limit)
)
transactions = db.exec(stmt)
transactions = await db.exec(stmt)
return list(transactions)

View file

@ -2,11 +2,14 @@ from uuid import UUID
from sqlalchemy.exc import IntegrityError
from sqlmodel import Session, col, delete, select
from sqlmodel.ext.asyncio.session import AsyncSession
from langflow.services.database.models.vertex_builds.model import VertexBuildBase, VertexBuildTable
def get_vertex_builds_by_flow_id(db: Session, flow_id: UUID, limit: int | None = 1000) -> list[VertexBuildTable]:
async def get_vertex_builds_by_flow_id(
db: AsyncSession, flow_id: UUID, limit: int | None = 1000
) -> list[VertexBuildTable]:
stmt = (
select(VertexBuildTable)
.where(VertexBuildTable.flow_id == flow_id)
@ -14,7 +17,7 @@ def get_vertex_builds_by_flow_id(db: Session, flow_id: UUID, limit: int | None =
.limit(limit)
)
builds = db.exec(stmt)
builds = await db.exec(stmt)
return list(builds)