ref: Add ruff rules TRY3xx (#4098)

Add ruff rules TRY3xx
This commit is contained in:
Christophe Bornet 2024-10-14 16:14:53 +02:00 committed by GitHub
commit c7d80f3bc7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
48 changed files with 681 additions and 563 deletions

View file

@ -54,9 +54,9 @@ def delete_api_key_route(
):
try:
delete_api_key(db, api_key_id)
return {"detail": "API Key deleted"}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e)) from e
return {"detail": "API Key deleted"}
@router.post("/store")
@ -88,10 +88,11 @@ def save_store_api_key(
domain=auth_settings.COOKIE_DOMAIN,
)
return {"detail": "API Key saved"}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e)) from e
return {"detail": "API Key saved"}
@router.delete("/store")
def delete_store_api_key(
@ -101,6 +102,7 @@ def delete_store_api_key(
try:
current_user.store_api_key = None
db.commit()
return {"detail": "API Key deleted"}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e)) from e
return {"detail": "API Key deleted"}

View file

@ -189,7 +189,6 @@ async def build_flow(
playgroundSuccess=True,
),
)
return first_layer, vertices_to_run, graph
except Exception as exc:
background_tasks.add_task(
telemetry_service.log_package_playground,
@ -205,6 +204,8 @@ async def build_flow(
logger.exception("Error checking build status")
raise HTTPException(status_code=500, detail=str(exc)) from exc
return first_layer, vertices_to_run, graph
async def _build_vertex(vertex_id: str, graph: Graph, event_manager: EventManager) -> VertexBuildResponse:
flow_id_str = str(flow_id)
@ -302,7 +303,6 @@ async def build_flow(
componentErrorMessage=error_message,
),
)
return build_response
except Exception as exc:
background_tasks.add_task(
telemetry_service.log_package_component,
@ -317,6 +317,8 @@ async def build_flow(
message = parse_exception(exc)
raise HTTPException(status_code=500, detail=message) from exc
return build_response
async def build_vertices(
vertex_id: str,
graph: Graph,
@ -588,7 +590,6 @@ async def build_vertex(
componentErrorMessage=error_message,
),
)
return build_response
except Exception as exc:
background_tasks.add_task(
telemetry_service.log_package_component,
@ -603,6 +604,90 @@ async def build_vertex(
message = parse_exception(exc)
raise HTTPException(status_code=500, detail=message) from exc
return build_response
async def _stream_vertex(flow_id: str, vertex_id: str, chat_service: ChatService):
graph = None
try:
try:
cache = await chat_service.get_cache(flow_id)
except Exception as exc: # noqa: BLE001
logger.exception("Error building Component")
yield str(StreamData(event="error", data={"error": str(exc)}))
return
if not cache:
# If there's no cache
msg = f"No cache found for {flow_id}."
logger.error(msg)
yield str(StreamData(event="error", data={"error": msg}))
return
else:
graph = cache.get("result")
try:
vertex: InterfaceVertex = graph.get_vertex(vertex_id)
except Exception as exc: # noqa: BLE001
logger.exception("Error building Component")
yield str(StreamData(event="error", data={"error": str(exc)}))
return
if not hasattr(vertex, "stream"):
msg = f"Vertex {vertex_id} does not support streaming"
logger.error(msg)
yield str(StreamData(event="error", data={"error": msg}))
return
if isinstance(vertex._built_result, str) and vertex._built_result:
stream_data = StreamData(
event="message",
data={"message": f"Streaming vertex {vertex_id}"},
)
yield str(stream_data)
stream_data = StreamData(
event="message",
data={"chunk": vertex._built_result},
)
yield str(stream_data)
elif not vertex.frozen or not vertex._built:
logger.debug(f"Streaming vertex {vertex_id}")
stream_data = StreamData(
event="message",
data={"message": f"Streaming vertex {vertex_id}"},
)
yield str(stream_data)
try:
async for chunk in vertex.stream():
stream_data = StreamData(
event="message",
data={"chunk": chunk},
)
yield str(stream_data)
except Exception as exc: # noqa: BLE001
logger.exception("Error building Component")
exc_message = parse_exception(exc)
if exc_message == "The message must be an iterator or an async iterator.":
exc_message = "This stream has already been closed."
yield str(StreamData(event="error", data={"error": exc_message}))
elif vertex.result is not None:
stream_data = StreamData(
event="message",
data={"chunk": vertex._built_result},
)
yield str(stream_data)
else:
msg = f"No result found for vertex {vertex_id}"
logger.error(msg)
yield str(StreamData(event="error", data={"error": msg}))
return
finally:
logger.debug("Closing stream")
if graph:
await chat_service.set_cache(flow_id, graph)
yield str(StreamData(event="close", data={"message": "Stream closed"}))
@router.get("/build/{flow_id}/{vertex_id}/stream", response_class=StreamingResponse)
async def build_vertex_stream(
@ -638,70 +723,6 @@ async def build_vertex_stream(
HTTPException: If an error occurs while building the vertex.
"""
try:
flow_id_str = str(flow_id)
async def stream_vertex():
graph = None
try:
cache = await chat_service.get_cache(flow_id_str)
if not cache:
# If there's no cache
msg = f"No cache found for {flow_id_str}."
raise ValueError(msg)
else:
graph = cache.get("result")
vertex: InterfaceVertex = graph.get_vertex(vertex_id)
if not hasattr(vertex, "stream"):
msg = f"Vertex {vertex_id} does not support streaming"
raise ValueError(msg)
if isinstance(vertex._built_result, str) and vertex._built_result:
stream_data = StreamData(
event="message",
data={"message": f"Streaming vertex {vertex_id}"},
)
yield str(stream_data)
stream_data = StreamData(
event="message",
data={"chunk": vertex._built_result},
)
yield str(stream_data)
elif not vertex.frozen or not vertex._built:
logger.debug(f"Streaming vertex {vertex_id}")
stream_data = StreamData(
event="message",
data={"message": f"Streaming vertex {vertex_id}"},
)
yield str(stream_data)
async for chunk in vertex.stream():
stream_data = StreamData(
event="message",
data={"chunk": chunk},
)
yield str(stream_data)
elif vertex.result is not None:
stream_data = StreamData(
event="message",
data={"chunk": vertex._built_result},
)
yield str(stream_data)
else:
msg = f"No result found for vertex {vertex_id}"
raise ValueError(msg)
except Exception as exc: # noqa: BLE001
logger.exception("Error building Component")
exc_message = parse_exception(exc)
if exc_message == "The message must be an iterator or an async iterator.":
exc_message = "This stream has already been closed."
yield str(StreamData(event="error", data={"error": exc_message}))
finally:
logger.debug("Closing stream")
if graph:
await chat_service.set_cache(flow_id_str, graph)
yield str(StreamData(event="close", data={"message": "Stream closed"}))
return StreamingResponse(stream_vertex(), media_type="text/event-stream")
return StreamingResponse(_stream_vertex(str(flow_id), vertex_id, chat_service), media_type="text/event-stream")
except Exception as exc:
raise HTTPException(status_code=500, detail="Error building Component") from exc

View file

@ -259,7 +259,6 @@ async def simplified_run_flow(
telemetry_service.log_package_run,
RunPayload(runIsWebhook=False, runSeconds=int(end_time - start_time), runSuccess=True, runErrorMessage=""),
)
return result
except ValueError as exc:
background_tasks.add_task(
@ -291,6 +290,8 @@ async def simplified_run_flow(
)
raise APIException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, exception=exc, flow=flow) from exc
return result
@router.post("/webhook/{flow_id_or_name}", response_model=dict, status_code=HTTPStatus.ACCEPTED) # noqa: RUF100, FAST003
async def webhook_run_flow(
@ -316,58 +317,57 @@ async def webhook_run_flow(
Raises:
HTTPException: If the flow is not found or if there is an error processing the request.
"""
start_time = time.perf_counter()
logger.debug("Received webhook request")
error_msg = ""
try:
start_time = time.perf_counter()
logger.debug("Received webhook request")
data = await request.body()
try:
data = await request.body()
except Exception as exc:
error_msg = str(exc)
raise HTTPException(status_code=500, detail=error_msg) from exc
if not data:
logger.error("Request body is empty")
msg = "Request body is empty. You should provide a JSON payload containing the flow ID."
raise ValueError(
msg,
error_msg = "Request body is empty. You should provide a JSON payload containing the flow ID."
raise HTTPException(status_code=400, detail=error_msg)
try:
# get all webhook components in the flow
webhook_components = get_all_webhook_components_in_flow(flow.data)
tweaks = {}
for component in webhook_components:
tweaks[component["id"]] = {"data": data.decode() if isinstance(data, bytes) else data}
input_request = SimplifiedAPIRequest(
input_value="",
input_type="chat",
output_type="chat",
tweaks=tweaks,
session_id=None,
)
# get all webhook components in the flow
webhook_components = get_all_webhook_components_in_flow(flow.data)
tweaks = {}
for component in webhook_components:
tweaks[component["id"]] = {"data": data.decode() if isinstance(data, bytes) else data}
input_request = SimplifiedAPIRequest(
input_value="",
input_type="chat",
output_type="chat",
tweaks=tweaks,
session_id=None,
)
logger.debug("Starting background task")
background_tasks.add_task(
simple_run_flow_task,
flow=flow,
input_request=input_request,
api_key_user=user,
)
background_tasks.add_task(
telemetry_service.log_package_run,
RunPayload(
runIsWebhook=True, runSeconds=int(time.perf_counter() - start_time), runSuccess=True, runErrorMessage=""
),
)
return {"message": "Task started in the background", "status": "in progress"}
except Exception as exc:
logger.debug("Starting background task")
background_tasks.add_task(
simple_run_flow_task,
flow=flow,
input_request=input_request,
api_key_user=user,
)
except Exception as exc:
error_msg = str(exc)
raise HTTPException(status_code=500, detail=error_msg) from exc
finally:
background_tasks.add_task(
telemetry_service.log_package_run,
RunPayload(
runIsWebhook=True,
runSeconds=int(time.perf_counter() - start_time),
runSuccess=False,
runErrorMessage=str(exc),
runSuccess=error_msg == "",
runErrorMessage=error_msg,
),
)
if "Flow ID is required" in str(exc) or "Request body is empty" in str(exc):
raise HTTPException(status_code=400, detail=str(exc)) from exc
raise HTTPException(status_code=500, detail=str(exc)) from exc
return {"message": "Task started in the background", "status": "in progress"}
@router.post("/run/advanced/{flow_id}", response_model=RunResponse, response_model_exclude_none=True)
@ -429,35 +429,53 @@ async def experimental_run_flow(
This endpoint facilitates complex flow executions with customized inputs, outputs, and configurations,
catering to diverse application requirements.
""" # noqa: E501
try:
flow_id_str = str(flow_id)
if outputs is None:
outputs = []
if inputs is None:
inputs = [InputValueRequest(components=[], input_value="")]
flow_id_str = str(flow_id)
if outputs is None:
outputs = []
if inputs is None:
inputs = [InputValueRequest(components=[], input_value="")]
if session_id:
if session_id:
try:
session_data = await session_service.load_session(session_id, flow_id=flow_id_str)
graph, _artifacts = session_data or (None, None)
if graph is None:
msg = f"Session {session_id} not found"
raise ValueError(msg)
else:
except Exception as exc:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
graph, _artifacts = session_data or (None, None)
if graph is None:
msg = f"Session {session_id} not found"
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=msg)
else:
try:
# Get the flow that matches the flow_id and belongs to the user
# flow = session.query(Flow).filter(Flow.id == flow_id).filter(Flow.user_id == api_key_user.id).first()
flow = session.exec(
select(Flow).where(Flow.id == flow_id_str).where(Flow.user_id == api_key_user.id)
).first()
if flow is None:
msg = f"Flow {flow_id_str} not found"
raise ValueError(msg)
except sa.exc.StatementError as exc:
# StatementError('(builtins.ValueError) badly formed hexadecimal UUID string')
if "badly formed hexadecimal UUID string" in str(exc):
logger.error(f"Flow ID {flow_id_str} is not a valid UUID")
# This means the Flow ID is not a valid UUID which means it can't find the flow
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
except Exception as exc:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
if flow.data is None:
msg = f"Flow {flow_id_str} has no data"
raise ValueError(msg)
if flow is None:
msg = f"Flow {flow_id_str} not found"
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=msg)
if flow.data is None:
msg = f"Flow {flow_id_str} has no data"
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=msg)
try:
graph_data = flow.data
graph_data = process_tweaks(graph_data, tweaks or {})
graph = Graph.from_payload(graph_data, flow_id=flow_id_str)
except Exception as exc:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
try:
task_result, session_id = await run_graph_internal(
graph=graph,
flow_id=flow_id_str,
@ -466,25 +484,11 @@ async def experimental_run_flow(
outputs=outputs,
stream=stream,
)
return RunResponse(outputs=task_result, session_id=session_id)
except sa.exc.StatementError as exc:
# StatementError('(builtins.ValueError) badly formed hexadecimal UUID string')
if "badly formed hexadecimal UUID string" in str(exc):
logger.exception(f"Flow ID {flow_id_str} is not a valid UUID")
# This means the Flow ID is not a valid UUID which means it can't find the flow
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
except ValueError as exc:
if f"Flow {flow_id_str} not found" in str(exc):
logger.exception(f"Flow {flow_id_str} not found")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
if f"Session {session_id} not found" in str(exc):
logger.exception(f"Session {session_id} not found")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
except Exception as exc:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
return RunResponse(outputs=task_result, session_id=session_id)
@router.post(
"/predict/{flow_id}",
@ -639,12 +643,12 @@ async def custom_component_update(
field_value=code_request.field_value,
field_name=code_request.field,
)
component_node["template"] = updated_build_config
return component_node
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
component_node["template"] = updated_build_config
return component_node
@router.get("/config", response_model=ConfigResponse)
def get_config():

View file

@ -47,16 +47,24 @@ async def upload_file(
):
try:
max_file_size_upload = get_storage_service().settings_service.settings.max_file_size_upload
if file.size > max_file_size_upload * 1024 * 1024:
raise HTTPException(
status_code=413, detail=f"File size is larger than the maximum file size {max_file_size_upload}MB."
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
if file.size > max_file_size_upload * 1024 * 1024:
raise HTTPException(
status_code=413, detail=f"File size is larger than the maximum file size {max_file_size_upload}MB."
)
try:
flow_id_str = str(flow_id)
flow = session.get(Flow, flow_id_str)
if flow.user_id != current_user.id:
raise HTTPException(status_code=403, detail="You don't have access to this flow")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
if flow.user_id != current_user.id:
raise HTTPException(status_code=403, detail="You don't have access to this flow")
try:
file_content = await file.read()
timestamp = datetime.now(tz=timezone.utc).astimezone().strftime("%Y-%m-%d_%H-%M-%S")
file_name = file.filename or hashlib.sha256(file_content).hexdigest()
@ -72,18 +80,20 @@ async def upload_file(
async def download_file(
file_name: str, flow_id: UUID, storage_service: Annotated[StorageService, Depends(get_storage_service)]
):
flow_id_str = str(flow_id)
extension = file_name.split(".")[-1]
if not extension:
raise HTTPException(status_code=500, detail=f"Extension not found for file {file_name}")
try:
flow_id_str = str(flow_id)
extension = file_name.split(".")[-1]
if not extension:
raise HTTPException(status_code=500, detail=f"Extension not found for file {file_name}")
content_type = build_content_type_from_extension(extension)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
if not content_type:
raise HTTPException(status_code=500, detail=f"Content type not found for extension {extension}")
if not content_type:
raise HTTPException(status_code=500, detail=f"Content type not found for extension {extension}")
try:
file_content = await storage_service.get_file(flow_id=flow_id_str, file_name=file_name)
headers = {
"Content-Disposition": f"attachment; filename={file_name} filename*=UTF-8''{file_name}",
@ -99,20 +109,22 @@ async def download_file(
async def download_image(
file_name: str, flow_id: UUID, storage_service: Annotated[StorageService, Depends(get_storage_service)]
):
extension = file_name.split(".")[-1]
flow_id_str = str(flow_id)
if not extension:
raise HTTPException(status_code=500, detail=f"Extension not found for file {file_name}")
try:
extension = file_name.split(".")[-1]
flow_id_str = str(flow_id)
if not extension:
raise HTTPException(status_code=500, detail=f"Extension not found for file {file_name}")
content_type = build_content_type_from_extension(extension)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
if not content_type:
raise HTTPException(status_code=500, detail=f"Content type not found for extension {extension}")
if not content_type.startswith("image"):
raise HTTPException(status_code=500, detail=f"Content type {content_type} is not an image")
if not content_type:
raise HTTPException(status_code=500, detail=f"Content type not found for extension {extension}")
if not content_type.startswith("image"):
raise HTTPException(status_code=500, detail=f"Content type {content_type} is not an image")
try:
file_content = await storage_service.get_file(flow_id=flow_id_str, file_name=file_name)
return StreamingResponse(BytesIO(file_content), media_type=content_type)
except Exception as e:
@ -150,14 +162,14 @@ async def list_profile_pictures(storage_service: Annotated[StorageService, Depen
people = await storage_service.list_files(flow_id=people_path) # type: ignore[arg-type]
space = await storage_service.list_files(flow_id=space_path) # type: ignore[arg-type]
files = [f"People/{i}" for i in people]
files += [f"Space/{i}" for i in space]
return {"files": files}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
files = [f"People/{i}" for i in people]
files += [f"Space/{i}" for i in space]
return {"files": files}
@router.get("/list/{flow_id}")
async def list_files(
@ -167,10 +179,11 @@ async def list_files(
try:
flow_id_str = str(flow_id)
files = await storage_service.list_files(flow_id=flow_id_str)
return {"files": files}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
return {"files": files}
@router.delete("/delete/{flow_id}/{file_name}")
async def delete_file(
@ -181,6 +194,7 @@ async def delete_file(
try:
flow_id_str = str(flow_id)
await storage_service.delete_file(flow_id=flow_id_str, file_name=file_name)
return {"message": f"File {file_name} deleted successfully"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
return {"message": f"File {file_name} deleted successfully"}

View file

@ -102,7 +102,6 @@ def create_flow(
session.add(db_flow)
session.commit()
session.refresh(db_flow)
return db_flow
except Exception as e:
# If it is a validation error, return the error message
if hasattr(e, "errors"):
@ -122,6 +121,8 @@ def create_flow(
raise
raise HTTPException(status_code=500, detail=str(e)) from e
return db_flow
@router.get("/", response_model=list[FlowRead] | Page[FlowRead] | list[FlowHeader], status_code=200)
def read_flows(
@ -199,13 +200,11 @@ def read_flows(
raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/{flow_id}", response_model=FlowRead, status_code=200)
def read_flow(
*,
session: Session = Depends(get_session),
def _read_flow(
session: Session,
flow_id: UUID,
current_user: User = Depends(get_current_active_user),
settings_service: SettingsService = Depends(get_settings_service),
current_user: User,
settings_service: SettingsService,
):
"""Read a flow."""
auth_settings = settings_service.auth_settings
@ -216,7 +215,19 @@ def read_flow(
stmt = stmt.where(
(Flow.user_id == current_user.id) | (Flow.user_id == None) # noqa: E711
)
if user_flow := session.exec(stmt).first():
return session.exec(stmt).first()
@router.get("/{flow_id}", response_model=FlowRead, status_code=200)
def read_flow(
*,
session: Session = Depends(get_session),
flow_id: UUID,
current_user: User = Depends(get_current_active_user),
settings_service: SettingsService = Depends(get_settings_service),
):
"""Read a flow."""
if user_flow := _read_flow(session, flow_id, current_user, settings_service):
return user_flow
raise HTTPException(status_code=404, detail="Flow not found")
@ -232,14 +243,19 @@ def update_flow(
):
"""Update a flow."""
try:
db_flow = read_flow(
db_flow = _read_flow(
session=session,
flow_id=flow_id,
current_user=current_user,
settings_service=settings_service,
)
if not db_flow:
raise HTTPException(status_code=404, detail="Flow not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
if not db_flow:
raise HTTPException(status_code=404, detail="Flow not found")
try:
flow_data = flow.model_dump(exclude_unset=True)
if settings_service.settings.remove_api_keys:
flow_data = remove_api_keys(flow_data)
@ -256,7 +272,6 @@ def update_flow(
session.add(db_flow)
session.commit()
session.refresh(db_flow)
return db_flow
except Exception as e:
# If it is a validation error, return the error message
if hasattr(e, "errors"):
@ -272,10 +287,10 @@ def update_flow(
raise HTTPException(
status_code=400, detail=f"{column.capitalize().replace('_', ' ')} must be unique"
) from e
if isinstance(e, HTTPException):
raise
raise HTTPException(status_code=500, detail=str(e)) from e
return db_flow
@router.delete("/{flow_id}", status_code=200)
async def delete_flow(
@ -286,7 +301,7 @@ async def delete_flow(
settings_service=Depends(get_settings_service),
):
"""Delete a flow."""
flow = read_flow(
flow = _read_flow(
session=session,
flow_id=flow_id,
current_user=current_user,

View file

@ -75,10 +75,11 @@ def create_folder(
session.exec(update_statement_flows)
session.commit()
return new_folder
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
return new_folder
@router.get("/", response_model=list[FolderRead], status_code=200)
def read_folders(
@ -111,9 +112,15 @@ def read_folder(
):
try:
folder = session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)).first()
if not folder:
raise HTTPException(status_code=404, detail="Folder not found")
except Exception as e:
if "No result found" in str(e):
raise HTTPException(status_code=404, detail="Folder not found") from e
raise HTTPException(status_code=500, detail=str(e)) from e
if not folder:
raise HTTPException(status_code=404, detail="Folder not found")
try:
stmt = select(Flow).where(Flow.folder_id == folder_id, Flow.user_id == current_user.id)
if Flow.updated_at is not None:
@ -128,8 +135,6 @@ def read_folder(
return FolderWithPaginatedFlows(folder=FolderRead.model_validate(folder), flows=paginated_flows)
except Exception as e:
if "No result found" in str(e):
raise HTTPException(status_code=404, detail="Folder not found") from e
raise HTTPException(status_code=500, detail=str(e)) from e
@ -145,8 +150,13 @@ def update_folder(
existing_folder = session.exec(
select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)
).first()
if not existing_folder:
raise HTTPException(status_code=404, detail="Folder not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
if not existing_folder:
raise HTTPException(status_code=404, detail="Folder not found")
try:
if folder.name and folder.name != existing_folder.name:
existing_folder.name = folder.name
session.add(existing_folder)
@ -183,11 +193,11 @@ def update_folder(
session.exec(update_statement_components)
session.commit()
return existing_folder
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
return existing_folder
@router.delete("/{folder_id}", status_code=204)
async def delete_folder(
@ -203,11 +213,15 @@ async def delete_folder(
await cascade_delete_flow(session, flow)
folder = session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)).first()
if not folder:
raise HTTPException(status_code=404, detail="Folder not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
if not folder:
raise HTTPException(status_code=404, detail="Folder not found")
try:
session.delete(folder)
session.commit()
return Response(status_code=status.HTTP_204_NO_CONTENT)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
@ -222,12 +236,17 @@ async def download_file(
):
"""Download all flows from folder."""
try:
return session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)).first()
folder = session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)).first()
except Exception as e:
if "No result found" in str(e):
raise HTTPException(status_code=404, detail="Folder not found") from e
raise HTTPException(status_code=500, detail=str(e)) from e
if not folder:
raise HTTPException(status_code=404, detail="Folder not found")
return folder
@router.post("/upload/", response_model=list[FlowRead], status_code=201)
async def upload_file(

View file

@ -94,19 +94,23 @@ async def update_message(
):
try:
db_message = session.get(MessageTable, message_id)
if not db_message:
raise HTTPException(status_code=404, detail="Message not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
if not db_message:
raise HTTPException(status_code=404, detail="Message not found")
try:
message_dict = message.model_dump(exclude_unset=True, exclude_none=True)
db_message.sqlmodel_update(message_dict)
session.add(db_message)
session.commit()
session.refresh(db_message)
return db_message
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
return db_message
@router.patch("/messages/session/{old_session_id}", response_model=list[MessageResponse])
async def update_session_id(
@ -119,10 +123,13 @@ async def update_session_id(
# Get all messages with the old session ID
stmt = select(MessageTable).where(MessageTable.session_id == old_session_id)
messages = session.exec(stmt).all()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
if not messages:
raise HTTPException(status_code=404, detail="No messages found with the given session ID")
if not messages:
raise HTTPException(status_code=404, detail="No messages found with the given session ID")
try:
# Update all messages with the new session ID
for message in messages:
message.session_id = new_session_id
@ -134,12 +141,11 @@ async def update_session_id(
for message in messages:
session.refresh(message)
message_responses.append(MessageResponse.model_validate(message, from_attributes=True))
return message_responses
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
return message_responses
@router.delete("/messages/session/{session_id}", status_code=204)
async def delete_messages_session(
@ -153,10 +159,11 @@ async def delete_messages_session(
.execution_options(synchronize_session="fetch")
)
session.commit()
return {"message": "Messages deleted successfully"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
return {"message": "Messages deleted successfully"}
@router.get("/transactions", response_model=list[TransactionReadResponse])
async def get_transactions(

View file

@ -25,19 +25,18 @@ def create_variable(
variable_service: DatabaseVariableService = Depends(get_variable_service),
):
"""Create a new variable."""
if not variable.name and not variable.value:
raise HTTPException(status_code=400, detail="Variable name and value cannot be empty")
if not variable.name:
raise HTTPException(status_code=400, detail="Variable name cannot be empty")
if not variable.value:
raise HTTPException(status_code=400, detail="Variable value cannot be empty")
if variable.name in variable_service.list_variables(user_id=current_user.id, session=session):
raise HTTPException(status_code=400, detail="Variable name already exists")
try:
if not variable.name and not variable.value:
raise HTTPException(status_code=400, detail="Variable name and value cannot be empty")
if not variable.name:
raise HTTPException(status_code=400, detail="Variable name cannot be empty")
if not variable.value:
raise HTTPException(status_code=400, detail="Variable value cannot be empty")
if variable.name in variable_service.list_variables(user_id=current_user.id, session=session):
raise HTTPException(status_code=400, detail="Variable name already exists")
return variable_service.create_variable(
user_id=current_user.id,
name=variable.name,

View file

@ -82,12 +82,13 @@ class LCModelComponent(Component):
message = runnable.invoke(input_value)
result = message.content if hasattr(message, "content") else message
self.status = result
return result
except Exception as e:
if message := self._get_exception_message(e):
raise ValueError(message) from e
raise
return result
def build_status_message(self, message: AIMessage):
"""
Builds a status message from an AIMessage object.
@ -194,12 +195,13 @@ class LCModelComponent(Component):
self.status = result
else:
self.status = result
return result
except Exception as e:
if message := self._get_exception_message(e):
raise ValueError(message) from e
raise
return result
@abstractmethod
def build_model(self) -> LanguageModel: # type: ignore[type-var]
"""

View file

@ -96,7 +96,7 @@ class AddContentToPage(LCToolComponent):
heading_level = text.count("#", 0, 6)
heading_text = text[heading_level:].strip()
if heading_level in range(3):
blocks.append(self.create_block(f"heading_{heading_level+1}", heading_text))
blocks.append(self.create_block(f"heading_{heading_level + 1}", heading_text))
else:
blocks.append(self.create_block("paragraph", text))
elif node.name == "h1":

View file

@ -92,7 +92,6 @@ class NotionPageUpdate(LCToolComponent):
updated_page = response.json()
logger.info(f"Successfully updated Notion page. Response: {json.dumps(updated_page)}")
return updated_page
except requests.exceptions.HTTPError as e:
error_message = f"HTTP Error occurred: {e}"
if e.response is not None:
@ -109,5 +108,7 @@ class NotionPageUpdate(LCToolComponent):
logger.exception(error_message)
return error_message
return updated_page
def __call__(self, *args, **kwargs):
return self._update_notion_page(*args, **kwargs)

View file

@ -132,15 +132,16 @@ class AssemblyAILeMUR(Component):
# Perform LeMUR action
try:
response = self.perform_lemur_action(transcript_group, self.endpoint)
result = Data(data=response)
self.status = result
return result
except Exception as e: # noqa: BLE001
logger.opt(exception=True).debug("Error running LeMUR")
error = f"An Error happened: {e}"
self.status = error
return Data(data={"error": error})
result = Data(data=response)
self.status = result
return result
def perform_lemur_action(self, transcript_group: aai.TranscriptGroup, endpoint: str) -> dict:
logger.info("Endpoint:", endpoint, type(endpoint))
if endpoint == "task":

View file

@ -84,10 +84,11 @@ class AssemblyAIListTranscripts(Component):
page = transcriber.list_transcripts(params)
transcripts = convert_page_to_data_list(page)
self.status = transcripts
return transcripts
except Exception as e: # noqa: BLE001
logger.opt(exception=True).debug("Error listing transcripts")
error_data = Data(data={"error": f"An error occurred: {e}"})
self.status = [error_data]
return [error_data]
self.status = transcripts
return transcripts

View file

@ -173,14 +173,14 @@ class AssemblyAITranscriptionJobCreator(Component):
try:
transcript = aai.Transcriber().submit(audio, config=config)
if transcript.error:
self.status = transcript.error
return Data(data={"error": transcript.error})
result = Data(data={"transcript_id": transcript.id})
self.status = result
return result
except Exception as e: # noqa: BLE001
logger.opt(exception=True).debug("Error submitting transcription job")
self.status = f"An error occurred: {e}"
return Data(data={"error": f"An error occurred: {e}"})
if transcript.error:
self.status = transcript.error
return Data(data={"error": transcript.error})
result = Data(data={"transcript_id": transcript.id})
self.status = result
return result

View file

@ -46,27 +46,27 @@ class MergeDataComponent(Component):
"""
logger.info("Initiating the data merging process.")
data_inputs: list[Data] = self.data_inputs
logger.debug(f"Received {len(data_inputs)} data input(s) for merging.")
if not data_inputs:
logger.warning("No data inputs provided. Returning an empty list.")
return []
# Collect all unique keys from all Data objects
all_keys: set[str] = set()
for idx, data_input in enumerate(data_inputs):
if not isinstance(data_input, Data):
error_message = f"Data input at index {idx} is not of type Data."
logger.error(error_message)
type_error_message = (
f"All items in data_inputs must be of type Data. Item at index {idx} is {type(data_input)}"
)
raise TypeError(type_error_message)
all_keys.update(data_input.data.keys())
logger.debug(f"Collected {len(all_keys)} unique key(s) from input data.")
try:
data_inputs: list[Data] = self.data_inputs
logger.debug(f"Received {len(data_inputs)} data input(s) for merging.")
if not data_inputs:
logger.warning("No data inputs provided. Returning an empty list.")
return []
# Collect all unique keys from all Data objects
all_keys: set[str] = set()
for idx, data_input in enumerate(data_inputs):
if not isinstance(data_input, Data):
error_message = f"Data input at index {idx} is not of type Data."
logger.error(error_message)
type_error_message = (
"All items in data_inputs must be of type Data. " f"Item at index {idx} is {type(data_input)}"
)
raise TypeError(type_error_message)
all_keys.update(data_input.data.keys())
logger.debug(f"Collected {len(all_keys)} unique key(s) from input data.")
# Create new list of Data objects with missing keys filled with empty strings
merged_data_list = []
for idx, data_input in enumerate(data_inputs):
@ -86,9 +86,9 @@ class MergeDataComponent(Component):
merged_data_list.append(merged_data)
logger.debug(f"Merged Data object created for input at index {idx}.")
logger.info("Data merging process completed successfully.")
return merged_data_list
except Exception:
logger.exception("An error occurred during the data merging process.")
raise
logger.info("Data merging process completed successfully.")
return merged_data_list

View file

@ -46,16 +46,21 @@ class SubFlowComponent(CustomComponent):
if field_value is not None and field_name == "flow_name":
try:
flow_data = self.get_flow(field_value)
if not flow_data:
msg = f"Flow {field_value} not found."
raise ValueError(msg)
graph = Graph.from_payload(flow_data.data["data"])
# Get all inputs from the graph
inputs = get_flow_inputs(graph)
# Add inputs to the build config
build_config = self.add_inputs_to_build_config(inputs, build_config)
except Exception: # noqa: BLE001
logger.exception(f"Error getting flow {field_value}")
else:
if not flow_data:
msg = f"Flow {field_value} not found."
logger.error(msg)
else:
try:
graph = Graph.from_payload(flow_data.data["data"])
# Get all inputs from the graph
inputs = get_flow_inputs(graph)
# Add inputs to the build config
build_config = self.add_inputs_to_build_config(inputs, build_config)
except Exception: # noqa: BLE001
logger.exception(f"Error building graph for flow {field_value}")
return build_config

View file

@ -38,46 +38,42 @@ class CSVToDataComponent(Component):
]
def load_csv_to_data(self) -> list[Data]:
if sum(bool(field) for field in [self.csv_file, self.csv_path, self.csv_string]) != 1:
msg = "Please provide exactly one of: CSV file, file path, or CSV string."
raise ValueError(msg)
csv_data = None
try:
if sum(bool(field) for field in [self.csv_file, self.csv_path, self.csv_string]) != 1:
msg = "Please provide exactly one of: CSV file, file path, or CSV string."
raise ValueError(msg)
csv_data = None
if self.csv_file:
resolved_path = self.resolve_path(self.csv_file)
file_path = Path(resolved_path)
if file_path.suffix.lower() != ".csv":
msg = "The provided file must be a CSV file."
raise ValueError(msg)
with file_path.open(newline="", encoding="utf-8") as csvfile:
csv_data = csvfile.read()
self.status = "The provided file must be a CSV file."
else:
with file_path.open(newline="", encoding="utf-8") as csvfile:
csv_data = csvfile.read()
elif self.csv_path:
file_path = Path(self.csv_path)
if file_path.suffix.lower() != ".csv":
msg = "The provided file must be a CSV file."
raise ValueError(msg)
with file_path.open(newline="", encoding="utf-8") as csvfile:
csv_data = csvfile.read()
self.status = "The provided file must be a CSV file."
else:
with file_path.open(newline="", encoding="utf-8") as csvfile:
csv_data = csvfile.read()
elif self.csv_string:
else:
csv_data = self.csv_string
if not csv_data:
msg = "No CSV data provided."
raise ValueError(msg)
if csv_data:
csv_reader = csv.DictReader(io.StringIO(csv_data))
result = [Data(data=row) for row in csv_reader]
csv_reader = csv.DictReader(io.StringIO(csv_data))
result = [Data(data=row) for row in csv_reader]
if not result:
self.status = "The CSV data is empty."
return []
if not result:
self.status = "The CSV data is empty."
return []
self.status = result
return result
self.status = result
return result
except csv.Error as e:
error_message = f"CSV parsing error: {e}"
@ -88,3 +84,6 @@ class CSVToDataComponent(Component):
error_message = f"An error occurred: {e}"
self.status = error_message
raise ValueError(error_message) from e
# An error occurred
raise ValueError(self.status)

View file

@ -41,53 +41,50 @@ class JSONToDataComponent(Component):
]
def convert_json_to_data(self) -> Data | list[Data]:
if sum(bool(field) for field in [self.json_file, self.json_path, self.json_string]) != 1:
msg = "Please provide exactly one of: JSON file, file path, or JSON string."
self.status = msg
raise ValueError(msg)
json_data = None
try:
if sum(bool(field) for field in [self.json_file, self.json_path, self.json_string]) != 1:
msg = "Please provide exactly one of: JSON file, file path, or JSON string."
raise ValueError(msg)
json_data = None
if self.json_file:
resolved_path = self.resolve_path(self.json_file)
file_path = Path(resolved_path)
if file_path.suffix.lower() != ".json":
msg = "The provided file must be a JSON file."
raise ValueError(msg)
with file_path.open(encoding="utf-8") as jsonfile:
json_data = jsonfile.read()
self.status = "The provided file must be a JSON file."
else:
with file_path.open(encoding="utf-8") as jsonfile:
json_data = jsonfile.read()
elif self.json_path:
file_path = Path(self.json_path)
if file_path.suffix.lower() != ".json":
msg = "The provided file must be a JSON file."
raise ValueError(msg)
with file_path.open(encoding="utf-8") as jsonfile:
json_data = jsonfile.read()
self.status = "The provided file must be a JSON file."
else:
with file_path.open(encoding="utf-8") as jsonfile:
json_data = jsonfile.read()
elif self.json_string:
else:
json_data = self.json_string
if not json_data:
msg = "No JSON data provided."
raise ValueError(msg)
if json_data:
# Try to parse the JSON string
try:
parsed_data = json.loads(json_data)
except json.JSONDecodeError:
# If JSON parsing fails, try to repair the JSON string
repaired_json_string = repair_json(json_data)
parsed_data = json.loads(repaired_json_string)
# Try to parse the JSON string
try:
parsed_data = json.loads(json_data)
except json.JSONDecodeError:
# If JSON parsing fails, try to repair the JSON string
repaired_json_string = repair_json(json_data)
parsed_data = json.loads(repaired_json_string)
# Check if the parsed data is a list
if isinstance(parsed_data, list):
result = [Data(data=item) for item in parsed_data]
else:
result = Data(data=parsed_data)
self.status = result
return result
# Check if the parsed data is a list
if isinstance(parsed_data, list):
result = [Data(data=item) for item in parsed_data]
else:
result = Data(data=parsed_data)
self.status = result
return result
except (json.JSONDecodeError, SyntaxError, ValueError) as e:
error_message = f"Invalid JSON or Python literal: {e}"
@ -98,3 +95,6 @@ class JSONToDataComponent(Component):
error_message = f"An error occurred: {e}"
self.status = error_message
raise ValueError(error_message) from e
# An error occurred
raise ValueError(self.status)

View file

@ -46,27 +46,27 @@ class MergeDataComponent(Component):
"""
logger.info("Initiating the data merging process.")
data_inputs: list[Data] = self.data_inputs
logger.debug(f"Received {len(data_inputs)} data input(s) for merging.")
if not data_inputs:
logger.warning("No data inputs provided. Returning an empty list.")
return []
# Collect all unique keys from all Data objects
all_keys: set[str] = set()
for idx, data_input in enumerate(data_inputs):
if not isinstance(data_input, Data):
error_message = f"Data input at index {idx} is not of type Data."
logger.error(error_message)
type_error_message = (
f"All items in data_inputs must be of type Data. Item at index {idx} is {type(data_input)}"
)
raise TypeError(type_error_message)
all_keys.update(data_input.data.keys())
logger.debug(f"Collected {len(all_keys)} unique key(s) from input data.")
try:
data_inputs: list[Data] = self.data_inputs
logger.debug(f"Received {len(data_inputs)} data input(s) for merging.")
if not data_inputs:
logger.warning("No data inputs provided. Returning an empty list.")
return []
# Collect all unique keys from all Data objects
all_keys: set[str] = set()
for idx, data_input in enumerate(data_inputs):
if not isinstance(data_input, Data):
error_message = f"Data input at index {idx} is not of type Data."
logger.error(error_message)
type_error_message = (
"All items in data_inputs must be of type Data. " f"Item at index {idx} is {type(data_input)}"
)
raise TypeError(type_error_message)
all_keys.update(data_input.data.keys())
logger.debug(f"Collected {len(all_keys)} unique key(s) from input data.")
# Create new list of Data objects with missing keys filled with empty strings
merged_data_list = []
for idx, data_input in enumerate(data_inputs):
@ -86,9 +86,9 @@ class MergeDataComponent(Component):
merged_data_list.append(merged_data)
logger.debug("Merged Data object created for input at index: " + str(idx))
logger.info("Data merging process completed successfully.")
return merged_data_list
except Exception:
logger.exception("An error occurred during the data merging process.")
raise
logger.info("Data merging process completed successfully.")
return merged_data_list

View file

@ -26,18 +26,14 @@ class MessageToDataComponent(Component):
]
def convert_message_to_data(self) -> Data:
try:
if not isinstance(self.message, Message):
msg = "Input must be a Message object"
raise TypeError(msg)
if isinstance(self.message, Message):
# Convert Message to Data
data = Data(data=self.message.data)
self.status = "Successfully converted Message to Data"
return data
except Exception as e: # noqa: BLE001
error_message = f"Error converting Message to Data: {e}"
logger.opt(exception=True).debug(error_message)
self.status = error_message
return Data(data={"error": error_message})
msg = "Error converting Message to Data: Input must be a Message object"
logger.opt(exception=True).debug(msg)
self.status = msg
return Data(data={"error": msg})

View file

@ -57,12 +57,12 @@ class JSONCleaner(Component):
normalize_unicode = self.normalize_unicode
validate_json = self.validate_json
start = json_str.find("{")
end = json_str.rfind("}")
if start == -1 or end == -1:
msg = "Invalid JSON string: Missing '{' or '}'"
raise ValueError(msg)
try:
start = json_str.find("{")
end = json_str.rfind("}")
if start == -1 or end == -1:
msg = "Invalid JSON string: Missing '{' or '}'"
raise ValueError(msg)
json_str = json_str[start : end + 1]
if remove_control_chars:
@ -93,7 +93,7 @@ class JSONCleaner(Component):
"""Validate the JSON string."""
try:
json.loads(s)
return s
except json.JSONDecodeError as e:
msg = f"Invalid JSON string: {e}"
raise ValueError(msg) from e
return s

View file

@ -38,16 +38,21 @@ class SubFlowComponent(Component):
if field_value is not None and field_name == "flow_name":
try:
flow_data = self.get_flow(field_value)
if not flow_data:
msg = f"Flow {field_value} not found."
raise ValueError(msg)
graph = Graph.from_payload(flow_data.data["data"])
# Get all inputs from the graph
inputs = get_flow_inputs(graph)
# Add inputs to the build config
build_config = self.add_inputs_to_build_config(inputs, build_config)
except Exception: # noqa: BLE001
logger.exception(f"Error getting flow {field_value}")
else:
if not flow_data:
msg = f"Flow {field_value} not found."
logger.error(msg)
else:
try:
graph = Graph.from_payload(flow_data.data["data"])
# Get all inputs from the graph
inputs = get_flow_inputs(graph)
# Add inputs to the build config
build_config = self.add_inputs_to_build_config(inputs, build_config)
except Exception: # noqa: BLE001
logger.exception(f"Error building graph for flow {field_value}")
return build_config

View file

@ -64,11 +64,12 @@ class ComposioAPIComponent(LCToolComponent):
entity = toolset.client.get_entity(id=self.entity_id)
try:
entity.get_connection(app=app)
return f"{app} CONNECTED"
except Exception: # noqa: BLE001
logger.opt(exception=True).debug("Authorization error")
return self._handle_authorization_failure(toolset, entity, app)
return f"{app} CONNECTED"
def _handle_authorization_failure(self, toolset: ComposioToolSet, entity: Any, app: str) -> str:
"""
Handles the authorization failure by attempting to process API key auth or initiate default connection.

View file

@ -39,29 +39,28 @@ class CalculatorToolComponent(LCToolComponent):
args_schema=self.CalculatorToolSchema,
)
def _eval_expr(self, node):
# Define the allowed operators
operators = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.Pow: operator.pow,
}
if isinstance(node, ast.Num):
return node.n
if isinstance(node, ast.BinOp):
return operators[type(node.op)](self._eval_expr(node.left), self._eval_expr(node.right))
if isinstance(node, ast.UnaryOp):
return operators[type(node.op)](self._eval_expr(node.operand))
raise TypeError(node)
def _evaluate_expression(self, expression: str) -> list[Data]:
try:
# Define the allowed operators
operators = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.Pow: operator.pow,
}
def eval_expr(node):
if isinstance(node, ast.Num):
return node.n
if isinstance(node, ast.BinOp):
return operators[type(node.op)](eval_expr(node.left), eval_expr(node.right))
if isinstance(node, ast.UnaryOp):
return operators[type(node.op)](eval_expr(node.operand))
raise TypeError(node)
# Parse the expression and evaluate it
tree = ast.parse(expression, mode="eval")
result = eval_expr(tree.body)
result = self._eval_expr(tree.body)
# Format the result to a reasonable number of decimal places
formatted_result = f"{result:.6f}".rstrip("0").rstrip(".")

View file

@ -86,9 +86,10 @@ class SerpAPIComponent(LCToolComponent):
data_list = [Data(data=result, text=result.get("snippet", "")) for result in results]
self.status = data_list
return data_list
except Exception as e: # noqa: BLE001
logger.opt(exception=True).debug("Error running SerpAPI")
self.status = f"Error: {e}"
return [Data(data={"error": str(e)}, text=str(e))]
self.status = data_list
return data_list

View file

@ -148,9 +148,6 @@ Note: Check 'Advanced' for all options.
if include_images and search_results.get("images"):
data_results.append(Data(data={"images": search_results["images"]}))
self.status: Any = data_results
return data_results
except httpx.HTTPStatusError as e:
error_message = f"HTTP error: {e.response.status_code} - {e.response.text}"
self.status = error_message
@ -160,3 +157,6 @@ Note: Check 'Advanced' for all options.
error_message = f"Unexpected error: {e}"
self.status = error_message
return [Data(data={"error": error_message})]
self.status: Any = data_results
return data_results

View file

@ -94,10 +94,10 @@ class YfinanceToolComponent(LCToolComponent):
else:
data_list = [Data(data={"result": result})]
return data_list
except Exception as e: # noqa: BLE001
error_message = f"Error retrieving data: {e}"
logger.opt(exception=True).debug(error_message)
self.status = error_message
return [Data(data={"error": error_message})]
return data_list

View file

@ -189,14 +189,15 @@ class ElasticsearchVectorStoreComponent(LCVectorStoreComponent):
if query:
search_type = self.search_type.lower()
if search_type not in ["similarity", "mmr"]:
msg = f"Invalid search type: {self.search_type}"
logger.error(msg)
raise ValueError(msg)
try:
if search_type == "similarity":
results = vector_store.similarity_search_with_score(query, **search_kwargs)
elif search_type == "mmr":
results = vector_store.max_marginal_relevance_search(query, **search_kwargs)
else:
msg = f"Invalid search type: {self.search_type}"
raise ValueError(msg)
except Exception as e:
msg = (
"Error occurred while querying the Elasticsearch VectorStore,"

View file

@ -229,15 +229,15 @@ class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
results = vector_store.max_marginal_relevance_search(query, **search_kwargs)
return [{"page_content": doc.page_content, "metadata": doc.metadata} for doc in results]
error_message = f"Invalid search type:: {self.search_type}"
logger.exception(error_message)
raise ValueError(error_message)
except Exception as e:
error_message = f"Error during search: {e}"
logger.exception(error_message)
raise RuntimeError(error_message) from e
error_message = f"Error during search. Invalid search type: {self.search_type}"
logger.error(error_message)
raise ValueError(error_message)
def search_documents(self) -> list[Data]:
"""
Search for documents in the vector store based on the search input.
@ -253,9 +253,10 @@ class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
)
for result in results
]
self.status = retrieved_data
return retrieved_data
except Exception as e:
error_message = f"Error during document search: {e}"
logger.exception(error_message)
raise RuntimeError(error_message) from e
self.status = retrieved_data
return retrieved_data

View file

@ -91,9 +91,9 @@ class DirectoryReader:
"""
try:
ast.parse(file_content)
return True
except SyntaxError:
return False
return True
def validate_build(self, file_content):
"""

View file

@ -260,33 +260,38 @@ def run_build_inputs(
def get_component_instance(custom_component: CustomComponent, user_id: str | UUID | None = None):
try:
if custom_component._code is None:
msg = "Code is None"
raise ValueError(msg)
if isinstance(custom_component._code, str):
if custom_component._code is None:
error = "Code is None"
elif not isinstance(custom_component._code, str):
error = "Invalid code type"
else:
try:
custom_class = eval_custom_component_code(custom_component._code)
else:
msg = "Invalid code type"
raise TypeError(msg)
except Exception as exc:
logger.exception("Error while evaluating custom component code")
raise HTTPException(
status_code=400,
detail={
"error": ("Invalid type convertion. Please check your code and try again."),
"traceback": traceback.format_exc(),
},
) from exc
except Exception as exc:
logger.exception("Error while evaluating custom component code")
raise HTTPException(
status_code=400,
detail={
"error": ("Invalid type conversion. Please check your code and try again."),
"traceback": traceback.format_exc(),
},
) from exc
try:
return custom_class(_user_id=user_id, _code=custom_component._code)
except Exception as exc:
logger.exception("Error while instantiating custom component")
if hasattr(exc, "detail") and "traceback" in exc.detail:
logger.error(exc.detail["traceback"])
try:
return custom_class(_user_id=user_id, _code=custom_component._code)
except Exception as exc:
logger.exception("Error while instantiating custom component")
if hasattr(exc, "detail") and "traceback" in exc.detail:
logger.error(exc.detail["traceback"])
raise
raise
msg = f"Invalid type conversion: {error}. Please check your code and try again."
logger.error(msg)
raise HTTPException(
status_code=400,
detail={"error": msg},
)
def run_build_config(
@ -295,46 +300,49 @@ def run_build_config(
) -> tuple[dict, CustomComponent]:
"""Build the field configuration for a custom component"""
try:
if custom_component._code is None:
msg = "Code is None"
raise ValueError(msg)
if isinstance(custom_component._code, str):
if custom_component._code is None:
error = "Code is None"
elif not isinstance(custom_component._code, str):
error = "Invalid code type"
else:
try:
custom_class = eval_custom_component_code(custom_component._code)
else:
msg = "Invalid code type"
raise TypeError(msg)
except Exception as exc:
logger.exception("Error while evaluating custom component code")
raise HTTPException(
status_code=400,
detail={
"error": ("Invalid type convertion. Please check your code and try again."),
"traceback": traceback.format_exc(),
},
) from exc
except Exception as exc:
logger.exception("Error while evaluating custom component code")
raise HTTPException(
status_code=400,
detail={
"error": ("Invalid type conversion. Please check your code and try again."),
"traceback": traceback.format_exc(),
},
) from exc
try:
custom_instance = custom_class(_user_id=user_id)
build_config: dict = custom_instance.build_config()
try:
custom_instance = custom_class(_user_id=user_id)
build_config: dict = custom_instance.build_config()
for field_name, field in build_config.copy().items():
# Allow user to build Input as well
# as a dict with the same keys as Input
field_dict = get_field_dict(field)
# Let's check if "rangeSpec" is a RangeSpec object
if "rangeSpec" in field_dict and isinstance(field_dict["rangeSpec"], RangeSpec):
field_dict["rangeSpec"] = field_dict["rangeSpec"].model_dump()
build_config[field_name] = field_dict
for field_name, field in build_config.copy().items():
# Allow user to build Input as well
# as a dict with the same keys as Input
field_dict = get_field_dict(field)
# Let's check if "rangeSpec" is a RangeSpec object
if "rangeSpec" in field_dict and isinstance(field_dict["rangeSpec"], RangeSpec):
field_dict["rangeSpec"] = field_dict["rangeSpec"].model_dump()
build_config[field_name] = field_dict
except Exception as exc:
logger.exception("Error while building field config")
if hasattr(exc, "detail") and "traceback" in exc.detail:
logger.error(exc.detail["traceback"])
raise
return build_config, custom_instance
except Exception as exc:
logger.exception("Error while building field config")
if hasattr(exc, "detail") and "traceback" in exc.detail:
logger.error(exc.detail["traceback"])
raise
msg = f"Invalid type conversion: {error}. Please check your code and try again."
logger.error(msg)
raise HTTPException(
status_code=400,
detail={"error": msg},
)
def add_code_field(frontend_node: CustomComponentFrontendNode, raw_code):
@ -386,13 +394,23 @@ def build_custom_component_template(
) -> tuple[dict[str, Any], CustomComponent | Component]:
"""Build a custom component template"""
try:
if not hasattr(custom_component, "template_config"):
raise HTTPException(
status_code=400,
detail={
"error": ("Please check if you are importing Component correctly."),
},
)
has_template_config = hasattr(custom_component, "template_config")
except Exception as exc:
raise HTTPException(
status_code=400,
detail={
"error": (f"Error building Component: {exc}"),
"traceback": traceback.format_exc(),
},
) from exc
if not has_template_config:
raise HTTPException(
status_code=400,
detail={
"error": ("Error building Component. Please check if you are importing Component correctly."),
},
)
try:
if "inputs" in custom_component.template_config:
return build_custom_component_template_from_inputs(custom_component, user_id=user_id)
frontend_node = CustomComponentFrontendNode(**custom_component.template_config)

View file

@ -752,11 +752,12 @@ class Graph:
try:
# Attempt to get the running event loop; if none, an exception is raised
loop = asyncio.get_running_loop()
if loop.is_closed():
msg = "The running event loop is closed."
raise RuntimeError(msg)
except RuntimeError:
# If there's no running event loop or it's closed, use asyncio.run
# If there's no running event loop, use asyncio.run
return asyncio.run(coro)
# If the event loop is closed, use asyncio.run
if loop.is_closed():
return asyncio.run(coro)
# If there's an existing, open event loop, use it to run the async function
@ -1031,7 +1032,6 @@ class Graph:
edges = payload["edges"]
graph = cls(flow_id=flow_id, flow_name=flow_name, user_id=user_id)
graph.add_nodes_and_edges(vertices, edges)
return graph
except KeyError as exc:
logger.exception(exc)
if "nodes" not in payload and "edges" not in payload:
@ -1040,6 +1040,8 @@ class Graph:
msg = f"Error while creating graph from payload: {exc}"
raise ValueError(msg) from exc
else:
return graph
def __eq__(self, other: object) -> bool:
if not isinstance(other, Graph):
@ -1399,23 +1401,24 @@ class Graph:
await set_cache(key=vertex.id, data=vertex_dict)
if vertex.result is not None:
params = f"{vertex._built_object_repr()}{params}"
valid = True
result_dict = vertex.result
artifacts = vertex.artifacts
else:
msg = f"No result found for vertex {vertex_id}"
raise ValueError(msg)
return VertexBuildResult(
result_dict=result_dict, params=params, valid=valid, artifacts=artifacts, vertex=vertex
)
except Exception as exc:
if not isinstance(exc, ComponentBuildException):
logger.exception("Error building Component")
raise
if vertex.result is not None:
params = f"{vertex._built_object_repr()}{params}"
valid = True
result_dict = vertex.result
artifacts = vertex.artifacts
else:
msg = f"Error building Component: no result found for vertex {vertex_id}"
raise ValueError(msg)
return VertexBuildResult(
result_dict=result_dict, params=params, valid=valid, artifacts=artifacts, vertex=vertex
)
def get_vertex_edges(
self,
vertex_id: str,

View file

@ -870,9 +870,10 @@ class Vertex:
# self._data is a dict and we need to compare them
# to check if they are equal
data_are_equal = self.data == __o.data
return ids_are_equal and data_are_equal
except AttributeError:
return False
else:
return ids_are_equal and data_are_equal
def __hash__(self) -> int:
return id(self)

View file

@ -123,12 +123,12 @@ def update_params_with_load_from_db_fields(
raise
logger.debug(str(e))
if fallback_to_env_vars and key is None:
var = os.getenv(params[field])
if var is None:
key = os.getenv(params[field])
if key is None:
msg = f"Environment variable {params[field]} is not set."
raise ValueError(msg)
key = var
logger.info(f"Using environment variable {params[field]} for {field}")
logger.error(msg)
else:
logger.info(f"Using environment variable {params[field]} for {field}")
if key is None:
logger.warning(f"Could not get value for {field}. Setting it to None.")

View file

@ -22,7 +22,7 @@ def upload(file_path: str, host: str, flow_id: str):
dict: A dictionary containing the file path.
Raises:
Exception: If an error occurs during the upload process.
UploadError: If an error occurs during the upload process.
"""
try:
url = f"{host}/api/v1/upload/{flow_id}"
@ -33,9 +33,9 @@ def upload(file_path: str, host: str, flow_id: str):
except Exception as e:
msg = f"Error uploading file: {e}"
raise UploadError(msg) from e
else:
msg = f"Error uploading file: {response.status_code}"
raise UploadError(msg)
msg = f"Error uploading file: {response.status_code}"
raise UploadError(msg)
def upload_file(file_path: str, host: str, flow_id: str, components: list[str], tweaks: dict | None = None):
@ -54,26 +54,27 @@ def upload_file(file_path: str, host: str, flow_id: str, components: list[str],
dict: A dictionary containing the file path and any tweaks that were applied.
Raises:
Exception: If an error occurs during the upload process.
UploadError: If an error occurs during the upload process.
"""
if not tweaks:
tweaks = {}
try:
response = upload(file_path, host, flow_id)
if response["file_path"]:
for component in components:
if isinstance(component, str):
tweaks[component] = {"path": response["file_path"]}
else:
msg = f"Component ID or name must be a string. Got {type(component)}"
raise TypeError(msg)
return tweaks
except Exception as e:
msg = f"Error uploading file: {e}"
raise UploadError(msg) from e
else:
msg = "Error uploading file"
raise UploadError(msg)
if not tweaks:
tweaks = {}
if response["file_path"]:
for component in components:
if isinstance(component, str):
tweaks[component] = {"path": response["file_path"]}
else:
msg = f"Error uploading file: component ID or name must be a string. Got {type(component)}"
raise UploadError(msg)
return tweaks
msg = "Error uploading file"
raise UploadError(msg)
def get_flow(url: str, flow_id: str):
@ -88,7 +89,7 @@ def get_flow(url: str, flow_id: str):
dict: A dictionary containing the details of the flow.
Raises:
Exception: If an error occurs during the retrieval process.
UploadError: If an error occurs during the retrieval process.
"""
try:
flow_url = f"{url}/api/v1/flows/{flow_id}"
@ -99,6 +100,6 @@ def get_flow(url: str, flow_id: str):
except Exception as e:
msg = f"Error retrieving flow: {e}"
raise UploadError(msg) from e
else:
msg = f"Error retrieving flow: {response.status_code}"
raise UploadError(msg)
msg = f"Error retrieving flow: {response.status_code}"
raise UploadError(msg)

View file

@ -57,15 +57,16 @@ def add_messages(messages: Message | list[Message], flow_id: str | None = None):
"""
Add a message to the monitor service.
"""
if not isinstance(messages, list):
messages = [messages]
if not all(isinstance(message, Message) for message in messages):
types = ", ".join([str(type(message)) for message in messages])
msg = f"The messages must be instances of Message. Found: {types}"
raise ValueError(msg)
try:
if not isinstance(messages, list):
messages = [messages]
if not all(isinstance(message, Message) for message in messages):
types = ", ".join([str(type(message)) for message in messages])
msg = f"The messages must be instances of Message. Found: {types}"
raise ValueError(msg)
messages_models = [MessageTable.from_message(msg, flow_id=flow_id) for msg in messages]
with session_scope() as session:
messages_models = add_messagetables(messages_models, session)

View file

@ -28,10 +28,11 @@ class dotdict(dict):
if isinstance(value, dict) and not isinstance(value, dotdict):
value = dotdict(value)
self[attr] = value # Update self to nest dotdict for future accesses
return value
except KeyError as e:
msg = f"'dotdict' object has no attribute '{attr}'"
raise AttributeError(msg) from e
else:
return value
def __setattr__(self, key, value):
"""

View file

@ -12,9 +12,9 @@ def is_image_file(file_path):
try:
with PILImage.open(file_path) as img:
img.verify() # Verify that it is, in fact, an image
return True
except (OSError, SyntaxError):
return False
return True
async def get_file_paths(files: list[str]):

View file

@ -34,10 +34,10 @@ def _timestamp_to_str(timestamp: datetime | str) -> str:
# Just check if the string is a valid datetime
try:
datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S") # noqa: DTZ007
return timestamp
except ValueError as e:
msg = f"Invalid timestamp: {timestamp}"
raise ValueError(msg) from e
return timestamp
return timestamp.strftime("%Y-%m-%d %H:%M:%S")

View file

@ -235,10 +235,10 @@ class RedisCache(AsyncBaseCacheService, Generic[LockType]):
try:
self._client.ping()
return True
except redis.exceptions.ConnectionError:
logger.exception("RedisCache could not connect to the Redis server")
return False
return True
async def get(self, key, lock=None):
"""

View file

@ -23,7 +23,7 @@ def log_transaction(db: Session, transaction: TransactionBase) -> TransactionTab
db.add(table)
try:
db.commit()
return table
except IntegrityError:
db.rollback()
raise
return table

View file

@ -23,10 +23,10 @@ def log_vertex_build(db: Session, vertex_build: VertexBuildBase) -> VertexBuildT
db.add(table)
try:
db.commit()
return table
except IntegrityError:
db.rollback()
raise
return table
def delete_vertex_builds_by_flow_id(db: Session, flow_id: UUID) -> None:

View file

@ -61,13 +61,14 @@ class S3StorageService(StorageService):
"""
try:
response = self.s3_client.list_objects_v2(Bucket=self.bucket, Prefix=folder)
files = [item["Key"] for item in response.get("Contents", []) if "/" not in item["Key"][len(folder) :]]
logger.info(f"{len(files)} files listed in folder {folder}.")
return files
except ClientError:
logger.exception(f"Error listing files in folder {folder}")
raise
files = [item["Key"] for item in response.get("Contents", []) if "/" not in item["Key"][len(folder) :]]
logger.info(f"{len(files)} files listed in folder {folder}.")
return files
async def delete_file(self, folder: str, file_name: str):
"""
Delete a file from the S3 bucket.

View file

@ -68,13 +68,14 @@ class AnyIOBackend(TaskBackend):
try:
task_result = AnyIOTaskResult(tg)
tg.start_soon(task_result.run, task_func, *args, **kwargs)
task_id = str(id(task_result))
self.tasks[task_id] = task_result
logger.info(f"Task {task_id} started.")
return task_id, task_result
except Exception: # noqa: BLE001
logger.exception("An error occurred while launching the task")
return None, None
task_id = str(id(task_result))
self.tasks[task_id] = task_result
logger.info(f"Task {task_id} started.")
return task_id, task_result
def get_task(self, task_id: str) -> Any:
return self.tasks.get(task_id)

View file

@ -164,10 +164,7 @@ def initialize_services(fix_migration: bool = False, socketio_server=None):
# Test cache connection
get_service(ServiceType.CACHE_SERVICE, default=CacheServiceFactory())
# Setup the superuser
try:
initialize_database(fix_migration=fix_migration)
except Exception:
raise
initialize_database(fix_migration=fix_migration)
setup_superuser(get_service(ServiceType.SETTINGS_SERVICE), next(get_session()))
try:
get_db_service().migrate_flows_if_auto_login()

View file

@ -36,14 +36,14 @@ def _get_version_info():
__version__ = metadata.version(pkg_name)
prerelease_version = __version__
version = _compute_non_prerelease_version(prerelease_version)
except (ImportError, metadata.PackageNotFoundError):
pass
else:
return {
"version": prerelease_version,
"main_version": version,
"package": display_name,
}
except (ImportError, metadata.PackageNotFoundError):
pass
if __version__ is None:
msg = f"Package not found from options {package_options}"

View file

@ -24,9 +24,9 @@ def build_vertex(self, vertex: Vertex) -> Vertex:
try:
vertex.task_id = self.request.id
async_to_sync(vertex.build)()
return vertex
except SoftTimeLimitExceeded as e:
raise self.retry(exc=SoftTimeLimitExceeded("Task took too long"), countdown=2) from e
return vertex
@celery_app.task(acks_late=True)

View file

@ -52,6 +52,7 @@ ignore = [
"RUF012", # Pydantic models are currently not well detected. See https://github.com/astral-sh/ruff/issues/13630
"TD002", # Missing author in TODO
"TD003", # Missing issue link in TODO
"TRY301", # A bit too harsh (Abstract `raise` to an inner function)
# Rules that are TODOs
"ANN",
@ -62,7 +63,6 @@ ignore = [
"N",
"S",
"SLF",
"TRY3",
]
[tool.ruff.lint.per-file-ignores]