Add support for running flows by endpoint name (#2012)

* feat: Add support for running flows by endpoint name

This commit modifies the `simplified_run_flow` endpoint in `endpoints.py` to allow running flows using the endpoint name instead of the flow ID. It introduces a new route parameter `flow_id_or_name` which can accept either a UUID or a string representing the endpoint name. The code first attempts to parse the parameter as a UUID, and if that fails, it queries the database to find a flow with the matching endpoint name. This change improves the usability of the API by providing an alternative way to identify flows for execution.

* feat: Add endpoint_name field to FlowType

This commit adds the `endpoint_name` field to the `FlowType` interface in the `index.ts` file. The `endpoint_name` field is an optional string that represents the name of the endpoint associated with the flow. This change allows for more flexibility in identifying flows by endpoint name instead of just the flow ID. It improves the usability of the codebase by providing an alternative way to reference flows.

* 🐛 (endpoints.py): change type of flow_id_or_name parameter from Union[str, UUID] to str to simplify the API and improve readability

* feat: Add migration utility functions for table, column, foreign key, and constraint existence checks

This commit adds utility functions to the `migration.py` module in the `langflow.utils` package. These functions provide convenient ways to check the existence of tables, columns, foreign keys, and constraints in a database using SQLAlchemy. The functions `table_exists`, `column_exists`, `foreign_key_exists`, and `constraint_exists` take the table name, column name, foreign key name, and constraint name respectively, along with the SQLAlchemy engine or connection object. They use the `Inspector` class from `sqlalchemy.engine.reflection` to retrieve the necessary information and return a boolean value indicating whether the specified element exists in the database. These utility functions improve the readability and maintainability of the codebase by encapsulating the common existence checks in a reusable and modular way.

* feat: Add unique constraints for per-user folders and flows

This commit adds unique constraints for per-user folders and flows in the database. It introduces the `unique_folder_name` constraint for the `folder` table, ensuring that each user can have only one folder with a specific name. Similarly, it adds the `unique_flow_endpoint_name` and `unique_flow_name` constraints for the `flow` table, enforcing uniqueness of endpoint names and flow names per user. These constraints improve data integrity and prevent duplicate entries in the database, providing a more robust and reliable system.

* feat: Add poetry installation and caching steps to GitHub Actions workflow

This commit updates the GitHub Actions workflow file `action.yml` to include additional steps for installing poetry and caching its dependencies. The `run` step now installs poetry using the specified version and ensures that the poetry binary is available in the PATH. Additionally, the workflow now includes a step to restore pip and poetry cached dependencies using the `actions/cache` action. These changes improve the workflow by providing a more efficient and reliable way to manage poetry dependencies and caching.

* refactor: Improve error handling in update_flow function

This commit improves the error handling in the `update_flow` function in `flows.py`. It adds a new `elif` condition to check if the exception is an instance of `HTTPException` and re-raises it. This ensures that any `HTTPException` raised during the update process is properly handled and returned as a response. Additionally, it removes the unnecessary `else` block and simplifies the code logic. This refactor enhances the reliability and maintainability of the `update_flow` function.
This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-05-30 07:46:28 -07:00 committed by GitHub
commit 94e8bf1e2b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 504 additions and 136 deletions

View file

@ -11,6 +11,7 @@ from alembic import op
import sqlalchemy as sa
import sqlmodel
from sqlalchemy.engine.reflection import Inspector
from langflow.utils import migration
${imports if imports else ""}
# revision identifiers, used by Alembic.
@ -22,13 +23,9 @@ depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
table_names = inspector.get_table_names()
${upgrades if upgrades else "pass"}
def downgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
table_names = inspector.get_table_names()
${downgrades if downgrades else "pass"}

View file

@ -0,0 +1,42 @@
"""Add unique constraints per user in folder table
Revision ID: 1c79524817ed
Revises: 3bb0ddf32dfb
Create Date: 2024-05-29 23:12:09.146880
"""
from typing import Sequence, Union
from alembic import op
from sqlalchemy.engine.reflection import Inspector
# revision identifiers, used by Alembic.
revision: str = "1c79524817ed"
down_revision: Union[str, None] = "3bb0ddf32dfb"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
constraints_names = [constraint["name"] for constraint in inspector.get_unique_constraints("folder")]
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("folder", schema=None) as batch_op:
if "unique_folder_name" not in constraints_names:
batch_op.create_unique_constraint("unique_folder_name", ["user_id", "name"])
# ### end Alembic commands ###
def downgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
constraints_names = [constraint["name"] for constraint in inspector.get_unique_constraints("folder")]
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("folder", schema=None) as batch_op:
if "unique_folder_name" in constraints_names:
batch_op.drop_constraint("unique_folder_name", type_="unique")
# ### end Alembic commands ###

View file

@ -0,0 +1,54 @@
"""Add unique constraints per user in flow table
Revision ID: 3bb0ddf32dfb
Revises: a72f5cf9c2f9
Create Date: 2024-05-29 23:08:43.935040
"""
from typing import Sequence, Union
from alembic import op
from sqlalchemy.engine.reflection import Inspector
# revision identifiers, used by Alembic.
revision: str = "3bb0ddf32dfb"
down_revision: Union[str, None] = "a72f5cf9c2f9"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
# ### commands auto generated by Alembic - please adjust! ###
indexes_names = [index["name"] for index in inspector.get_indexes("flow")]
constraints_names = [constraint["name"] for constraint in inspector.get_unique_constraints("flow")]
with op.batch_alter_table("flow", schema=None) as batch_op:
if "ix_flow_endpoint_name" in indexes_names:
batch_op.drop_index("ix_flow_endpoint_name")
batch_op.create_index(batch_op.f("ix_flow_endpoint_name"), ["endpoint_name"], unique=False)
if "unique_flow_endpoint_name" not in constraints_names:
batch_op.create_unique_constraint("unique_flow_endpoint_name", ["user_id", "endpoint_name"])
if "unique_flow_name" not in constraints_names:
batch_op.create_unique_constraint("unique_flow_name", ["user_id", "name"])
# ### end Alembic commands ###
def downgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
# ### commands auto generated by Alembic - please adjust! ###
indexes_names = [index["name"] for index in inspector.get_indexes("flow")]
constraints_names = [constraint["name"] for constraint in inspector.get_unique_constraints("flow")]
with op.batch_alter_table("flow", schema=None) as batch_op:
if "unique_flow_name" in constraints_names:
batch_op.drop_constraint("unique_flow_name", type_="unique")
if "unique_flow_endpoint_name" in constraints_names:
batch_op.drop_constraint("unique_flow_endpoint_name", type_="unique")
if "ix_flow_endpoint_name" in indexes_names:
batch_op.drop_index(batch_op.f("ix_flow_endpoint_name"))
batch_op.create_index("ix_flow_endpoint_name", ["endpoint_name"], unique=1)
# ### end Alembic commands ###

View file

@ -0,0 +1,52 @@
"""Add endpoint name col
Revision ID: a72f5cf9c2f9
Revises: 29fe8f1f806b
Create Date: 2024-05-29 21:44:04.240816
"""
from typing import Sequence, Union
import sqlalchemy as sa
import sqlmodel
from alembic import op
from sqlalchemy.engine.reflection import Inspector
# revision identifiers, used by Alembic.
revision: str = "a72f5cf9c2f9"
down_revision: Union[str, None] = "29fe8f1f806b"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
# ### commands auto generated by Alembic - please adjust! ###
column_names = [column["name"] for column in inspector.get_columns("flow")]
indexes = inspector.get_indexes("flow")
index_names = [index["name"] for index in indexes]
with op.batch_alter_table("flow", schema=None) as batch_op:
if "endpoint_name" not in column_names:
batch_op.add_column(sa.Column("endpoint_name", sqlmodel.sql.sqltypes.AutoString(), nullable=True))
if "ix_flow_endpoint_name" not in index_names:
batch_op.create_index(batch_op.f("ix_flow_endpoint_name"), ["endpoint_name"], unique=True)
# ### end Alembic commands ###
def downgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
# ### commands auto generated by Alembic - please adjust! ###
column_names = [column["name"] for column in inspector.get_columns("flow")]
indexes = inspector.get_indexes("flow")
index_names = [index["name"] for index in indexes]
with op.batch_alter_table("flow", schema=None) as batch_op:
if "ix_flow_endpoint_name" in index_names:
batch_op.drop_index(batch_op.f("ix_flow_endpoint_name"))
if "endpoint_name" in column_names:
batch_op.drop_column("endpoint_name")
# ### end Alembic commands ###

View file

@ -53,10 +53,10 @@ def get_all(
raise HTTPException(status_code=500, detail=str(exc)) from exc
@router.post("/run/{flow_id}", response_model=RunResponse, response_model_exclude_none=True)
@router.post("/run/{flow_id_or_name}", response_model=RunResponse, response_model_exclude_none=True)
async def simplified_run_flow(
db: Annotated[Session, Depends(get_session)],
flow_id: UUID,
flow_id_or_name: str,
input_request: SimplifiedAPIRequest = SimplifiedAPIRequest(),
stream: bool = False,
api_key_user: User = Depends(api_key_security),
@ -111,8 +111,21 @@ async def simplified_run_flow(
This endpoint provides a powerful interface for executing flows with enhanced flexibility and efficiency, supporting a wide range of applications by allowing for dynamic input and output configuration along with performance optimizations through session management and caching.
"""
session_id = input_request.session_id
endpoint_name = None
flow_id_str = None
try:
try:
flow_id = UUID(flow_id_or_name)
except ValueError:
endpoint_name = flow_id_or_name
flow = db.exec(
select(Flow).where(Flow.endpoint_name == endpoint_name).where(Flow.user_id == api_key_user.id)
).first()
if flow is None:
raise ValueError(f"Flow with endpoint name {endpoint_name} not found")
flow_id = flow.id
flow_id_str = str(flow_id)
artifacts = {}
if input_request.session_id:
@ -172,10 +185,13 @@ async def simplified_run_flow(
# This means the Flow ID is not a valid UUID which means it can't find the flow
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
except ValueError as exc:
if f"Flow {flow_id_str} not found" in str(exc):
if flow_id_str and f"Flow {flow_id_str} not found" in str(exc):
logger.error(f"Flow {flow_id_str} not found")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
elif f"Session {session_id} not found" in str(exc):
elif endpoint_name and f"Flow with endpoint name {endpoint_name} not found" in str(exc):
logger.error(f"Flow with endpoint name {endpoint_name} not found")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
elif session_id and f"Session {session_id} not found" in str(exc):
logger.error(f"Session {session_id} not found")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
else:

View file

@ -135,30 +135,49 @@ def update_flow(
settings_service=Depends(get_settings_service),
):
"""Update a flow."""
try:
db_flow = read_flow(
session=session,
flow_id=flow_id,
current_user=current_user,
settings_service=settings_service,
)
if not db_flow:
raise HTTPException(status_code=404, detail="Flow not found")
flow_data = flow.model_dump(exclude_unset=True)
if settings_service.settings.remove_api_keys:
flow_data = remove_api_keys(flow_data)
for key, value in flow_data.items():
if value is not None:
setattr(db_flow, key, value)
db_flow.updated_at = datetime.now(timezone.utc)
if db_flow.folder_id is None:
default_folder = session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME)).first()
if default_folder:
db_flow.folder_id = default_folder.id
session.add(db_flow)
session.commit()
session.refresh(db_flow)
return db_flow
except Exception as e:
# If it is a validation error, return the error message
if hasattr(e, "errors"):
raise HTTPException(status_code=400, detail=str(e)) from e
elif "UNIQUE constraint failed" in str(e):
# Get the name of the column that failed
columns = str(e).split("UNIQUE constraint failed: ")[1].split(".")[1].split("\n")[0]
# UNIQUE constraint failed: flow.user_id, flow.name
# or UNIQUE constraint failed: flow.name
# if the column has id in it, we want the other column
column = columns.split(",")[1] if "id" in columns.split(",")[0] else columns.split(",")[0]
db_flow = read_flow(
session=session,
flow_id=flow_id,
current_user=current_user,
settings_service=settings_service,
)
if not db_flow:
raise HTTPException(status_code=404, detail="Flow not found")
flow_data = flow.model_dump(exclude_unset=True)
if settings_service.settings.remove_api_keys:
flow_data = remove_api_keys(flow_data)
for key, value in flow_data.items():
if value is not None:
setattr(db_flow, key, value)
db_flow.updated_at = datetime.now(timezone.utc)
if db_flow.folder_id is None:
default_folder = session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME)).first()
if default_folder:
db_flow.folder_id = default_folder.id
session.add(db_flow)
session.commit()
session.refresh(db_flow)
return db_flow
raise HTTPException(
status_code=400, detail=f"{column.capitalize().replace('_', ' ')} must be unique"
) from e
elif isinstance(e, HTTPException):
raise e
else:
raise HTTPException(status_code=500, detail=str(e)) from e
@router.delete("/{flow_id}", status_code=200)

View file

@ -1,5 +1,6 @@
# Path: src/backend/langflow/services/database/models/flow/model.py
import re
import warnings
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Dict, Optional
@ -7,7 +8,9 @@ from uuid import UUID, uuid4
import emoji
from emoji import purely_emoji # type: ignore
from fastapi import HTTPException, status
from pydantic import field_serializer, field_validator
from sqlalchemy import UniqueConstraint
from sqlmodel import JSON, Column, Field, Relationship, SQLModel
from langflow.schema.schema import Record
@ -26,6 +29,24 @@ class FlowBase(SQLModel):
is_component: Optional[bool] = Field(default=False, nullable=True)
updated_at: Optional[datetime] = Field(default_factory=lambda: datetime.now(timezone.utc), nullable=True)
folder_id: Optional[UUID] = Field(default=None, nullable=True)
endpoint_name: Optional[str] = Field(default=None, nullable=True, index=True)
@field_validator("endpoint_name")
@classmethod
def validate_endpoint_name(cls, v):
# Endpoint name must be a string containing only letters, numbers, hyphens, and underscores
if v is not None:
if not isinstance(v, str):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Endpoint name must be a string",
)
if not re.match(r"^[a-zA-Z0-9_-]+$", v):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Endpoint name must contain only letters, numbers, hyphens, and underscores",
)
return v
@field_validator("icon_bg_color")
def validate_icon_bg_color(cls, v):
@ -128,6 +149,11 @@ class Flow(FlowBase, table=True):
record = Record(data=data)
return record
__table_args__ = (
UniqueConstraint("user_id", "name", name="unique_flow_name"),
UniqueConstraint("user_id", "endpoint_name", name="unique_flow_endpoint_name"),
)
class FlowCreate(FlowBase):
user_id: Optional[UUID] = None
@ -145,3 +171,21 @@ class FlowUpdate(SQLModel):
description: Optional[str] = None
data: Optional[Dict] = None
folder_id: Optional[UUID] = None
endpoint_name: Optional[str] = None
@field_validator("endpoint_name")
@classmethod
def validate_endpoint_name(cls, v):
# Endpoint name must be a string containing only letters, numbers, hyphens, and underscores
if v is not None:
if not isinstance(v, str):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Endpoint name must be a string",
)
if not re.match(r"^[a-zA-Z0-9_-]+$", v):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Endpoint name must contain only letters, numbers, hyphens, and underscores",
)
return v

View file

@ -1,6 +1,7 @@
from typing import TYPE_CHECKING, List, Optional
from uuid import UUID, uuid4
from sqlalchemy import UniqueConstraint
from sqlmodel import Field, Relationship, SQLModel
from langflow.services.database.models.flow.model import FlowRead
@ -30,6 +31,8 @@ class Folder(FolderBase, table=True):
back_populates="folder", sa_relationship_kwargs={"cascade": "all, delete, delete-orphan"}
)
__table_args__ = (UniqueConstraint("user_id", "name", name="unique_folder_name"),)
class FolderCreate(FolderBase):
components_list: Optional[List[UUID]] = None

View file

@ -0,0 +1,65 @@
from sqlalchemy.engine.reflection import Inspector
def table_exists(name, conn):
"""
Check if a table exists.
Parameters:
name (str): The name of the table to check.
conn (sqlalchemy.engine.Engine or sqlalchemy.engine.Connection): The SQLAlchemy engine or connection to use.
Returns:
bool: True if the table exists, False otherwise.
"""
inspector = Inspector.from_engine(conn)
return name in inspector.get_table_names()
def column_exists(table_name, column_name, conn):
"""
Check if a column exists in a table.
Parameters:
table_name (str): The name of the table to check.
column_name (str): The name of the column to check.
conn (sqlalchemy.engine.Engine or sqlalchemy.engine.Connection): The SQLAlchemy engine or connection to use.
Returns:
bool: True if the column exists, False otherwise.
"""
inspector = Inspector.from_engine(conn)
return column_name in [column["name"] for column in inspector.get_columns(table_name)]
def foreign_key_exists(table_name, fk_name, conn):
"""
Check if a foreign key exists in a table.
Parameters:
table_name (str): The name of the table to check.
fk_name (str): The name of the foreign key to check.
conn (sqlalchemy.engine.Engine or sqlalchemy.engine.Connection): The SQLAlchemy engine or connection to use.
Returns:
bool: True if the foreign key exists, False otherwise.
"""
inspector = Inspector.from_engine(conn)
return fk_name in [fk["name"] for fk in inspector.get_foreign_keys(table_name)]
def constraint_exists(table_name, constraint_name, conn):
"""
Check if a constraint exists in a table.
Parameters:
table_name (str): The name of the table to check.
constraint_name (str): The name of the constraint to check.
conn (sqlalchemy.engine.Engine or sqlalchemy.engine.Connection): The SQLAlchemy engine or connection to use.
Returns:
bool: True if the constraint exists, False otherwise.
"""
inspector = Inspector.from_engine(conn)
constraints = inspector.get_unique_constraints(table_name)
return constraint_name in [constraint["name"] for constraint in constraints]