From 9ac861da2fda36b26e2593953685ce87be09affe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Fri, 26 Jul 2024 16:57:38 +0200 Subject: [PATCH] feat: migrate transactions to sql database (#2915) * feat: migrate transactions to sql database * feat: migrate transactions to sql database * feat: migrate transactions to sql database * feat: migrate transactions to sql database * feat: migrate transactions to sql database * feat: migrate transactions to sql database * [autofix.ci] apply automated fixes * remove useless * remove useless --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .../90be8e2ed91e_create_transactions_table.py | 51 ++++++++++ src/backend/base/langflow/api/v1/flows.py | 4 +- src/backend/base/langflow/api/v1/monitor.py | 46 ++++----- src/backend/base/langflow/graph/graph/base.py | 5 - src/backend/base/langflow/graph/utils.py | 49 ++++++++- .../base/langflow/graph/vertex/base.py | 7 +- .../base/langflow/graph/vertex/types.py | 13 ++- src/backend/base/langflow/memory.py | 3 - .../services/database/models/__init__.py | 3 +- .../services/database/models/flow/model.py | 2 + .../services/database/models/flow/utils.py | 12 +++ .../database/models/transactions/__init__.py | 3 + .../database/models/transactions/crud.py | 30 ++++++ .../database/models/transactions/model.py | 44 +++++++++ .../langflow/services/database/service.py | 18 +++- .../base/langflow/services/database/utils.py | 32 +++++- src/backend/base/langflow/services/manager.py | 8 +- .../base/langflow/services/monitor/service.py | 69 +++++-------- .../base/langflow/services/monitor/utils.py | 99 +++++++------------ .../base/langflow/utils/concurrency.py | 63 ++++++++++++ src/backend/base/poetry.lock | 18 +++- src/backend/base/pyproject.toml | 1 + src/backend/tests/unit/test_database.py | 52 +++++++++- src/frontend/src/controllers/API/index.ts | 15 --- 24 files changed, 467 insertions(+), 180 deletions(-) create mode 100644 
src/backend/base/langflow/alembic/versions/90be8e2ed91e_create_transactions_table.py create mode 100644 src/backend/base/langflow/services/database/models/transactions/__init__.py create mode 100644 src/backend/base/langflow/services/database/models/transactions/crud.py create mode 100644 src/backend/base/langflow/services/database/models/transactions/model.py create mode 100644 src/backend/base/langflow/utils/concurrency.py diff --git a/src/backend/base/langflow/alembic/versions/90be8e2ed91e_create_transactions_table.py b/src/backend/base/langflow/alembic/versions/90be8e2ed91e_create_transactions_table.py new file mode 100644 index 000000000..c7eff439b --- /dev/null +++ b/src/backend/base/langflow/alembic/versions/90be8e2ed91e_create_transactions_table.py @@ -0,0 +1,51 @@ +"""create transactions table + +Revision ID: 90be8e2ed91e +Revises: 325180f0c4e1 +Create Date: 2024-07-24 11:37:48.532933 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +import sqlmodel +from langflow.utils import migration + + +# revision identifiers, used by Alembic. 
revision: str = "90be8e2ed91e"
down_revision: Union[str, None] = "325180f0c4e1"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create the ``transaction`` table unless it is already present.

    The existence check makes the migration safe to re-run against a database
    in which the table was already created.
    """
    conn = op.get_bind()
    if migration.table_exists("transaction", conn):
        return
    op.create_table(
        "transaction",
        sa.Column("timestamp", sa.DateTime(), nullable=False),
        sa.Column("vertex_id", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
        sa.Column("target_id", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
        sa.Column("inputs", sa.JSON(), nullable=True),
        sa.Column("outputs", sa.JSON(), nullable=True),
        sa.Column("status", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
        sa.Column("id", sqlmodel.sql.sqltypes.GUID(), nullable=False),
        sa.Column("flow_id", sqlmodel.sql.sqltypes.GUID(), nullable=False),
        sa.Column("error", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
        sa.ForeignKeyConstraint(
            ["flow_id"],
            ["flow.id"],
        ),
        sa.PrimaryKeyConstraint("id"),
    )


def downgrade() -> None:
    """Drop the ``transaction`` table if it exists."""
    conn = op.get_bind()
    if migration.table_exists("transaction", conn):
        op.drop_table("transaction")
@router.get("/transactions", response_model=List[TransactionReadResponse])
async def get_transactions(
    flow_id: UUID = Query(),
    session: Session = Depends(get_session),
):
    """Return the stored transactions of one flow.

    Args:
        flow_id: Required query parameter identifying the flow.
        session: Database session injected by FastAPI.

    Returns:
        One ``TransactionReadResponse`` per row returned by
        ``get_transactions_by_flow_id``; the row's ``id`` is exposed as
        ``transaction_id``.

    Raises:
        HTTPException: With status 500 and the error text as detail when the
            lookup or the response construction fails.
    """
    try:
        transactions = get_transactions_by_flow_id(session, flow_id)
        return [
            TransactionReadResponse(
                transaction_id=t.id,
                timestamp=t.timestamp,
                vertex_id=t.vertex_id,
                target_id=t.target_id,
                inputs=t.inputs,
                outputs=t.outputs,
                status=t.status,
                error=t.error,
                flow_id=t.flow_id,
            )
            for t in transactions
        ]
    except Exception as e:
        # Chain the original error so its traceback is kept in the server logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
# Value types that are kept when a vertex's params are recorded.
_PRIMITIVE_TYPES = (str, int, bool, float, list, dict)


def _keep_primitives(value: Any) -> Any:
    """Recursively drop list items and dict values that are not primitive types."""
    if isinstance(value, list):
        return [_keep_primitives(item) for item in value if isinstance(item, _PRIMITIVE_TYPES)]
    if isinstance(value, dict):
        return {key: _keep_primitives(item) for key, item in value.items() if isinstance(item, _PRIMITIVE_TYPES)}
    return value


def _vertex_to_primitive_dict(target: "Vertex") -> dict:
    """Return a copy of ``target.params`` restricted to primitive values.

    Only values that are ``str``, ``int``, ``bool``, ``float``, ``list`` or
    ``dict`` are kept (so ``None`` and arbitrary objects are removed). The
    filter is applied at every nesting level: an object buried inside a nested
    dict or list is removed too, instead of being passed on to the JSON column
    where it cannot be serialized.

    Args:
        target: Object exposing a ``params`` mapping.

    Returns:
        A new dict; ``target.params`` itself is not modified.
    """
    return {
        key: _keep_primitives(value)
        for key, value in target.params.items()
        if isinstance(value, _PRIMITIVE_TYPES)
    }


async def log_transaction(
    flow_id: Union[str, UUID], source: "Vertex", status, target: Optional["Vertex"] = None, error=None
) -> None:
    """Persist one transaction row describing ``source`` (best effort).

    Args:
        flow_id: Flow identifier; a string is converted to ``UUID``.
        source: Vertex whose filtered params and result are stored.
        status: Status value stored on the row.
        target: Optional vertex whose id is stored as ``target_id``.
        error: Optional error value stored on the row.

    Any exception raised while building or storing the row is logged and
    swallowed, so a logging failure never reaches the caller.
    """
    try:
        inputs = _vertex_to_primitive_dict(source)
        transaction = TransactionBase(
            vertex_id=source.id,
            target_id=target.id if target else None,
            inputs=inputs,
            # Round-trip through JSON so the stored outputs are plain JSON types.
            outputs=json.loads(source.result.model_dump_json()) if source.result else None,
            status=status,
            error=error,
            flow_id=flow_id if isinstance(flow_id, UUID) else UUID(flow_id),
        )
        with session_getter(get_db_service()) as session:
            inserted = crud_log_transaction(session, transaction)
            logger.debug(f"Logged transaction: {inserted.id}")
    except Exception as e:
        logger.error(f"Error logging transaction: {e}")
langflow.schema.message import Message from langflow.schema.schema import INPUT_FIELD_NAME, OutputValue, build_output_logs from langflow.services.deps import get_storage_service -from langflow.services.monitor.utils import log_transaction from langflow.services.tracing.schema import Log from langflow.utils.constants import DIRECT_TYPES from langflow.utils.schemas import ChatOutputResponse @@ -583,11 +582,11 @@ class Vertex: """ flow_id = self.graph.flow_id if not self._built: - log_transaction(flow_id, source=self, target=requester, status="error") + asyncio.create_task(log_transaction(str(flow_id), source=self, target=requester, status="error")) raise ValueError(f"Component {self.display_name} has not been built yet") result = self._built_result if self.use_result else self._built_object - log_transaction(flow_id, source=self, target=requester, status="success") + asyncio.create_task(log_transaction(str(flow_id), source=self, target=requester, status="success")) return result async def _build_vertex_and_update_params(self, key, vertex: "Vertex"): diff --git a/src/backend/base/langflow/graph/vertex/types.py b/src/backend/base/langflow/graph/vertex/types.py index 78c541f65..68d4a4a31 100644 --- a/src/backend/base/langflow/graph/vertex/types.py +++ b/src/backend/base/langflow/graph/vertex/types.py @@ -1,3 +1,4 @@ +import asyncio import json from typing import TYPE_CHECKING, Any, AsyncIterator, Dict, Generator, Iterator, List @@ -6,13 +7,13 @@ from langchain_core.messages import AIMessage, AIMessageChunk from loguru import logger from langflow.graph.schema import CHAT_COMPONENTS, RECORDS_COMPONENTS, InterfaceComponentTypes, ResultData -from langflow.graph.utils import UnbuiltObject, serialize_field +from langflow.graph.utils import UnbuiltObject, serialize_field, log_transaction from langflow.graph.vertex.base import Vertex from langflow.schema import Data from langflow.schema.artifact import ArtifactType from langflow.schema.message import Message from 
langflow.schema.schema import INPUT_FIELD_NAME -from langflow.services.monitor.utils import log_transaction, log_vertex_build +from langflow.services.monitor.utils import log_vertex_build from langflow.template.field.base import UNDEFINED from langflow.utils.schemas import ChatOutputResponse, DataOutputResponse from langflow.utils.util import unescape_string @@ -81,7 +82,9 @@ class ComponentVertex(Vertex): The built result if use_result is True, else the built object. """ if not self._built: - log_transaction(source=self, target=requester, flow_id=self.graph.flow_id, status="error") + asyncio.create_task( + log_transaction(source=self, target=requester, flow_id=str(self.graph.flow_id), status="error") + ) raise ValueError(f"Component {self.display_name} has not been built yet") if requester is None: @@ -101,7 +104,9 @@ class ComponentVertex(Vertex): raise ValueError(f"Result not found for {edge.source_handle.name}. Results: {self.results}") else: raise ValueError(f"Result not found for {edge.source_handle.name}") - log_transaction(source=self, target=requester, flow_id=self.graph.flow_id, status="success") + asyncio.create_task( + log_transaction(source=self, target=requester, flow_id=str(self.graph.flow_id), status="success") + ) return result def extract_messages_from_artifacts(self, artifacts: Dict[str, Any]) -> List[dict]: diff --git a/src/backend/base/langflow/memory.py b/src/backend/base/langflow/memory.py index f87629327..7561e22c6 100644 --- a/src/backend/base/langflow/memory.py +++ b/src/backend/base/langflow/memory.py @@ -8,7 +8,6 @@ from sqlmodel import Session, col, select from langflow.schema.message import Message from langflow.services.database.models.message.model import MessageRead, MessageTable -from langflow.services.database.utils import migrate_messages_from_monitor_service_to_database from langflow.services.deps import session_scope from langflow.field_typing import BaseChatMessageHistory from langchain_core.messages import BaseMessage @@ 
-36,8 +35,6 @@ def get_messages( Returns: List[Data]: A list of Data objects representing the retrieved messages. """ - with session_scope() as session: - migrate_messages_from_monitor_service_to_database(session) messages_read: list[Message] = [] with session_scope() as session: stmt = select(MessageTable) diff --git a/src/backend/base/langflow/services/database/models/__init__.py b/src/backend/base/langflow/services/database/models/__init__.py index 6e1f09fe3..60df9c648 100644 --- a/src/backend/base/langflow/services/database/models/__init__.py +++ b/src/backend/base/langflow/services/database/models/__init__.py @@ -4,5 +4,6 @@ from .folder import Folder from .message import MessageTable from .user import User from .variable import Variable +from .transactions import TransactionTable -__all__ = ["Flow", "User", "ApiKey", "Variable", "Folder", "MessageTable"] +__all__ = ["Flow", "User", "ApiKey", "Variable", "Folder", "MessageTable", "TransactionTable"] diff --git a/src/backend/base/langflow/services/database/models/flow/model.py b/src/backend/base/langflow/services/database/models/flow/model.py index 6d3e4aea8..707d9d3f3 100644 --- a/src/backend/base/langflow/services/database/models/flow/model.py +++ b/src/backend/base/langflow/services/database/models/flow/model.py @@ -19,6 +19,7 @@ if TYPE_CHECKING: from langflow.services.database.models.folder import Folder from langflow.services.database.models.message import MessageTable from langflow.services.database.models.user import User + from langflow.services.database.models import TransactionTable class FlowBase(SQLModel): @@ -143,6 +144,7 @@ class Flow(FlowBase, table=True): folder_id: Optional[UUID] = Field(default=None, foreign_key="folder.id", nullable=True, index=True) folder: Optional["Folder"] = Relationship(back_populates="flows") messages: List["MessageTable"] = Relationship(back_populates="flow") + transactions: List["TransactionTable"] = Relationship(back_populates="flow") def to_data(self): serialized = 
def delete_flow_by_id(flow_id: str, session: Session) -> None:
    """Delete a flow together with the transactions and messages that reference it.

    The dependent rows are removed with explicit DELETE statements because
    foreign key constraints might be disabled, in which case nothing would
    clean them up. They are removed *before* the flow row so the statements
    also succeed when foreign keys are enforced: deleting the parent first
    would be rejected while child rows still point at it.

    Args:
        flow_id: Id of the flow to delete.
        session: Open database session. The caller is responsible for
            committing; this function does not commit.
    """
    session.exec(delete(TransactionTable).where(TransactionTable.flow_id == flow_id))  # type: ignore
    session.exec(delete(MessageTable).where(MessageTable.flow_id == flow_id))  # type: ignore
    session.exec(delete(Flow).where(Flow.id == flow_id))  # type: ignore
    logger.info(f"Deleted flow {flow_id}")
def get_transactions_by_flow_id(db: Session, flow_id: UUID, limit: Optional[int] = 1000) -> list[TransactionTable]:
    """Return the transactions of ``flow_id`` ordered by timestamp.

    Args:
        db: Open database session.
        flow_id: Flow whose transactions are selected.
        limit: Value passed to the query's ``LIMIT`` clause.

    Returns:
        The matching rows as a list.
    """
    stmt = (
        select(TransactionTable)
        .where(TransactionTable.flow_id == flow_id)
        .order_by(col(TransactionTable.timestamp))
        .limit(limit)
    )
    return list(db.exec(stmt))


def log_transaction(db: Session, transaction: TransactionBase) -> TransactionTable:
    """Insert ``transaction`` as a new ``TransactionTable`` row and commit.

    Args:
        db: Open database session.
        transaction: Field values for the new row.

    Returns:
        The row object that was added to the session.

    Raises:
        IntegrityError: Re-raised after the session has been rolled back.
    """
    table = TransactionTable(**transaction.model_dump())
    db.add(table)
    try:
        db.commit()
    except IntegrityError:
        db.rollback()
        # Bare raise keeps the original traceback intact.
        raise
    return table
Field(default=None, sa_column=Column(JSON)) + status: str = Field(nullable=False) + error: Optional[str] = Field(default=None) + flow_id: UUID = Field(foreign_key="flow.id") + + # Needed for Column(JSON) + class Config: + arbitrary_types_allowed = True + + @field_validator("flow_id", mode="before") + @classmethod + def validate_flow_id(cls, value): + if value is None: + return value + if isinstance(value, str): + value = UUID(value) + return value + + +class TransactionTable(TransactionBase, table=True): + __tablename__ = "transaction" + id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True) + flow: "Flow" = Relationship(back_populates="transactions") + + +class TransactionReadResponse(TransactionBase): + transaction_id: UUID + flow_id: UUID diff --git a/src/backend/base/langflow/services/database/service.py b/src/backend/base/langflow/services/database/service.py index 2fee292a4..b7508e081 100644 --- a/src/backend/base/langflow/services/database/service.py +++ b/src/backend/base/langflow/services/database/service.py @@ -15,7 +15,12 @@ from sqlmodel import Session, SQLModel, create_engine, select, text from langflow.services.base import Service from langflow.services.database import models # noqa from langflow.services.database.models.user.crud import get_user_by_username -from langflow.services.database.utils import Result, TableResults, migrate_messages_from_monitor_service_to_database +from langflow.services.database.utils import ( + Result, + TableResults, + migrate_messages_from_monitor_service_to_database, + migrate_transactions_from_monitor_service_to_database, +) from langflow.services.deps import get_settings_service from langflow.services.utils import teardown_superuser @@ -55,7 +60,6 @@ class DatabaseService(Service): max_overflow=self.settings_service.settings.max_overflow, ) except sa.exc.NoSuchModuleError as exc: - # sqlalchemy.exc.NoSuchModuleError: Can't load plugin: sqlalchemy.dialects:postgres if "postgres" in str(exc) and not 
self.database_url.startswith("postgresql"): # https://stackoverflow.com/questions/62688256/sqlalchemy-exc-nosuchmoduleerror-cant-load-plugin-sqlalchemy-dialectspostgre self.database_url = self.database_url.replace("postgres://", "postgresql://") @@ -181,14 +185,14 @@ class DatabaseService(Service): logger.info("Alembic not initialized") should_initialize_alembic = True - else: - logger.info("Alembic already initialized") if should_initialize_alembic: try: self.init_alembic(alembic_cfg) except Exception as exc: logger.error(f"Error initializing alembic: {exc}") raise RuntimeError("Error initializing alembic") from exc + else: + logger.info("Alembic already initialized") logger.info(f"Running DB migrations in {self.script_location}") @@ -211,6 +215,10 @@ class DatabaseService(Service): migrate_messages_from_monitor_service_to_database(session) except Exception as exc: logger.error(f"Error migrating messages from monitor service to database: {exc}") + try: + migrate_transactions_from_monitor_service_to_database(session) + except Exception as exc: + logger.error(f"Error migrating transactions from monitor service to database: {exc}") if fix: self.try_downgrade_upgrade_until_success(alembic_cfg) @@ -266,7 +274,7 @@ class DatabaseService(Service): inspector = inspect(self.engine) table_names = inspector.get_table_names() - current_tables = ["flow", "user", "apikey"] + current_tables = ["flow", "user", "apikey", "folder", "message", "variable", "transaction"] if table_names and all(table in table_names for table in current_tables): logger.debug("Database and tables already exist") diff --git a/src/backend/base/langflow/services/database/utils.py b/src/backend/base/langflow/services/database/utils.py index 9500bcc50..b0cee9915 100644 --- a/src/backend/base/langflow/services/database/utils.py +++ b/src/backend/base/langflow/services/database/utils.py @@ -1,3 +1,4 @@ +import json from contextlib import contextmanager from dataclasses import dataclass from typing import 
def migrate_transactions_from_monitor_service_to_database(session: Session) -> None:
    """Copy transactions from the monitor service into the SQL database.

    Rows are fetched from the monitor service in batches. Each batch is
    inserted as ``TransactionTable`` rows and committed, then removed from the
    monitor service, until the monitor service returns no more rows.

    Args:
        session: Open database session used for the inserts and commits.
    """
    monitor_service = get_monitor_service()
    batch = monitor_service.get_transactions()
    if not batch:
        logger.debug("No transactions to migrate.")
        return
    while batch:
        logger.debug(f"Migrating {len(batch)} transactions")
        # Ids are collected per batch; carrying them over would resend every
        # previously deleted id with each later DELETE.
        migrated_ids = []
        for row in batch:
            tt = TransactionTable(
                flow_id=row["flow_id"],
                status=row["status"],
                error=row["error"],
                timestamp=row["timestamp"],
                vertex_id=row["vertex_id"],
                inputs=json.loads(row["inputs"]) if row["inputs"] else None,
                outputs=json.loads(row["outputs"]) if row["outputs"] else None,
                target_id=row["target_id"],
            )
            migrated_ids.append(row["index"])
            session.add(tt)
        # Commit before deleting so a failed insert never loses source rows.
        session.commit()
        monitor_service.delete_transactions(migrated_ids)
        batch = monitor_service.get_transactions()
    logger.debug("Transactions migrations completed.")
langflow.services.base import Service from langflow.services.factory import ServiceFactory @@ -24,6 +26,7 @@ class ServiceManager: self.services: Dict[str, "Service"] = {} self.factories = {} self.register_factories() + self.keyed_lock = KeyedMemoryLockManager() def register_factories(self): for factory in self.get_factories(): @@ -49,8 +52,9 @@ class ServiceManager: Get (or create) a service by its name. """ - if service_name not in self.services: - self._create_service(service_name, default) + with self.keyed_lock.lock(service_name): + if service_name not in self.services: + self._create_service(service_name, default) return self.services[service_name] diff --git a/src/backend/base/langflow/services/monitor/service.py b/src/backend/base/langflow/services/monitor/service.py index de1970006..450518301 100644 --- a/src/backend/base/langflow/services/monitor/service.py +++ b/src/backend/base/langflow/services/monitor/service.py @@ -1,16 +1,19 @@ from datetime import datetime from pathlib import Path -from typing import TYPE_CHECKING, Union +from typing import TYPE_CHECKING, Union, List -import duckdb from loguru import logger from platformdirs import user_cache_dir from langflow.services.base import Service -from langflow.services.monitor.utils import add_row_to_table, drop_and_create_table_if_schema_mismatch +from langflow.services.monitor.utils import ( + add_row_to_table, + drop_and_create_table_if_schema_mismatch, + new_duckdb_locked_connection, +) if TYPE_CHECKING: - from langflow.services.monitor.schema import DuckDbMessageModel, TransactionModel, VertexBuildModel + from langflow.services.monitor.schema import VertexBuildModel from langflow.services.settings.service import SettingsService @@ -18,14 +21,12 @@ class MonitorService(Service): name = "monitor_service" def __init__(self, settings_service: "SettingsService"): - from langflow.services.monitor.schema import DuckDbMessageModel, TransactionModel, VertexBuildModel + from langflow.services.monitor.schema 
import VertexBuildModel self.settings_service = settings_service self.base_cache_dir = Path(user_cache_dir("langflow"), ensure_exists=True) self.db_path = self.base_cache_dir / "monitor.duckdb" - self.table_map: dict[str, type[TransactionModel | DuckDbMessageModel | VertexBuildModel]] = { - "transactions": TransactionModel, - "messages": DuckDbMessageModel, + self.table_map: dict[str, type[VertexBuildModel]] = { "vertex_builds": VertexBuildModel, } @@ -35,7 +36,7 @@ class MonitorService(Service): logger.exception(f"Error initializing monitor service: {e}") def exec_query(self, query: str, read_only: bool = False): - with duckdb.connect(str(self.db_path), read_only=read_only) as conn: + with new_duckdb_locked_connection(self.db_path, read_only=read_only) as conn: return conn.execute(query).df() def to_df(self, table_name): @@ -48,20 +49,17 @@ class MonitorService(Service): def add_row( self, table_name: str, - data: Union[dict, "TransactionModel", "DuckDbMessageModel", "VertexBuildModel"], + data: Union[dict, "VertexBuildModel"], ): - # Make sure the model passed matches the table - model = self.table_map.get(table_name) if model is None: raise ValueError(f"Unknown table name: {table_name}") - # Connect to DuckDB and add the row - with duckdb.connect(str(self.db_path), read_only=False) as conn: + with new_duckdb_locked_connection(self.db_path, read_only=False) as conn: add_row_to_table(conn, table_name, model, data) def load_table_as_dataframe(self, table_name): - with duckdb.connect(str(self.db_path)) as conn: + with new_duckdb_locked_connection(self.db_path, read_only=True) as conn: return conn.table(table_name).df() @staticmethod @@ -99,7 +97,7 @@ class MonitorService(Service): if limit is not None: query += f" LIMIT {limit}" - with duckdb.connect(str(self.db_path), read_only=True) as conn: + with new_duckdb_locked_connection(self.db_path, read_only=True) as conn: df = conn.execute(query).df() return df @@ -127,7 +125,7 @@ class MonitorService(Service): if 
order_by: query += f" ORDER BY {order_by}" - with duckdb.connect(str(self.db_path), read_only=True) as conn: + with new_duckdb_locked_connection(self.db_path, read_only=True) as conn: df = conn.execute(query).df() return df.to_dict(orient="records") @@ -137,7 +135,7 @@ class MonitorService(Service): if flow_id: query += f" WHERE flow_id = '{flow_id}'" - with duckdb.connect(str(self.db_path), read_only=False) as conn: + with new_duckdb_locked_connection(self.db_path, read_only=False) as conn: conn.execute(query) def delete_messages_session(self, session_id: str): @@ -166,33 +164,14 @@ class MonitorService(Service): return self.exec_query(query, read_only=False) - def get_transactions( - self, - source: str | None = None, - target: str | None = None, - status: str | None = None, - order_by: str | None = "timestamp", - flow_id: str | None = None, - ): - query = ( - "SELECT index,flow_id, status, error, timestamp, vertex_id, inputs, outputs, target_id FROM transactions" - ) - conditions = [] - if source: - conditions.append(f"source = '{source}'") - if target: - conditions.append(f"target = '{target}'") - if status: - conditions.append(f"status = '{status}'") - if flow_id: - conditions.append(f"flow_id = '{flow_id}'") - - if conditions: - query += " WHERE " + " AND ".join(conditions) - - if order_by: - query += f" ORDER BY {order_by} DESC" - with duckdb.connect(str(self.db_path), read_only=True) as conn: + def get_transactions(self, limit: int = 100): + query = f"SELECT index,flow_id, status, error, timestamp, vertex_id, inputs, outputs, target_id FROM transactions LIMIT {str(limit)}" + with new_duckdb_locked_connection(self.db_path, read_only=True) as conn: df = conn.execute(query).df() return df.to_dict(orient="records") + + def delete_transactions(self, ids: List[int]) -> None: + with new_duckdb_locked_connection(self.db_path, read_only=False) as conn: + conn.execute(f"DELETE FROM transactions WHERE index in ({','.join(map(str, ids))})") + conn.commit() diff --git 
a/src/backend/base/langflow/services/monitor/utils.py b/src/backend/base/langflow/services/monitor/utils.py index 97bb8e6e3..d0af5e839 100644 --- a/src/backend/base/langflow/services/monitor/utils.py +++ b/src/backend/base/langflow/services/monitor/utils.py @@ -1,27 +1,25 @@ +from contextlib import contextmanager +from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, Optional, Type, Union import duckdb -import threading from loguru import logger from pydantic import BaseModel from langflow.services.deps import get_monitor_service +from langflow.utils.concurrency import KeyedWorkerLockManager if TYPE_CHECKING: from langflow.api.v1.schemas import ResultDataResponse - from langflow.graph.vertex.base import Vertex INDEX_KEY = "index" - -# Lock to prevent multiple threads from creating the same table at the same time -drop_create_table_lock = threading.Lock() +worker_lock_manager = KeyedWorkerLockManager() def get_table_schema_as_dict(conn: duckdb.DuckDBPyConnection, table_name: str) -> dict: result = conn.execute(f"PRAGMA table_info('{table_name}')").fetchall() schema = {row[1]: row[2].upper() for row in result} - schema.pop(INDEX_KEY, None) return schema @@ -52,31 +50,40 @@ def model_to_sql_column_definitions(model: Type[BaseModel]) -> dict: def drop_and_create_table_if_schema_mismatch(db_path: str, table_name: str, model: Type[BaseModel]): - with drop_create_table_lock: - with duckdb.connect(db_path) as conn: - # Get the current schema from the database - try: - current_schema = get_table_schema_as_dict(conn, table_name) - except duckdb.CatalogException: - current_schema = {} - # Get the desired schema from the model - desired_schema = model_to_sql_column_definitions(model) + with new_duckdb_locked_connection(db_path) as conn: + # Get the current schema from the database + try: + current_schema = get_table_schema_as_dict(conn, table_name) + except duckdb.CatalogException: + current_schema = {} + # Get the desired schema from the model + desired_schema 
= model_to_sql_column_definitions(model) - # Compare the current and desired schemas + # Compare the current and desired schemas - if current_schema != desired_schema: - # If they don't match, drop the existing table and create a new one - conn.execute(f"DROP TABLE IF EXISTS {table_name}") - if INDEX_KEY in desired_schema.keys(): - # Create a sequence for the id column - try: - conn.execute(f"CREATE SEQUENCE seq_{table_name} START 1;") - except duckdb.CatalogException: - pass - desired_schema[INDEX_KEY] = f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')" - columns_sql = ", ".join(f"{name} {data_type}" for name, data_type in desired_schema.items()) - create_table_sql = f"CREATE TABLE {table_name} ({columns_sql})" - conn.execute(create_table_sql) + if current_schema != desired_schema: + # If they don't match, drop the existing table and create a new one + logger.warning(f"Schema mismatch for duckdb table {table_name}. Dropping and recreating table.") + logger.debug(f"Current schema: {str(current_schema)}") + logger.debug(f"Desired schema: {str(desired_schema)}") + conn.execute(f"DROP TABLE IF EXISTS {table_name}") + if INDEX_KEY in desired_schema.keys(): + # Create a sequence for the id column + try: + conn.execute(f"CREATE SEQUENCE seq_{table_name} START 1;") + except duckdb.CatalogException: + pass + desired_schema[INDEX_KEY] = f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')" + columns_sql = ", ".join(f"{name} {data_type}" for name, data_type in desired_schema.items()) + create_table_sql = f"CREATE TABLE {table_name} ({columns_sql})" + conn.execute(create_table_sql) + + +@contextmanager +def new_duckdb_locked_connection(db_path: Union[str, Path], read_only=False): + with worker_lock_manager.lock("duckdb"): + with duckdb.connect(str(db_path), read_only=read_only) as conn: + yield conn def add_row_to_table( @@ -166,37 +173,3 @@ async def log_vertex_build( monitor_service.add_row(table_name="vertex_builds", data=row) except Exception as e: 
logger.exception(f"Error logging vertex build: {e}") - - -def build_clean_params(target: "Vertex") -> dict: - """ - Cleans the parameters of the target vertex. - """ - # Removes all keys that the values aren't python types like str, int, bool, etc. - params = { - key: value for key, value in target.params.items() if isinstance(value, (str, int, bool, float, list, dict)) - } - # if it is a list we need to check if the contents are python types - for key, value in params.items(): - if isinstance(value, list): - params[key] = [item for item in value if isinstance(item, (str, int, bool, float, list, dict))] - return params - - -def log_transaction(flow_id, source: "Vertex", status, target: Optional["Vertex"] = None, error=None): - try: - monitor_service = get_monitor_service() - clean_params = build_clean_params(source) - data = { - "vertex_id": str(source.id), - "target_id": str(target.id) if target else None, - "inputs": clean_params, - "outputs": source.result.model_dump_json() if source.result else None, - "timestamp": monitor_service.get_timestamp(), - "status": status, - "error": error, - "flow_id": flow_id, - } - monitor_service.add_row(table_name="transactions", data=data) - except Exception as e: - logger.error(f"Error logging transaction: {e}") diff --git a/src/backend/base/langflow/utils/concurrency.py b/src/backend/base/langflow/utils/concurrency.py new file mode 100644 index 000000000..a0f810a98 --- /dev/null +++ b/src/backend/base/langflow/utils/concurrency.py @@ -0,0 +1,63 @@ +import re +import threading +from contextlib import contextmanager +from pathlib import Path +from filelock import FileLock + +from platformdirs import user_cache_dir + + +class KeyedMemoryLockManager: + """ + A manager for acquiring and releasing memory locks based on a key + """ + + def __init__(self): + self.locks = {} + self.global_lock = threading.Lock() + + def _get_lock(self, key: str): + with self.global_lock: + if key not in self.locks: + self.locks[key] = threading.Lock() 
+ return self.locks[key] + + @contextmanager + def lock(self, key: str): + lock = self._get_lock(key) + lock.acquire() + try: + yield + finally: + lock.release() + + +class KeyedWorkerLockManager: + """ + A manager for acquiring locks between workers based on a key + """ + + def __init__(self): + self.locks_dir = Path(user_cache_dir("langflow"), ensure_exists=True) / "worker_locks" + + def _validate_key(self, key: str) -> bool: + """ + Validate that the string only contains alphanumeric characters and underscores. + + Parameters: + s (str): The string to validate. + + Returns: + bool: True if the string is valid, False otherwise. + """ + pattern = re.compile(r"^\w+$") + return bool(pattern.match(key)) + + @contextmanager + def lock(self, key: str): + if not self._validate_key(key): + raise ValueError(f"Invalid key: {key}") + + lock = FileLock(self.locks_dir / key) + with lock: + yield diff --git a/src/backend/base/poetry.lock b/src/backend/base/poetry.lock index 168424e1e..6d8026116 100644 --- a/src/backend/base/poetry.lock +++ b/src/backend/base/poetry.lock @@ -782,6 +782,22 @@ typer = ">=0.12.3" [package.extras] standard = ["fastapi", "uvicorn[standard] (>=0.15.0)"] +[[package]] +name = "filelock" +version = "3.15.4" +description = "A platform independent file lock." 
+optional = false +python-versions = ">=3.8" +files = [ + {file = "filelock-3.15.4-py3-none-any.whl", hash = "sha256:6ca1fffae96225dab4c6eaf1c4f4f28cd2568d3ec2a44e15a08520504de468e7"}, + {file = "filelock-3.15.4.tar.gz", hash = "sha256:2207938cbc1844345cb01a5a95524dae30f0ce089eba5b00378295a17e3e90cb"}, +] + +[package.extras] +docs = ["furo (>=2023.9.10)", "sphinx (>=7.2.6)", "sphinx-autodoc-typehints (>=1.25.2)"] +testing = ["covdefaults (>=2.3)", "coverage (>=7.3.2)", "diff-cover (>=8.0.1)", "pytest (>=7.4.3)", "pytest-asyncio (>=0.21)", "pytest-cov (>=4.1)", "pytest-mock (>=3.12)", "pytest-timeout (>=2.2)", "virtualenv (>=20.26.2)"] +typing = ["typing-extensions (>=4.8)"] + [[package]] name = "firecrawl-py" version = "0.0.16" @@ -3581,4 +3597,4 @@ local = [] [metadata] lock-version = "2.0" python-versions = ">=3.10,<3.13" -content-hash = "21ad684d075f232c2398e9f1e067702c00f1bf157efc08e4de8daa4daa7c0d47" +content-hash = "0b82dcdf0b754b3c9fc0d7f20a7939305f866d49150f1b1fd4bde9f876c4e425" diff --git a/src/backend/base/pyproject.toml b/src/backend/base/pyproject.toml index 49b2f6e2e..8a53a94ca 100644 --- a/src/backend/base/pyproject.toml +++ b/src/backend/base/pyproject.toml @@ -74,6 +74,7 @@ prometheus-client = "^0.20.0" aiofiles = "^24.1.0" setuptools = ">=70" nanoid = "^2.0.0" +filelock = "^3.15.4" [tool.poetry.extras] deploy = ["celery", "redis", "flower"] diff --git a/src/backend/tests/unit/test_database.py b/src/backend/tests/unit/test_database.py index 8cc288266..c08ef79b8 100644 --- a/src/backend/tests/unit/test_database.py +++ b/src/backend/tests/unit/test_database.py @@ -10,8 +10,15 @@ from langflow.api.v1.schemas import FlowListCreate from langflow.initial_setup.setup import load_starter_projects, load_flows_from_directory from langflow.services.database.models.base import orjson_dumps from langflow.services.database.models.flow import Flow, FlowCreate, FlowUpdate -from langflow.services.database.utils import session_getter -from langflow.services.deps 
import get_db_service +from langflow.services.database.models.transactions.crud import get_transactions_by_flow_id +from langflow.services.database.utils import session_getter, migrate_transactions_from_monitor_service_to_database +from langflow.services.deps import get_db_service, get_monitor_service, session_scope +from langflow.services.monitor.schema import TransactionModel +from langflow.services.monitor.utils import ( + drop_and_create_table_if_schema_mismatch, + new_duckdb_locked_connection, + add_row_to_table, +) @pytest.fixture(scope="module") @@ -281,3 +288,44 @@ def test_load_flows(client: TestClient, load_flows_dir): response = client.get("api/v1/flows/c54f9130-f2fa-4a3e-b22a-3856d946351b") assert response.status_code == 200 assert response.json()["name"] == "BasicExample" + + +@pytest.mark.load_flows +def test_migrate_transactions(client: TestClient): + monitor_service = get_monitor_service() + drop_and_create_table_if_schema_mismatch(str(monitor_service.db_path), "transactions", TransactionModel) + flow_id = "c54f9130-f2fa-4a3e-b22a-3856d946351b" + data = { + "vertex_id": "vid", + "target_id": "tid", + "inputs": {"input_value": True}, + "outputs": {"output_value": True}, + "timestamp": "2021-10-10T10:10:10", + "status": "success", + "error": None, + "flow_id": flow_id, + } + with new_duckdb_locked_connection(str(monitor_service.db_path), read_only=False) as conn: + add_row_to_table(conn, "transactions", TransactionModel, data) + assert 1 == len(monitor_service.get_transactions()) + + with session_scope() as session: + migrate_transactions_from_monitor_service_to_database(session) + new_trans = get_transactions_by_flow_id(session, UUID(flow_id)) + assert 1 == len(new_trans) + t = new_trans[0] + assert t.error is None + assert t.inputs == data["inputs"] + assert t.outputs == data["outputs"] + assert t.status == data["status"] + assert str(t.timestamp) == "2021-10-10 10:10:10" + assert t.vertex_id == data["vertex_id"] + assert t.target_id == 
data["target_id"] + assert t.flow_id == UUID(flow_id) + + assert 0 == len(monitor_service.get_transactions()) + + client.request("DELETE", f"api/v1/flows/{flow_id}") + with session_scope() as session: + new_trans = get_transactions_by_flow_id(session, UUID(flow_id)) + assert 0 == len(new_trans) diff --git a/src/frontend/src/controllers/API/index.ts b/src/frontend/src/controllers/API/index.ts index 3b4b4adc9..e13ae4434 100644 --- a/src/frontend/src/controllers/API/index.ts +++ b/src/frontend/src/controllers/API/index.ts @@ -1042,18 +1042,3 @@ export async function multipleDeleteFlowsComponents( // Return the responses after all requests are completed return Promise.all(responses); } - -export async function getTransactionTable( - id: string, - mode: "intersection" | "union", - params = {}, -): Promise<{ rows: Array; columns: Array }> { - const config = {}; - config["params"] = { flow_id: id }; - if (params) { - config["params"] = { ...config["params"], ...params }; - } - const rows = await api.get(`${BASE_URL_API}monitor/transactions`, config); - const columns = extractColumnsFromRows(rows.data, mode); - return { rows: rows.data, columns }; -}