diff --git a/src/backend/base/langflow/alembic/versions/631faacf5da2_add_webhook_columns.py b/src/backend/base/langflow/alembic/versions/631faacf5da2_add_webhook_columns.py new file mode 100644 index 000000000..379fba17c --- /dev/null +++ b/src/backend/base/langflow/alembic/versions/631faacf5da2_add_webhook_columns.py @@ -0,0 +1,45 @@ +"""Add webhook columns + +Revision ID: 631faacf5da2 +Revises: 1c79524817ed +Create Date: 2024-04-22 15:14:43.454784 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.engine.reflection import Inspector + +# revision identifiers, used by Alembic. +revision: str = "631faacf5da2" +down_revision: Union[str, None] = "1c79524817ed" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + conn = op.get_bind() + inspector = Inspector.from_engine(conn) # type: ignore + table_names = inspector.get_table_names() + # ### commands auto generated by Alembic - please adjust! ### + column_names = [column["name"] for column in inspector.get_columns("flow")] + with op.batch_alter_table("flow", schema=None) as batch_op: + if "flow" in table_names and "webhook" not in column_names: + batch_op.add_column(sa.Column("webhook", sa.Boolean(), nullable=True)) + + # ### end Alembic commands ### + + +def downgrade() -> None: + conn = op.get_bind() + inspector = Inspector.from_engine(conn) # type: ignore + table_names = inspector.get_table_names() + # ### commands auto generated by Alembic - please adjust! ### + column_names = [column["name"] for column in inspector.get_columns("flow")] + with op.batch_alter_table("flow", schema=None) as batch_op: + if "flow" in table_names and "webhook" in column_names: + batch_op.drop_column("webhook") + + # ### end Alembic commands ### diff --git a/src/backend/base/langflow/api/v1/endpoints.py b/src/backend/base/langflow/api/v1/endpoints.py index 26633082e..df55e21d4 100644 --- a/src/backend/base/langflow/api/v1/endpoints.py +++ b/src/backend/base/langflow/api/v1/endpoints.py @@ -3,7 +3,7 @@ from typing import TYPE_CHECKING, Annotated, List, Optional, Union from uuid import UUID import sqlalchemy as sa -from fastapi import APIRouter, Body, Depends, HTTPException, UploadFile, status +from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException, Request, UploadFile, status from loguru import logger from sqlmodel import Session, select @@ -22,11 +22,14 @@ from langflow.api.v1.schemas import ( from langflow.custom import CustomComponent from langflow.custom.utils import build_custom_component_template from langflow.graph.graph.base import Graph +from langflow.graph.schema import RunOutputs +from langflow.helpers.flow import get_flow_by_id_or_endpoint_name from langflow.processing.process import process_tweaks, run_graph_internal from langflow.schema.graph import Tweaks from langflow.services.auth.utils import api_key_security, get_current_active_user from langflow.services.cache.utils import save_uploaded_file from langflow.services.database.models.flow import Flow +from langflow.services.database.models.flow.utils import get_all_webhook_components_in_flow, get_flow_by_id from langflow.services.database.models.user.model import User from langflow.services.deps import get_session, get_session_service, get_settings_service, get_task_service from langflow.services.session.service import SessionService @@ -53,10 +56,75 @@ def get_all( raise HTTPException(status_code=500, detail=str(exc)) from exc +async def simple_run_flow( + db: Session, + flow: Flow, + input_request: SimplifiedAPIRequest, + session_service: SessionService, + stream: bool = False, + api_key_user: Optional[User] = None, +): + try: + task_result: List[RunOutputs] = [] + artifacts = {} + user_id = api_key_user.id if api_key_user else None + flow_id_str = str(flow.id) + if input_request.session_id: + session_data = await session_service.load_session(input_request.session_id, flow_id=flow_id_str) + graph, artifacts = session_data if session_data else (None, None) + if graph is None: + raise ValueError(f"Session {input_request.session_id} not found") + else: + if flow.data is None: + raise ValueError(f"Flow {flow_id_str} has no data") + graph_data = flow.data + graph_data = process_tweaks(graph_data, input_request.tweaks or {}) + graph = Graph.from_payload(graph_data, flow_id=flow_id_str, user_id=str(user_id)) + inputs = [ + InputValueRequest(components=[], input_value=input_request.input_value, type=input_request.input_type) + ] + # outputs is a list of all components that should return output + # we need to get them by checking their type + # if the output type is debug, we return all outputs + # if the output type is any, we return all outputs that are either chat or text + # if the output type is chat or text, we return only the outputs that match the type + if input_request.output_component: + outputs = [input_request.output_component] + else: + outputs = [ + vertex.id + for vertex in graph.vertices + if input_request.output_type == "debug" + or ( + vertex.is_output + and (input_request.output_type == "any" or input_request.output_type in vertex.id.lower()) + ) + ] + task_result, session_id = await run_graph_internal( + graph=graph, + flow_id=flow_id_str, + session_id=input_request.session_id, + inputs=inputs, + outputs=outputs, + artifacts=artifacts, + session_service=session_service, + stream=stream, + ) + + return RunResponse(outputs=task_result, session_id=session_id) + + except sa.exc.StatementError as exc: + # StatementError('(builtins.ValueError) badly formed hexadecimal UUID string') + if "badly formed hexadecimal UUID string" in str(exc): + logger.error(f"Flow ID {flow_id_str} is not a valid UUID") + # This means the Flow ID is not a valid UUID which means it can't find the flow + raise ValueError(str(exc)) from exc + + @router.post("/run/{flow_id_or_name}", response_model=RunResponse, response_model_exclude_none=True) async def simplified_run_flow( db: Annotated[Session, Depends(get_session)], - flow_id_or_name: str, + flow: Annotated[Flow, Depends(get_flow_by_id_or_endpoint_name)], input_request: SimplifiedAPIRequest = SimplifiedAPIRequest(), stream: bool = False, api_key_user: User = Depends(api_key_security), @@ -67,7 +135,7 @@ async def simplified_run_flow( ### Parameters: - `db` (Session): Database session for executing queries. - - `flow_id` (str): Unique identifier of the flow to be executed. + - `flow_id_or_name` (str): ID or endpoint name of the flow to run. - `input_request` (SimplifiedAPIRequest): Request object containing input values, types, output selection, tweaks, and session ID. - `api_key_user` (User): User object derived from the provided API key, used for authentication. - `session_service` (SessionService): Service for managing flow sessions, essential for session reuse and caching. @@ -110,89 +178,21 @@ async def simplified_run_flow( This endpoint provides a powerful interface for executing flows with enhanced flexibility and efficiency, supporting a wide range of applications by allowing for dynamic input and output configuration along with performance optimizations through session management and caching. """ - session_id = input_request.session_id - endpoint_name = None - flow_id_str = None try: - try: - flow_id = UUID(flow_id_or_name) - - except ValueError: - endpoint_name = flow_id_or_name - flow = db.exec( - select(Flow).where(Flow.endpoint_name == endpoint_name).where(Flow.user_id == api_key_user.id) - ).first() - if flow is None: - raise ValueError(f"Flow with endpoint name {endpoint_name} not found") - flow_id = flow.id - - flow_id_str = str(flow_id) - artifacts = {} - if input_request.session_id: - session_data = await session_service.load_session(input_request.session_id, flow_id=flow_id_str) - graph, artifacts = session_data if session_data else (None, None) - if graph is None: - raise ValueError(f"Session {input_request.session_id} not found") - else: - # Get the flow that matches the flow_id and belongs to the user - # flow = session.query(Flow).filter(Flow.id == flow_id).filter(Flow.user_id == api_key_user.id).first() - flow = db.exec(select(Flow).where(Flow.id == flow_id_str).where(Flow.user_id == api_key_user.id)).first() - if flow is None: - raise ValueError(f"Flow {flow_id_str} not found") - - if flow.data is None: - raise ValueError(f"Flow {flow_id_str} has no data") - graph_data = flow.data - - graph_data = process_tweaks(graph_data, input_request.tweaks or {}, stream=stream) - graph = Graph.from_payload(graph_data, flow_id=flow_id_str, user_id=str(api_key_user.id)) - inputs = [ - InputValueRequest(components=[], input_value=input_request.input_value, type=input_request.input_type) - ] - # outputs is a list of all components that should return output - # we need to get them by checking their type - # if the output type is debug, we return all outputs - # if the output type is any, we return all outputs that are either chat or text - # if the output type is chat or text, we return only the outputs that match the type - if input_request.output_component: - outputs = [input_request.output_component] - else: - outputs = [ - vertex.id - for vertex in graph.vertices - if input_request.output_type == "debug" - or ( - vertex.is_output - and (input_request.output_type == "any" or input_request.output_type in vertex.id.lower()) - ) - ] - task_result, session_id = await run_graph_internal( - graph=graph, - flow_id=flow_id_str, - session_id=input_request.session_id, - inputs=inputs, - outputs=outputs, - artifacts=artifacts, + return await simple_run_flow( + db=db, + flow=flow, + input_request=input_request, session_service=session_service, stream=stream, + api_key_user=api_key_user, ) - return RunResponse(outputs=task_result, session_id=session_id) - except sa.exc.StatementError as exc: - # StatementError('(builtins.ValueError) badly formed hexadecimal UUID string') - if "badly formed hexadecimal UUID string" in str(exc): - logger.error(f"Flow ID {flow_id_str} is not a valid UUID") - # This means the Flow ID is not a valid UUID which means it can't find the flow - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc except ValueError as exc: - if flow_id_str and f"Flow {flow_id_str} not found" in str(exc): - logger.error(f"Flow {flow_id_str} not found") - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc - elif endpoint_name and f"Flow with endpoint name {endpoint_name} not found" in str(exc): - logger.error(f"Flow with endpoint name {endpoint_name} not found") - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc - elif session_id and f"Session {session_id} not found" in str(exc): - logger.error(f"Session {session_id} not found") + if "badly formed hexadecimal UUID string" in str(exc): + # This means the Flow ID is not a valid UUID which means it can't find the flow + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc + if "not found" in str(exc): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc else: logger.exception(exc) @@ -202,6 +202,68 @@ async def simplified_run_flow( raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc +@router.post("/webhook/{flow_id}", response_model=dict, status_code=HTTPStatus.ACCEPTED) +async def webhook_run_flow( + db: Annotated[Session, Depends(get_session)], + flow: Annotated[Flow, Depends(get_flow_by_id)], + request: Request, + background_tasks: BackgroundTasks, + session_service: SessionService = Depends(get_session_service), +): + """ + Run a flow using a webhook request. + + Args: + db (Session): The database session. + request (Request): The incoming HTTP request. + background_tasks (BackgroundTasks): The background tasks manager. + session_service (SessionService, optional): The session service. Defaults to Depends(get_session_service). + flow (Flow, optional): The flow to be executed. Defaults to Depends(get_flow_by_id). + + Returns: + dict: A dictionary containing the status of the task. + + Raises: + HTTPException: If the flow is not found or if there is an error processing the request. + """ + try: + logger.debug("Received webhook request") + data = await request.body() + if not data: + logger.error("Request body is empty") + raise ValueError( + "Request body is empty. You should provide a JSON payload containing the flow ID.", + ) + + # get all webhook components in the flow + webhook_components = get_all_webhook_components_in_flow(flow.data) + tweaks = {} + data_dict = await request.json() + for component in webhook_components: + tweaks[component["id"]] = {"data": data.decode() if isinstance(data, bytes) else data} + input_request = SimplifiedAPIRequest( + input_value=data_dict.get("input_value", ""), + input_type=data_dict.get("input_type", "chat"), + output_type=data_dict.get("output_type", "chat"), + tweaks=tweaks, + session_id=data_dict.get("session_id"), + ) + logger.debug("Starting background task") + background_tasks.add_task( + simple_run_flow, + db=db, + flow=flow, + input_request=input_request, + session_service=session_service, + ) + return {"message": "Task started in the background", "status": "in progress"} + except Exception as exc: + if "Flow ID is required" in str(exc) or "Request body is empty" in str(exc): + raise HTTPException(status_code=400, detail=str(exc)) from exc + logger.exception(exc) + raise HTTPException(status_code=500, detail=str(exc)) from exc + + @router.post("/run/advanced/{flow_id}", response_model=RunResponse, response_model_exclude_none=True) async def experimental_run_flow( session: Annotated[Session, Depends(get_session)], diff --git a/src/backend/base/langflow/api/v1/flows.py b/src/backend/base/langflow/api/v1/flows.py index 44b1fa275..36030a12d 100644 --- a/src/backend/base/langflow/api/v1/flows.py +++ b/src/backend/base/langflow/api/v1/flows.py @@ -13,6 +13,7 @@ from langflow.api.v1.schemas import FlowListCreate, FlowListIds, FlowListRead from langflow.initial_setup.setup import STARTER_FOLDER_NAME from langflow.services.auth.utils import get_current_active_user from langflow.services.database.models.flow import Flow, FlowCreate, FlowRead, FlowUpdate +from langflow.services.database.models.flow.utils import get_webhook_component_in_flow from langflow.services.database.models.folder.constants import DEFAULT_FOLDER_NAME from langflow.services.database.models.folder.model import Folder from langflow.services.database.models.user.model import User @@ -150,6 +151,8 @@ def update_flow( for key, value in flow_data.items(): if value is not None: setattr(db_flow, key, value) + webhook_component = get_webhook_component_in_flow(db_flow.data) + db_flow.webhook = webhook_component is not None db_flow.updated_at = datetime.now(timezone.utc) if db_flow.folder_id is None: default_folder = session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME)).first() diff --git a/src/backend/base/langflow/api/v1/folders.py b/src/backend/base/langflow/api/v1/folders.py index 3aa57842c..7402881c7 100644 --- a/src/backend/base/langflow/api/v1/folders.py +++ b/src/backend/base/langflow/api/v1/folders.py @@ -1,5 +1,4 @@ from typing import List -from uuid import UUID import orjson from fastapi import APIRouter, Depends, File, HTTPException, Response, UploadFile, status @@ -88,7 +87,7 @@ def read_folders( def read_folder( *, session: Session = Depends(get_session), - folder_id: UUID, + folder_id: str, current_user: User = Depends(get_current_active_user), ): try: @@ -106,7 +105,7 @@ def read_folder( def update_folder( *, session: Session = Depends(get_session), - folder_id: UUID, + folder_id: str, folder: FolderUpdate, # Assuming FolderUpdate is a Pydantic model defining updatable fields current_user: User = Depends(get_current_active_user), ): @@ -155,7 +154,7 @@ def update_folder( def delete_folder( *, session: Session = Depends(get_session), - folder_id: UUID, + folder_id: str, current_user: User = Depends(get_current_active_user), ): try: @@ -177,7 +176,7 @@ def delete_folder( async def download_file( *, session: Session = Depends(get_session), - folder_id: UUID, + folder_id: str, current_user: User = Depends(get_current_active_user), ): """Download all flows from folder.""" diff --git a/src/backend/base/langflow/components/data/Webhook.py b/src/backend/base/langflow/components/data/Webhook.py new file mode 100644 index 000000000..cf82e07d2 --- /dev/null +++ b/src/backend/base/langflow/components/data/Webhook.py @@ -0,0 +1,39 @@ +import json +import uuid +from typing import Any, Optional + +from langflow.custom import CustomComponent +from langflow.schema.dotdict import dotdict +from langflow.schema.schema import Record + + +class WebhookComponent(CustomComponent): + display_name = "Webhook Input" + description = "Defines a webhook input for the flow." + + def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None): + if field_name == "webhook_id": + build_config["webhook_id"]["value"] = uuid.uuid4().hex + return build_config + + def build_config(self): + return { + "data": { + "display_name": "Data", + "info": "Use this field to quickly test the webhook component by providing a JSON payload.", + "multiline": True, + } + } + + def build(self, data: Optional[str] = "") -> Record: + message = "" + try: + body = json.loads(data or "{}") + except json.JSONDecodeError: + body = {"payload": data} + message = f"Invalid JSON payload. Please check the format.\n\n{data}" + record = Record(data=body) + if not message: + message = json.dumps(body, indent=2) + self.status = message + return record diff --git a/src/backend/base/langflow/components/data/__init__.py b/src/backend/base/langflow/components/data/__init__.py index ca82e3eb8..c57cf8656 100644 --- a/src/backend/base/langflow/components/data/__init__.py +++ b/src/backend/base/langflow/components/data/__init__.py @@ -1,7 +1,8 @@ from .APIRequest import APIRequest from .Directory import DirectoryComponent from .File import FileComponent +from .Webhook import WebhookComponent from .URL import URLComponent -__all__ = ["APIRequest", "DirectoryComponent", "FileComponent", "URLComponent"] +__all__ = ["APIRequest", "DirectoryComponent", "FileComponent", "URLComponent", "WebhookComponent"] diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 956fda7bf..f4e71bd7c 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -20,6 +20,7 @@ from langflow.schema.schema import INPUT_FIELD_NAME, InputType from langflow.services.cache.utils import CacheMiss from langflow.services.chat.service import ChatService from langflow.services.deps import get_chat_service +from langflow.services.monitor.utils import log_transaction if TYPE_CHECKING: from langflow.graph.schema import ResultData @@ -763,9 +764,11 @@ class Graph: next_runnable_vertices, top_level_vertices = await self.get_next_and_top_level_vertices( lock, set_cache_coro, vertex ) + log_transaction(vertex, status="success") return next_runnable_vertices, top_level_vertices, result_dict, params, valid, artifacts, vertex except Exception as exc: logger.exception(f"Error building vertex: {exc}") + log_transaction(vertex, status="failure", error=str(exc)) raise exc async def get_next_and_top_level_vertices( diff --git a/src/backend/base/langflow/graph/vertex/base.py b/src/backend/base/langflow/graph/vertex/base.py index 561a91848..963ae2115 100644 --- a/src/backend/base/langflow/graph/vertex/base.py +++ b/src/backend/base/langflow/graph/vertex/base.py @@ -696,7 +696,8 @@ class Vertex: self._finalize_build() - return await self.get_requester_result(requester) + result = await self.get_requester_result(requester) + return result async def get_requester_result(self, requester: Optional["Vertex"]): # If the requester is None, this means that diff --git a/src/backend/base/langflow/helpers/flow.py b/src/backend/base/langflow/helpers/flow.py index a20462f3d..0f2a1e170 100644 --- a/src/backend/base/langflow/helpers/flow.py +++ b/src/backend/base/langflow/helpers/flow.py @@ -1,13 +1,14 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Optional, Tuple, Type, Union, cast from uuid import UUID +from fastapi import Depends, HTTPException from pydantic.v1 import BaseModel, Field, create_model -from sqlmodel import select +from sqlmodel import Session, select from langflow.graph.schema import RunOutputs from langflow.schema.schema import INPUT_FIELD_NAME, Record -from langflow.services.database.models.flow.model import Flow -from langflow.services.deps import session_scope +from langflow.services.database.models.flow import Flow +from langflow.services.deps import get_session, session_scope if TYPE_CHECKING: from langflow.graph.graph.base import Graph @@ -235,3 +236,22 @@ def get_arg_names(inputs: List["Vertex"]) -> List[dict[str, str]]: {"component_name": input_.display_name, "arg_name": input_.display_name.lower().replace(" ", "_")} for input_ in inputs ] + + +def get_flow_by_id_or_endpoint_name( + flow_id_or_name: str, db: Session = Depends(get_session), user_id: Optional[UUID] = None +) -> Flow: + endpoint_name = None + try: + flow_id = UUID(flow_id_or_name) + flow = db.get(Flow, flow_id) + except ValueError: + endpoint_name = flow_id_or_name + stmt = select(Flow).where(Flow.name == endpoint_name) + if user_id: + stmt = stmt.where(Flow.user_id == user_id) + flow = db.exec(stmt).first() + if flow is None: + raise HTTPException(status_code=404, detail=f"Flow identifier {flow_id_or_name} not found") + + return flow diff --git a/src/backend/base/langflow/main.py b/src/backend/base/langflow/main.py index 93d0e7f04..c81c014e2 100644 --- a/src/backend/base/langflow/main.py +++ b/src/backend/base/langflow/main.py @@ -14,7 +14,11 @@ from rich import print as rprint from starlette.middleware.base import BaseHTTPMiddleware from langflow.api import router -from langflow.initial_setup.setup import create_or_update_starter_projects, initialize_super_user_if_needed, load_flows_from_directory +from langflow.initial_setup.setup import ( + create_or_update_starter_projects, + initialize_super_user_if_needed, + load_flows_from_directory, +) from langflow.interface.utils import setup_llm_caching from langflow.services.plugins.langfuse_plugin import LangfuseInstance from langflow.services.utils import initialize_services, teardown_services @@ -33,20 +37,13 @@ class JavaScriptMIMETypeMiddleware(BaseHTTPMiddleware): return response -def get_lifespan(fix_migration=False, socketio_server=None): - try: - from langflow.version import __version__ # type: ignore - except ImportError: - from importlib.metadata import version - - __version__ = version("langflow-base") - +def get_lifespan(fix_migration=False, socketio_server=None, version=None): @asynccontextmanager async def lifespan(app: FastAPI): nest_asyncio.apply() # Startup message - if __version__: - rprint(f"[bold green]Starting Langflow v{__version__}...[/bold green]") + if version: + rprint(f"[bold green]Starting Langflow v{version}...[/bold green]") else: rprint("[bold green]Starting Langflow...[/bold green]") try: @@ -70,11 +67,17 @@ def get_lifespan(fix_migration=False, socketio_server=None): def create_app(): """Create the FastAPI app and include the router.""" + try: + from langflow.version import __version__ # type: ignore + except ImportError: + from importlib.metadata import version + + __version__ = version("langflow-base") configure() socketio_server = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*", logger=True) - lifespan = get_lifespan(socketio_server=socketio_server) - app = FastAPI(lifespan=lifespan) + lifespan = get_lifespan(socketio_server=socketio_server, version=__version__) + app = FastAPI(lifespan=lifespan, title="Langflow", version=__version__) origins = ["*"] app.add_middleware( diff --git a/src/backend/base/langflow/services/database/models/flow/model.py b/src/backend/base/langflow/services/database/models/flow/model.py index ff6e19cb7..4de1e0bc8 100644 --- a/src/backend/base/langflow/services/database/models/flow/model.py +++ b/src/backend/base/langflow/services/database/models/flow/model.py @@ -28,6 +28,7 @@ class FlowBase(SQLModel): data: Optional[Dict] = Field(default=None, nullable=True) is_component: Optional[bool] = Field(default=False, nullable=True) updated_at: Optional[datetime] = Field(default_factory=lambda: datetime.now(timezone.utc), nullable=True) + webhook: Optional[bool] = Field(default=False, nullable=True, description="Can be used on the webhook endpoint") folder_id: Optional[UUID] = Field(default=None, nullable=True) endpoint_name: Optional[str] = Field(default=None, nullable=True, index=True) diff --git a/src/backend/base/langflow/services/database/models/flow/utils.py b/src/backend/base/langflow/services/database/models/flow/utils.py new file mode 100644 index 000000000..b8ea9d658 --- /dev/null +++ b/src/backend/base/langflow/services/database/models/flow/utils.py @@ -0,0 +1,33 @@ +from typing import Optional + +from fastapi import Depends +from sqlmodel import Session + +from langflow.services.deps import get_session + +from .model import Flow + + +def get_flow_by_id(session: Session = Depends(get_session), flow_id: Optional[str] = None) -> Flow | None: + """Get flow by id.""" + + if flow_id is None: + raise ValueError("Flow id is required.") + + return session.get(Flow, flow_id) + + +def get_webhook_component_in_flow(flow_data: dict): + """Get webhook component in flow data.""" + + for node in flow_data.get("nodes", []): + if "Webhook" in node.get("id"): + return node + return None + + +def get_all_webhook_components_in_flow(flow_data: dict | None): + """Get all webhook components in flow data.""" + if not flow_data: + return [] + return [node for node in flow_data.get("nodes", []) if "Webhook" in node.get("id")] diff --git a/src/backend/base/langflow/services/monitor/utils.py b/src/backend/base/langflow/services/monitor/utils.py index aec5ae0c6..f603b3fde 100644 --- a/src/backend/base/langflow/services/monitor/utils.py +++ b/src/backend/base/langflow/services/monitor/utils.py @@ -8,6 +8,7 @@ from langflow.services.deps import get_monitor_service if TYPE_CHECKING: from langflow.api.v1.schemas import ResultDataResponse + from langflow.graph.vertex.base import Vertex INDEX_KEY = "index" @@ -165,3 +166,35 @@ async def log_vertex_build( monitor_service.add_row(table_name="vertex_builds", data=row) except Exception as e: logger.exception(f"Error logging vertex build: {e}") + + +def build_clean_params(target: "Vertex") -> dict: + """ + Cleans the parameters of the target vertex. + """ + # Removes all keys that the values aren't python types like str, int, bool, etc. + params = { + key: value for key, value in target.params.items() if isinstance(value, (str, int, bool, float, list, dict)) + } + # if it is a list we need to check if the contents are python types + for key, value in params.items(): + if isinstance(value, list): + params[key] = [item for item in value if isinstance(item, (str, int, bool, float, list, dict))] + return params + + +def log_transaction(vertex: "Vertex", status, error=None): + try: + monitor_service = get_monitor_service() + clean_params = build_clean_params(vertex) + data = { + "vertex_id": vertex.id, + "inputs": clean_params, + "output": str(vertex.result), + "timestamp": monitor_service.get_timestamp(), + "status": status, + "error": error, + } + monitor_service.add_row(table_name="transactions", data=data) + except Exception as e: + logger.error(f"Error logging transaction: {e}") diff --git a/src/frontend/src/components/accordionComponent/composite/folderAccordionComponent/index.tsx b/src/frontend/src/components/accordionComponent/composite/folderAccordionComponent/index.tsx index d4fb95b5c..212a03fa2 100644 --- a/src/frontend/src/components/accordionComponent/composite/folderAccordionComponent/index.tsx +++ b/src/frontend/src/components/accordionComponent/composite/folderAccordionComponent/index.tsx @@ -15,7 +15,7 @@ export default function FolderAccordionComponent({ options, }: AccordionComponentType): JSX.Element { const [value, setValue] = useState( - open.length === 0 ? "" : getOpenAccordion(), + open.length === 0 ? "" : getOpenAccordion() ); function getOpenAccordion(): string { diff --git a/src/frontend/src/components/cardComponent/components/dragCardComponent/index.tsx b/src/frontend/src/components/cardComponent/components/dragCardComponent/index.tsx index e4425c61c..54dbf4846 100644 --- a/src/frontend/src/components/cardComponent/components/dragCardComponent/index.tsx +++ b/src/frontend/src/components/cardComponent/components/dragCardComponent/index.tsx @@ -1,7 +1,6 @@ import { storeComponent } from "../../../../types/store"; import { cn } from "../../../../utils/utils"; import ForwardedIconComponent from "../../../genericIconComponent"; -import ShadTooltip from "../../../shadTooltipComponent"; import { Card, CardHeader, CardTitle } from "../../../ui/card"; export default function DragCardComponent({ data }: { data: storeComponent }) { @@ -11,7 +10,7 @@ export default function DragCardComponent({ data }: { data: storeComponent }) { draggable //TODO check color schema className={cn( - "group relative flex flex-col justify-between overflow-hidden transition-all hover:bg-muted/50 hover:shadow-md hover:dark:bg-[#ffffff10]", + "group relative flex flex-col justify-between overflow-hidden transition-all hover:bg-muted/50 hover:shadow-md hover:dark:bg-[#ffffff10]" )} >
@@ -23,7 +22,7 @@ export default function DragCardComponent({ data }: { data: storeComponent }) { "visible flex-shrink-0", data.is_component ? "mx-0.5 h-6 w-6 text-component-icon" - : "h-7 w-7 flex-shrink-0 text-flow-icon", + : "h-7 w-7 flex-shrink-0 text-flow-icon" )} name={data.is_component ? "ToyBrick" : "Group"} /> diff --git a/src/frontend/src/components/codeAreaComponent/index.tsx b/src/frontend/src/components/codeAreaComponent/index.tsx index cd8a0f77d..dd68e774a 100644 --- a/src/frontend/src/components/codeAreaComponent/index.tsx +++ b/src/frontend/src/components/codeAreaComponent/index.tsx @@ -18,7 +18,7 @@ export default function CodeAreaComponent({ setOpen, }: CodeAreaComponentType) { const [myValue, setMyValue] = useState( - typeof value == "string" ? value : JSON.stringify(value), + typeof value == "string" ? value : JSON.stringify(value) ); useEffect(() => { if (disabled && myValue !== "") { diff --git a/src/frontend/src/components/dropdownComponent/index.tsx b/src/frontend/src/components/dropdownComponent/index.tsx index 8402d166e..bff138681 100644 --- a/src/frontend/src/components/dropdownComponent/index.tsx +++ b/src/frontend/src/components/dropdownComponent/index.tsx @@ -59,7 +59,7 @@ export default function Dropdown({ ? "dropdown-component-outline" : "dropdown-component-false-outline", "w-full justify-between font-normal", - editNode ? "input-edit-node" : "py-2", + editNode ? "input-edit-node" : "py-2" )} > @@ -107,7 +107,7 @@ export default function Dropdown({ name="Check" className={cn( "ml-auto h-4 w-4 text-primary", - value === option ? "opacity-100" : "opacity-0", + value === option ? "opacity-100" : "opacity-0" )} /> diff --git a/src/frontend/src/components/genericIconComponent/index.tsx b/src/frontend/src/components/genericIconComponent/index.tsx index 9f7c687a1..398ae6e7f 100644 --- a/src/frontend/src/components/genericIconComponent/index.tsx +++ b/src/frontend/src/components/genericIconComponent/index.tsx @@ -18,7 +18,7 @@ export const ForwardedIconComponent = memo( strokeWidth, id = "", }: IconComponentProps, - ref, + ref ) => { const [showFallback, setShowFallback] = useState(false); @@ -65,8 +65,8 @@ export const ForwardedIconComponent = memo( /> ); - }, - ), + } + ) ); export default ForwardedIconComponent; diff --git a/src/frontend/src/components/headerComponent/components/menuBar/index.tsx b/src/frontend/src/components/headerComponent/components/menuBar/index.tsx index 41f7b6d5b..7f7130090 100644 --- a/src/frontend/src/components/headerComponent/components/menuBar/index.tsx +++ b/src/frontend/src/components/headerComponent/components/menuBar/index.tsx @@ -132,7 +132,7 @@ export const MenuBar = ({}: {}): JSX.Element => { title: UPLOAD_ERROR_ALERT, list: [error], }); - }, + } ); }} > @@ -214,7 +214,7 @@ export const MenuBar = ({}: {}): JSX.Element => { name={isBuilding || saveLoading ? "Loader2" : "CheckCircle2"} className={cn( "h-4 w-4", - isBuilding || saveLoading ? "animate-spin" : "animate-wiggle", + isBuilding || saveLoading ? "animate-spin" : "animate-wiggle" )} /> {printByBuildStatus()} diff --git a/src/frontend/src/components/horizontalScrollFadeComponent/index.tsx b/src/frontend/src/components/horizontalScrollFadeComponent/index.tsx index e0bf48917..7e0c788a6 100644 --- a/src/frontend/src/components/horizontalScrollFadeComponent/index.tsx +++ b/src/frontend/src/components/horizontalScrollFadeComponent/index.tsx @@ -32,11 +32,11 @@ export default function HorizontalScrollFadeComponent({ fadeContainerRef.current.classList.toggle( "fade-left", - isScrollable && !atStart, + isScrollable && !atStart ); fadeContainerRef.current.classList.toggle( "fade-right", - isScrollable && !atEnd, + isScrollable && !atEnd ); }; diff --git a/src/frontend/src/components/inputFileComponent/index.tsx b/src/frontend/src/components/inputFileComponent/index.tsx index 15a79ec1d..bcea7fb79 100644 --- a/src/frontend/src/components/inputFileComponent/index.tsx +++ b/src/frontend/src/components/inputFileComponent/index.tsx @@ -104,8 +104,8 @@ export default function InputFileComponent({ editNode ? "input-edit-node input-dialog text-muted-foreground" : disabled - ? "input-disable input-dialog primary-input" - : "input-dialog primary-input text-muted-foreground" + ? "input-disable input-dialog primary-input" + : "input-dialog primary-input text-muted-foreground" } > {myValue !== "" ? myValue : "No file"} diff --git a/src/frontend/src/components/inputGlobalComponent/index.tsx b/src/frontend/src/components/inputGlobalComponent/index.tsx index e9edd2e50..318c57518 100644 --- a/src/frontend/src/components/inputGlobalComponent/index.tsx +++ b/src/frontend/src/components/inputGlobalComponent/index.tsx @@ -19,15 +19,15 @@ export default function InputGlobalComponent({ editNode = false, }: InputGlobalComponentType): JSX.Element { const globalVariablesEntries = useGlobalVariablesStore( - (state) => state.globalVariablesEntries, + (state) => state.globalVariablesEntries ); const getVariableId = useGlobalVariablesStore((state) => state.getVariableId); const unavaliableFields = useGlobalVariablesStore( - (state) => state.unavaliableFields, + (state) => state.unavaliableFields ); const removeGlobalVariable = useGlobalVariablesStore( - (state) => state.removeGlobalVariable, + (state) => state.removeGlobalVariable ); const setErrorData = useAlertStore((state) => state.setErrorData); @@ -130,7 +130,7 @@ export default function InputGlobalComponent({