fix: add initial setup validations (#5385)

* convert flow ids to uuids when loading from dir

* Edit a migration file to check for foreign key existence

* [autofix.ci] apply automated fixes

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Jordan Frazier 2024-12-20 16:23:21 -08:00 committed by GitHub
commit b2e7476d3b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 23 additions and 2 deletions

View file

@ -40,11 +40,17 @@ def upgrade() -> None:
batch_op.add_column(sa.Column("folder", sqlmodel.sql.sqltypes.AutoString(), nullable=True))
if "user_id" not in flow_columns:
batch_op.add_column(sa.Column("user_id", sqlmodel.sql.sqltypes.types.Uuid(), nullable=True))
indices = inspector.get_indexes("flow")
indices_names = [index["name"] for index in indices]
if "ix_flow_user_id" not in indices_names:
batch_op.create_index(batch_op.f("ix_flow_user_id"), ["user_id"], unique=False)
if "fk_flow_user_id_user" not in indices_names:
# Check for existing foreign key constraints
constraints = inspector.get_foreign_keys("flow")
constraint_names = [constraint["name"] for constraint in constraints]
if "fk_flow_user_id_user" not in constraint_names:
batch_op.create_foreign_key("fk_flow_user_id_user", "user", ["user_id"], ["id"])
except Exception as e:

View file

@ -565,6 +565,13 @@ async def load_flows_from_directory() -> None:
flow["id"] = no_json_name
flow_id = flow.get("id")
if isinstance(flow_id, str):
try:
flow_id = UUID(flow_id)
except ValueError:
logger.error(f"Invalid UUID string: {flow_id}")
return
existing = await find_existing_flow(session, flow_id, flow_endpoint_name)
if existing:
logger.debug(f"Found existing flow: {existing.name}")
@ -583,17 +590,24 @@ async def load_flows_from_directory() -> None:
folder_id = await get_default_folder_id(session, user_id)
existing.folder_id = folder_id
if isinstance(existing.id, str):
try:
existing.id = UUID(existing.id)
except ValueError:
logger.error(f"Invalid UUID string: {existing.id}")
return
session.add(existing)
else:
logger.info(f"Creating new flow: {flow_id} with endpoint name {flow_endpoint_name}")
# Current behavior loads all new flows into default folder
folder_id = await get_default_folder_id(session, user_id)
flow["user_id"] = user_id
flow["folder_id"] = folder_id
flow = Flow.model_validate(flow, from_attributes=True)
flow.updated_at = datetime.now(tz=timezone.utc).astimezone()
session.add(flow)
@ -604,6 +618,7 @@ async def find_existing_flow(session, flow_id, flow_endpoint_name):
if existing := (await session.exec(stmt)).first():
logger.debug(f"Found existing flow by endpoint name: {existing.name}")
return existing
stmt = select(Flow).where(Flow.id == flow_id)
if existing := (await session.exec(stmt)).first():
logger.debug(f"Found existing flow by id: {flow_id}")