feat: migrate transactions to sql database (#2915)

* feat: migrate transactions to sql database

* [autofix.ci] apply automated fixes

* remove useless

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Nicolò Boschi 2024-07-26 16:57:38 +02:00 committed by GitHub
commit 9ac861da2f
24 changed files with 467 additions and 180 deletions


@ -0,0 +1,51 @@
"""create transactions table
Revision ID: 90be8e2ed91e
Revises: 325180f0c4e1
Create Date: 2024-07-24 11:37:48.532933
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
import sqlmodel
from langflow.utils import migration
# revision identifiers, used by Alembic.
revision: str = "90be8e2ed91e"
down_revision: Union[str, None] = "325180f0c4e1"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
conn = op.get_bind()
if not migration.table_exists("transaction", conn):
op.create_table(
"transaction",
sa.Column("timestamp", sa.DateTime(), nullable=False),
sa.Column("vertex_id", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column("target_id", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column("inputs", sa.JSON(), nullable=True),
sa.Column("outputs", sa.JSON(), nullable=True),
sa.Column("status", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column("id", sqlmodel.sql.sqltypes.GUID(), nullable=False),
sa.Column("flow_id", sqlmodel.sql.sqltypes.GUID(), nullable=False),
sa.Column("error", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.ForeignKeyConstraint(
["flow_id"],
["flow.id"],
),
sa.PrimaryKeyConstraint("id"),
)
pass
def downgrade() -> None:
conn = op.get_bind()
if migration.table_exists("transaction", conn):
op.drop_table("transaction")
pass
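
Both guards above lean on langflow.utils.migration.table_exists, which is not part of this diff. A minimal sketch of what such a helper could look like, assuming it is built on SQLAlchemy's runtime inspector:

from sqlalchemy import inspect
from sqlalchemy.engine import Connection

def table_exists(table_name: str, conn: Connection) -> bool:
    # Ask the bound dialect which tables already exist and check for a match.
    return table_name in inspect(conn).get_table_names()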


@ -18,7 +18,7 @@ from langflow.api.v1.schemas import FlowListCreate, FlowListRead
from langflow.initial_setup.setup import STARTER_FOLDER_NAME
from langflow.services.auth.utils import get_current_active_user
from langflow.services.database.models.flow import Flow, FlowCreate, FlowRead, FlowUpdate
from langflow.services.database.models.flow.utils import get_webhook_component_in_flow
from langflow.services.database.models.flow.utils import get_webhook_component_in_flow, delete_flow_by_id
from langflow.services.database.models.folder.constants import DEFAULT_FOLDER_NAME
from langflow.services.database.models.folder.model import Folder
from langflow.services.database.models.user.model import User
@ -266,7 +266,7 @@ def delete_flow(
)
if not flow:
raise HTTPException(status_code=404, detail="Flow not found")
session.delete(flow)
delete_flow_by_id(str(flow_id), session)
session.commit()
return {"message": "Flow deleted successfully"}


@ -7,9 +7,11 @@ from sqlmodel import Session, col, select
from langflow.services.auth.utils import get_current_active_user
from langflow.services.database.models.message.model import MessageRead, MessageTable, MessageUpdate
from langflow.services.database.models.transactions.crud import get_transactions_by_flow_id
from langflow.services.database.models.transactions.model import TransactionReadResponse
from langflow.services.database.models.user.model import User
from langflow.services.deps import get_monitor_service, get_session
from langflow.services.monitor.schema import MessageModelResponse, TransactionModelResponse, VertexBuildMapModel
from langflow.services.monitor.schema import MessageModelResponse, VertexBuildMapModel
from langflow.services.monitor.service import MonitorService
router = APIRouter(prefix="/monitor", tags=["Monitor"])
@ -126,34 +128,26 @@ async def delete_messages_session(
raise HTTPException(status_code=500, detail=str(e))
@router.get("/transactions", response_model=List[TransactionModelResponse])
@router.get("/transactions", response_model=List[TransactionReadResponse])
async def get_transactions(
source: Optional[str] = Query(None),
target: Optional[str] = Query(None),
status: Optional[str] = Query(None),
order_by: Optional[str] = Query("timestamp"),
flow_id: Optional[str] = Query(None),
monitor_service: MonitorService = Depends(get_monitor_service),
flow_id: UUID = Query(),
session: Session = Depends(get_session),
):
try:
dicts = monitor_service.get_transactions(
source=source, target=target, status=status, order_by=order_by, flow_id=flow_id
)
result = []
for d in dicts:
d = TransactionModelResponse(
index=d["index"],
timestamp=d["timestamp"],
vertex_id=d["vertex_id"],
inputs=d["inputs"],
outputs=d["outputs"],
status=d["status"],
error=d["error"],
flow_id=d["flow_id"],
source=d["vertex_id"],
target=d["target_id"],
transactions = get_transactions_by_flow_id(session, flow_id)
return [
TransactionReadResponse(
transaction_id=t.id,
timestamp=t.timestamp,
vertex_id=t.vertex_id,
target_id=t.target_id,
inputs=t.inputs,
outputs=t.outputs,
status=t.status,
error=t.error,
flow_id=t.flow_id,
)
result.append(d)
return result
for t in transactions
]
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
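
The reworked endpoint now requires flow_id and returns TransactionReadResponse objects instead of the old duckdb-backed TransactionModelResponse. A hedged sketch of exercising it with FastAPI's TestClient (the api/v1 prefix mirrors the tests further down; the flow id is only an example):

def list_flow_transactions(client, flow_id: str) -> None:
    response = client.get(
        "api/v1/monitor/transactions",
        params={"flow_id": flow_id},
    )
    assert response.status_code == 200
    for row in response.json():
        # transaction_id, vertex_id and status come from TransactionReadResponse
        print(row["transaction_id"], row["vertex_id"], row["status"])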


@ -22,7 +22,6 @@ from langflow.schema.schema import INPUT_FIELD_NAME, InputType
from langflow.services.cache.utils import CacheMiss
from langflow.services.chat.service import ChatService
from langflow.services.deps import get_chat_service, get_tracing_service
from langflow.services.monitor.utils import log_transaction
if TYPE_CHECKING:
from langflow.graph.schema import ResultData
@ -913,14 +912,10 @@ class Graph:
artifacts = vertex.artifacts
else:
raise ValueError(f"No result found for vertex {vertex_id}")
flow_id = self.flow_id
log_transaction(flow_id, vertex, status="success")
return result_dict, params, valid, artifacts, vertex
except Exception as exc:
if not isinstance(exc, ComponentBuildException):
logger.exception(f"Error building Component: \n\n{exc}")
flow_id = self.flow_id
log_transaction(flow_id, vertex, status="failure", error=str(exc))
raise exc
def get_vertex_edges(


@ -1,5 +1,7 @@
import json
from enum import Enum
from typing import Any, Generator, Union
from typing import TYPE_CHECKING, Any, Generator, Union, Optional
from uuid import UUID
from langchain_core.documents import Document
from pydantic import BaseModel
@ -7,6 +9,14 @@ from pydantic import BaseModel
from langflow.interface.utils import extract_input_variables_from_prompt
from langflow.schema.data import Data
from langflow.schema.message import Message
from langflow.services.database.models.transactions.model import TransactionBase
from langflow.services.database.models.transactions.crud import log_transaction as crud_log_transaction
from langflow.services.database.utils import session_getter
from langflow.services.deps import get_db_service
from loguru import logger
if TYPE_CHECKING:
from langflow.graph.vertex.base import Vertex
class UnbuiltObject:
@ -98,3 +108,40 @@ def post_process_raw(raw, artifact_type: str):
raw = ""
return raw
def _vertex_to_primitive_dict(target: "Vertex") -> dict:
"""
Cleans the parameters of the target vertex.
"""
# Removes all keys that the values aren't python types like str, int, bool, etc.
params = {
key: value for key, value in target.params.items() if isinstance(value, (str, int, bool, float, list, dict))
}
# if it is a list we need to check if the contents are python types
for key, value in params.items():
if isinstance(value, list):
params[key] = [item for item in value if isinstance(item, (str, int, bool, float, list, dict))]
return params
async def log_transaction(
flow_id: Union[str, UUID], source: "Vertex", status, target: Optional["Vertex"] = None, error=None
) -> None:
try:
inputs = _vertex_to_primitive_dict(source)
transaction = TransactionBase(
vertex_id=source.id,
target_id=target.id if target else None,
inputs=inputs,
# ugly hack to get the model dump with weird datatypes
outputs=json.loads(source.result.model_dump_json()) if source.result else None,
status=status,
error=error,
flow_id=flow_id if isinstance(flow_id, UUID) else UUID(flow_id),
)
with session_getter(get_db_service()) as session:
inserted = crud_log_transaction(session, transaction)
logger.debug(f"Logged transaction: {inserted.id}")
except Exception as e:
logger.error(f"Error logging transaction: {e}")


@ -12,7 +12,7 @@ from loguru import logger
from langflow.exceptions.component import ComponentBuildException
from langflow.graph.schema import INPUT_COMPONENTS, OUTPUT_COMPONENTS, InterfaceComponentTypes, ResultData
from langflow.graph.utils import UnbuiltObject, UnbuiltResult
from langflow.graph.utils import UnbuiltObject, UnbuiltResult, log_transaction
from langflow.interface.initialize import loading
from langflow.interface.listing import lazy_load_dict
from langflow.schema.artifact import ArtifactType
@ -20,7 +20,6 @@ from langflow.schema.data import Data
from langflow.schema.message import Message
from langflow.schema.schema import INPUT_FIELD_NAME, OutputValue, build_output_logs
from langflow.services.deps import get_storage_service
from langflow.services.monitor.utils import log_transaction
from langflow.services.tracing.schema import Log
from langflow.utils.constants import DIRECT_TYPES
from langflow.utils.schemas import ChatOutputResponse
@ -583,11 +582,11 @@ class Vertex:
"""
flow_id = self.graph.flow_id
if not self._built:
log_transaction(flow_id, source=self, target=requester, status="error")
asyncio.create_task(log_transaction(str(flow_id), source=self, target=requester, status="error"))
raise ValueError(f"Component {self.display_name} has not been built yet")
result = self._built_result if self.use_result else self._built_object
log_transaction(flow_id, source=self, target=requester, status="success")
asyncio.create_task(log_transaction(str(flow_id), source=self, target=requester, status="success"))
return result
async def _build_vertex_and_update_params(self, key, vertex: "Vertex"):
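
log_transaction is now a coroutine, so the vertex code schedules it with asyncio.create_task instead of awaiting it, keeping the build path non-blocking. A tiny, generic illustration of that fire-and-forget pattern (names are made up; a running event loop is assumed):

import asyncio

async def log_event(status: str) -> None:
    # Stand-in for the real async log_transaction coroutine.
    await asyncio.sleep(0)

async def build_step() -> str:
    # Schedule the log and return immediately; the caller does not wait for logging.
    asyncio.create_task(log_event("success"))
    return "result"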


@ -1,3 +1,4 @@
import asyncio
import json
from typing import TYPE_CHECKING, Any, AsyncIterator, Dict, Generator, Iterator, List
@ -6,13 +7,13 @@ from langchain_core.messages import AIMessage, AIMessageChunk
from loguru import logger
from langflow.graph.schema import CHAT_COMPONENTS, RECORDS_COMPONENTS, InterfaceComponentTypes, ResultData
from langflow.graph.utils import UnbuiltObject, serialize_field
from langflow.graph.utils import UnbuiltObject, serialize_field, log_transaction
from langflow.graph.vertex.base import Vertex
from langflow.schema import Data
from langflow.schema.artifact import ArtifactType
from langflow.schema.message import Message
from langflow.schema.schema import INPUT_FIELD_NAME
from langflow.services.monitor.utils import log_transaction, log_vertex_build
from langflow.services.monitor.utils import log_vertex_build
from langflow.template.field.base import UNDEFINED
from langflow.utils.schemas import ChatOutputResponse, DataOutputResponse
from langflow.utils.util import unescape_string
@ -81,7 +82,9 @@ class ComponentVertex(Vertex):
The built result if use_result is True, else the built object.
"""
if not self._built:
log_transaction(source=self, target=requester, flow_id=self.graph.flow_id, status="error")
asyncio.create_task(
log_transaction(source=self, target=requester, flow_id=str(self.graph.flow_id), status="error")
)
raise ValueError(f"Component {self.display_name} has not been built yet")
if requester is None:
@ -101,7 +104,9 @@ class ComponentVertex(Vertex):
raise ValueError(f"Result not found for {edge.source_handle.name}. Results: {self.results}")
else:
raise ValueError(f"Result not found for {edge.source_handle.name}")
log_transaction(source=self, target=requester, flow_id=self.graph.flow_id, status="success")
asyncio.create_task(
log_transaction(source=self, target=requester, flow_id=str(self.graph.flow_id), status="success")
)
return result
def extract_messages_from_artifacts(self, artifacts: Dict[str, Any]) -> List[dict]:


@ -8,7 +8,6 @@ from sqlmodel import Session, col, select
from langflow.schema.message import Message
from langflow.services.database.models.message.model import MessageRead, MessageTable
from langflow.services.database.utils import migrate_messages_from_monitor_service_to_database
from langflow.services.deps import session_scope
from langflow.field_typing import BaseChatMessageHistory
from langchain_core.messages import BaseMessage
@ -36,8 +35,6 @@ def get_messages(
Returns:
List[Data]: A list of Data objects representing the retrieved messages.
"""
with session_scope() as session:
migrate_messages_from_monitor_service_to_database(session)
messages_read: list[Message] = []
with session_scope() as session:
stmt = select(MessageTable)


@ -4,5 +4,6 @@ from .folder import Folder
from .message import MessageTable
from .user import User
from .variable import Variable
from .transactions import TransactionTable
__all__ = ["Flow", "User", "ApiKey", "Variable", "Folder", "MessageTable"]
__all__ = ["Flow", "User", "ApiKey", "Variable", "Folder", "MessageTable", "TransactionTable"]


@ -19,6 +19,7 @@ if TYPE_CHECKING:
from langflow.services.database.models.folder import Folder
from langflow.services.database.models.message import MessageTable
from langflow.services.database.models.user import User
from langflow.services.database.models import TransactionTable
class FlowBase(SQLModel):
@ -143,6 +144,7 @@ class Flow(FlowBase, table=True):
folder_id: Optional[UUID] = Field(default=None, foreign_key="folder.id", nullable=True, index=True)
folder: Optional["Folder"] = Relationship(back_populates="flows")
messages: List["MessageTable"] = Relationship(back_populates="flow")
transactions: List["TransactionTable"] = Relationship(back_populates="flow")
def to_data(self):
serialized = self.model_dump()


@ -2,10 +2,13 @@ from typing import Optional
from fastapi import Depends
from sqlmodel import Session
from sqlalchemy import delete
from langflow.services.deps import get_session
from .model import Flow
from .. import TransactionTable, MessageTable
from loguru import logger
def get_flow_by_id(session: Session = Depends(get_session), flow_id: Optional[str] = None) -> Flow | None:
@ -17,6 +20,15 @@ def get_flow_by_id(session: Session = Depends(get_session), flow_id: Optional[st
return session.get(Flow, flow_id)
def delete_flow_by_id(flow_id: str, session: Session) -> None:
"""Delete flow by id."""
# Manually delete flow, transactions and messages because foreign key constraints might be disabled
session.exec(delete(Flow).where(Flow.id == flow_id)) # type: ignore
session.exec(delete(TransactionTable).where(TransactionTable.flow_id == flow_id)) # type: ignore
session.exec(delete(MessageTable).where(MessageTable.flow_id == flow_id)) # type: ignore
logger.info(f"Deleted flow {flow_id}")
def get_webhook_component_in_flow(flow_data: dict):
"""Get webhook component in flow data."""


@ -0,0 +1,3 @@
from .model import TransactionTable
__all__ = ["TransactionTable"]


@ -0,0 +1,30 @@
from typing import Optional
from uuid import UUID
from sqlalchemy.exc import IntegrityError
from sqlmodel import Session, select, col
from langflow.services.database.models.transactions.model import TransactionBase, TransactionTable
def get_transactions_by_flow_id(db: Session, flow_id: UUID, limit: Optional[int] = 1000) -> list[TransactionTable]:
stmt = (
select(TransactionTable)
.where(TransactionTable.flow_id == flow_id)
.order_by(col(TransactionTable.timestamp))
.limit(limit)
)
transactions = db.exec(stmt)
return [t for t in transactions]
def log_transaction(db: Session, transaction: TransactionBase) -> TransactionTable:
table = TransactionTable(**transaction.model_dump())
db.add(table)
try:
db.commit()
return table
except IntegrityError as e:
db.rollback()
raise e
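
A short usage sketch of the two CRUD helpers above, assuming an open SQLModel Session and an existing flow whose primary key is flow_id (the vertex id is hypothetical):

from uuid import UUID

from sqlmodel import Session

from langflow.services.database.models.transactions.crud import (
    get_transactions_by_flow_id,
    log_transaction,
)
from langflow.services.database.models.transactions.model import TransactionBase

def record_and_read(session: Session, flow_id: UUID) -> None:
    entry = TransactionBase(
        vertex_id="ChatInput-abc12",
        status="success",
        inputs={"input_value": "hi"},
        flow_id=flow_id,
    )
    inserted = log_transaction(session, entry)
    recent = get_transactions_by_flow_id(session, flow_id, limit=10)
    assert any(t.id == inserted.id for t in recent)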


@ -0,0 +1,44 @@
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Optional
from uuid import UUID, uuid4
from pydantic import field_validator
from sqlmodel import JSON, Column, Field, Relationship, SQLModel
if TYPE_CHECKING:
from langflow.services.database.models.flow.model import Flow
class TransactionBase(SQLModel):
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
vertex_id: str = Field(nullable=False)
target_id: Optional[str] = Field(default=None)
inputs: Optional[dict] = Field(default=None, sa_column=Column(JSON))
outputs: Optional[dict] = Field(default=None, sa_column=Column(JSON))
status: str = Field(nullable=False)
error: Optional[str] = Field(default=None)
flow_id: UUID = Field(foreign_key="flow.id")
# Needed for Column(JSON)
class Config:
arbitrary_types_allowed = True
@field_validator("flow_id", mode="before")
@classmethod
def validate_flow_id(cls, value):
if value is None:
return value
if isinstance(value, str):
value = UUID(value)
return value
class TransactionTable(TransactionBase, table=True):
__tablename__ = "transaction"
id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True)
flow: "Flow" = Relationship(back_populates="transactions")
class TransactionReadResponse(TransactionBase):
transaction_id: UUID
flow_id: UUID
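
Because of the field_validator above, flow_id can be passed either as a UUID or as its string form; a small hedged example:

from langflow.services.database.models.transactions.model import TransactionBase

t = TransactionBase(
    vertex_id="v1",
    status="success",
    flow_id="c54f9130-f2fa-4a3e-b22a-3856d946351b",  # string is coerced to UUID
)
assert str(t.flow_id) == "c54f9130-f2fa-4a3e-b22a-3856d946351b"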


@ -15,7 +15,12 @@ from sqlmodel import Session, SQLModel, create_engine, select, text
from langflow.services.base import Service
from langflow.services.database import models # noqa
from langflow.services.database.models.user.crud import get_user_by_username
from langflow.services.database.utils import Result, TableResults, migrate_messages_from_monitor_service_to_database
from langflow.services.database.utils import (
Result,
TableResults,
migrate_messages_from_monitor_service_to_database,
migrate_transactions_from_monitor_service_to_database,
)
from langflow.services.deps import get_settings_service
from langflow.services.utils import teardown_superuser
@ -55,7 +60,6 @@ class DatabaseService(Service):
max_overflow=self.settings_service.settings.max_overflow,
)
except sa.exc.NoSuchModuleError as exc:
# sqlalchemy.exc.NoSuchModuleError: Can't load plugin: sqlalchemy.dialects:postgres
if "postgres" in str(exc) and not self.database_url.startswith("postgresql"):
# https://stackoverflow.com/questions/62688256/sqlalchemy-exc-nosuchmoduleerror-cant-load-plugin-sqlalchemy-dialectspostgre
self.database_url = self.database_url.replace("postgres://", "postgresql://")
@ -181,14 +185,14 @@ class DatabaseService(Service):
logger.info("Alembic not initialized")
should_initialize_alembic = True
else:
logger.info("Alembic already initialized")
if should_initialize_alembic:
try:
self.init_alembic(alembic_cfg)
except Exception as exc:
logger.error(f"Error initializing alembic: {exc}")
raise RuntimeError("Error initializing alembic") from exc
else:
logger.info("Alembic already initialized")
logger.info(f"Running DB migrations in {self.script_location}")
@ -211,6 +215,10 @@ class DatabaseService(Service):
migrate_messages_from_monitor_service_to_database(session)
except Exception as exc:
logger.error(f"Error migrating messages from monitor service to database: {exc}")
try:
migrate_transactions_from_monitor_service_to_database(session)
except Exception as exc:
logger.error(f"Error migrating transactions from monitor service to database: {exc}")
if fix:
self.try_downgrade_upgrade_until_success(alembic_cfg)
@ -266,7 +274,7 @@ class DatabaseService(Service):
inspector = inspect(self.engine)
table_names = inspector.get_table_names()
current_tables = ["flow", "user", "apikey"]
current_tables = ["flow", "user", "apikey", "folder", "message", "variable", "transaction"]
if table_names and all(table in table_names for table in current_tables):
logger.debug("Database and tables already exist")


@ -1,3 +1,4 @@
import json
from contextlib import contextmanager
from dataclasses import dataclass
from typing import TYPE_CHECKING
@ -6,6 +7,7 @@ from alembic.util.exc import CommandError
from loguru import logger
from sqlmodel import Session, select, text
from langflow.services.database.models import TransactionTable
from langflow.services.deps import get_monitor_service
if TYPE_CHECKING:
@ -130,7 +132,7 @@ def session_getter(db_service: "DatabaseService"):
session = Session(db_service.engine)
yield session
except Exception as e:
print("Session rollback because of exception:", e)
logger.error("Session rollback because of exception:", e)
session.rollback()
raise
finally:
@ -148,3 +150,31 @@ class Result:
class TableResults:
table_name: str
results: list[Result]
def migrate_transactions_from_monitor_service_to_database(session: Session) -> None:
monitor_service = get_monitor_service()
batch = monitor_service.get_transactions()
if not batch:
logger.debug("No transactions to migrate.")
return
to_delete = []
while batch:
logger.debug(f"Migrating {len(batch)} transactions")
for row in batch:
tt = TransactionTable(
flow_id=row["flow_id"],
status=row["status"],
error=row["error"],
timestamp=row["timestamp"],
vertex_id=row["vertex_id"],
inputs=json.loads(row["inputs"]) if row["inputs"] else None,
outputs=json.loads(row["outputs"]) if row["outputs"] else None,
target_id=row["target_id"],
)
to_delete.append(row["index"])
session.add(tt)
session.commit()
monitor_service.delete_transactions(to_delete)
batch = monitor_service.get_transactions()
logger.debug("Transactions migrations completed.")


@ -5,6 +5,8 @@ from typing import TYPE_CHECKING, Dict, Optional
from loguru import logger
from langflow.utils.concurrency import KeyedMemoryLockManager
if TYPE_CHECKING:
from langflow.services.base import Service
from langflow.services.factory import ServiceFactory
@ -24,6 +26,7 @@ class ServiceManager:
self.services: Dict[str, "Service"] = {}
self.factories = {}
self.register_factories()
self.keyed_lock = KeyedMemoryLockManager()
def register_factories(self):
for factory in self.get_factories():
@ -49,8 +52,9 @@ class ServiceManager:
Get (or create) a service by its name.
"""
if service_name not in self.services:
self._create_service(service_name, default)
with self.keyed_lock.lock(service_name):
if service_name not in self.services:
self._create_service(service_name, default)
return self.services[service_name]
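
The change above is the classic double-checked locking pattern: the key is checked once without the lock, then re-checked after acquiring it so that only one caller pays for creation. The same idea in isolation (illustrative names):

from langflow.utils.concurrency import KeyedMemoryLockManager

_cache: dict[str, object] = {}
_locks = KeyedMemoryLockManager()

def get_or_create(key: str, factory) -> object:
    if key not in _cache:
        with _locks.lock(key):
            if key not in _cache:  # re-check once the lock is held
                _cache[key] = factory()
    return _cache[key]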


@ -1,16 +1,19 @@
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Union
from typing import TYPE_CHECKING, Union, List
import duckdb
from loguru import logger
from platformdirs import user_cache_dir
from langflow.services.base import Service
from langflow.services.monitor.utils import add_row_to_table, drop_and_create_table_if_schema_mismatch
from langflow.services.monitor.utils import (
add_row_to_table,
drop_and_create_table_if_schema_mismatch,
new_duckdb_locked_connection,
)
if TYPE_CHECKING:
from langflow.services.monitor.schema import DuckDbMessageModel, TransactionModel, VertexBuildModel
from langflow.services.monitor.schema import VertexBuildModel
from langflow.services.settings.service import SettingsService
@ -18,14 +21,12 @@ class MonitorService(Service):
name = "monitor_service"
def __init__(self, settings_service: "SettingsService"):
from langflow.services.monitor.schema import DuckDbMessageModel, TransactionModel, VertexBuildModel
from langflow.services.monitor.schema import VertexBuildModel
self.settings_service = settings_service
self.base_cache_dir = Path(user_cache_dir("langflow"), ensure_exists=True)
self.db_path = self.base_cache_dir / "monitor.duckdb"
self.table_map: dict[str, type[TransactionModel | DuckDbMessageModel | VertexBuildModel]] = {
"transactions": TransactionModel,
"messages": DuckDbMessageModel,
self.table_map: dict[str, type[VertexBuildModel]] = {
"vertex_builds": VertexBuildModel,
}
@ -35,7 +36,7 @@ class MonitorService(Service):
logger.exception(f"Error initializing monitor service: {e}")
def exec_query(self, query: str, read_only: bool = False):
with duckdb.connect(str(self.db_path), read_only=read_only) as conn:
with new_duckdb_locked_connection(self.db_path, read_only=read_only) as conn:
return conn.execute(query).df()
def to_df(self, table_name):
@ -48,20 +49,17 @@ class MonitorService(Service):
def add_row(
self,
table_name: str,
data: Union[dict, "TransactionModel", "DuckDbMessageModel", "VertexBuildModel"],
data: Union[dict, "VertexBuildModel"],
):
# Make sure the model passed matches the table
model = self.table_map.get(table_name)
if model is None:
raise ValueError(f"Unknown table name: {table_name}")
# Connect to DuckDB and add the row
with duckdb.connect(str(self.db_path), read_only=False) as conn:
with new_duckdb_locked_connection(self.db_path, read_only=False) as conn:
add_row_to_table(conn, table_name, model, data)
def load_table_as_dataframe(self, table_name):
with duckdb.connect(str(self.db_path)) as conn:
with new_duckdb_locked_connection(self.db_path, read_only=True) as conn:
return conn.table(table_name).df()
@staticmethod
@ -99,7 +97,7 @@ class MonitorService(Service):
if limit is not None:
query += f" LIMIT {limit}"
with duckdb.connect(str(self.db_path), read_only=True) as conn:
with new_duckdb_locked_connection(self.db_path, read_only=True) as conn:
df = conn.execute(query).df()
return df
@ -127,7 +125,7 @@ class MonitorService(Service):
if order_by:
query += f" ORDER BY {order_by}"
with duckdb.connect(str(self.db_path), read_only=True) as conn:
with new_duckdb_locked_connection(self.db_path, read_only=True) as conn:
df = conn.execute(query).df()
return df.to_dict(orient="records")
@ -137,7 +135,7 @@ class MonitorService(Service):
if flow_id:
query += f" WHERE flow_id = '{flow_id}'"
with duckdb.connect(str(self.db_path), read_only=False) as conn:
with new_duckdb_locked_connection(self.db_path, read_only=False) as conn:
conn.execute(query)
def delete_messages_session(self, session_id: str):
@ -166,33 +164,14 @@ class MonitorService(Service):
return self.exec_query(query, read_only=False)
def get_transactions(
self,
source: str | None = None,
target: str | None = None,
status: str | None = None,
order_by: str | None = "timestamp",
flow_id: str | None = None,
):
query = (
"SELECT index,flow_id, status, error, timestamp, vertex_id, inputs, outputs, target_id FROM transactions"
)
conditions = []
if source:
conditions.append(f"source = '{source}'")
if target:
conditions.append(f"target = '{target}'")
if status:
conditions.append(f"status = '{status}'")
if flow_id:
conditions.append(f"flow_id = '{flow_id}'")
if conditions:
query += " WHERE " + " AND ".join(conditions)
if order_by:
query += f" ORDER BY {order_by} DESC"
with duckdb.connect(str(self.db_path), read_only=True) as conn:
def get_transactions(self, limit: int = 100):
query = f"SELECT index,flow_id, status, error, timestamp, vertex_id, inputs, outputs, target_id FROM transactions LIMIT {str(limit)}"
with new_duckdb_locked_connection(self.db_path, read_only=True) as conn:
df = conn.execute(query).df()
return df.to_dict(orient="records")
def delete_transactions(self, ids: List[int]) -> None:
with new_duckdb_locked_connection(self.db_path, read_only=False) as conn:
conn.execute(f"DELETE FROM transactions WHERE index in ({','.join(map(str, ids))})")
conn.commit()


@ -1,27 +1,25 @@
from contextlib import contextmanager
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional, Type, Union
import duckdb
import threading
from loguru import logger
from pydantic import BaseModel
from langflow.services.deps import get_monitor_service
from langflow.utils.concurrency import KeyedWorkerLockManager
if TYPE_CHECKING:
from langflow.api.v1.schemas import ResultDataResponse
from langflow.graph.vertex.base import Vertex
INDEX_KEY = "index"
# Lock to prevent multiple threads from creating the same table at the same time
drop_create_table_lock = threading.Lock()
worker_lock_manager = KeyedWorkerLockManager()
def get_table_schema_as_dict(conn: duckdb.DuckDBPyConnection, table_name: str) -> dict:
result = conn.execute(f"PRAGMA table_info('{table_name}')").fetchall()
schema = {row[1]: row[2].upper() for row in result}
schema.pop(INDEX_KEY, None)
return schema
@ -52,31 +50,40 @@ def model_to_sql_column_definitions(model: Type[BaseModel]) -> dict:
def drop_and_create_table_if_schema_mismatch(db_path: str, table_name: str, model: Type[BaseModel]):
with drop_create_table_lock:
with duckdb.connect(db_path) as conn:
# Get the current schema from the database
try:
current_schema = get_table_schema_as_dict(conn, table_name)
except duckdb.CatalogException:
current_schema = {}
# Get the desired schema from the model
desired_schema = model_to_sql_column_definitions(model)
with new_duckdb_locked_connection(db_path) as conn:
# Get the current schema from the database
try:
current_schema = get_table_schema_as_dict(conn, table_name)
except duckdb.CatalogException:
current_schema = {}
# Get the desired schema from the model
desired_schema = model_to_sql_column_definitions(model)
# Compare the current and desired schemas
# Compare the current and desired schemas
if current_schema != desired_schema:
# If they don't match, drop the existing table and create a new one
conn.execute(f"DROP TABLE IF EXISTS {table_name}")
if INDEX_KEY in desired_schema.keys():
# Create a sequence for the id column
try:
conn.execute(f"CREATE SEQUENCE seq_{table_name} START 1;")
except duckdb.CatalogException:
pass
desired_schema[INDEX_KEY] = f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')"
columns_sql = ", ".join(f"{name} {data_type}" for name, data_type in desired_schema.items())
create_table_sql = f"CREATE TABLE {table_name} ({columns_sql})"
conn.execute(create_table_sql)
if current_schema != desired_schema:
# If they don't match, drop the existing table and create a new one
logger.warning(f"Schema mismatch for duckdb table {table_name}. Dropping and recreating table.")
logger.debug(f"Current schema: {str(current_schema)}")
logger.debug(f"Desired schema: {str(desired_schema)}")
conn.execute(f"DROP TABLE IF EXISTS {table_name}")
if INDEX_KEY in desired_schema.keys():
# Create a sequence for the id column
try:
conn.execute(f"CREATE SEQUENCE seq_{table_name} START 1;")
except duckdb.CatalogException:
pass
desired_schema[INDEX_KEY] = f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')"
columns_sql = ", ".join(f"{name} {data_type}" for name, data_type in desired_schema.items())
create_table_sql = f"CREATE TABLE {table_name} ({columns_sql})"
conn.execute(create_table_sql)
@contextmanager
def new_duckdb_locked_connection(db_path: Union[str, Path], read_only=False):
with worker_lock_manager.lock("duckdb"):
with duckdb.connect(str(db_path), read_only=read_only) as conn:
yield conn
def add_row_to_table(
@ -166,37 +173,3 @@ async def log_vertex_build(
monitor_service.add_row(table_name="vertex_builds", data=row)
except Exception as e:
logger.exception(f"Error logging vertex build: {e}")
def build_clean_params(target: "Vertex") -> dict:
"""
Cleans the parameters of the target vertex.
"""
# Removes all keys that the values aren't python types like str, int, bool, etc.
params = {
key: value for key, value in target.params.items() if isinstance(value, (str, int, bool, float, list, dict))
}
# if it is a list we need to check if the contents are python types
for key, value in params.items():
if isinstance(value, list):
params[key] = [item for item in value if isinstance(item, (str, int, bool, float, list, dict))]
return params
def log_transaction(flow_id, source: "Vertex", status, target: Optional["Vertex"] = None, error=None):
try:
monitor_service = get_monitor_service()
clean_params = build_clean_params(source)
data = {
"vertex_id": str(source.id),
"target_id": str(target.id) if target else None,
"inputs": clean_params,
"outputs": source.result.model_dump_json() if source.result else None,
"timestamp": monitor_service.get_timestamp(),
"status": status,
"error": error,
"flow_id": flow_id,
}
monitor_service.add_row(table_name="transactions", data=data)
except Exception as e:
logger.error(f"Error logging transaction: {e}")


@ -0,0 +1,63 @@
import re
import threading
from contextlib import contextmanager
from pathlib import Path
from filelock import FileLock
from platformdirs import user_cache_dir
class KeyedMemoryLockManager:
"""
A manager for acquiring and releasing memory locks based on a key
"""
def __init__(self):
self.locks = {}
self.global_lock = threading.Lock()
def _get_lock(self, key: str):
with self.global_lock:
if key not in self.locks:
self.locks[key] = threading.Lock()
return self.locks[key]
@contextmanager
def lock(self, key: str):
lock = self._get_lock(key)
lock.acquire()
try:
yield
finally:
lock.release()
class KeyedWorkerLockManager:
"""
A manager for acquiring locks between workers based on a key
"""
def __init__(self):
self.locks_dir = Path(user_cache_dir("langflow"), ensure_exists=True) / "worker_locks"
def _validate_key(self, key: str) -> bool:
"""
Validate that the string only contains alphanumeric characters and underscores.
Parameters:
s (str): The string to validate.
Returns:
bool: True if the string is valid, False otherwise.
"""
pattern = re.compile(r"^\w+$")
return bool(pattern.match(key))
@contextmanager
def lock(self, key: str):
if not self._validate_key(key):
raise ValueError(f"Invalid key: {key}")
lock = FileLock(self.locks_dir / key)
with lock:
yield
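
A short usage sketch of the two managers defined above: the in-memory one serializes threads within a single process per key, while the file-lock one serializes separate worker processes and only accepts keys matching ^\w+$:

from langflow.utils.concurrency import KeyedMemoryLockManager, KeyedWorkerLockManager

memory_locks = KeyedMemoryLockManager()
worker_locks = KeyedWorkerLockManager()

counters: dict[str, int] = {}

def bump(key: str) -> None:
    # Threads contending on the same key are serialized; other keys run in parallel.
    with memory_locks.lock(key):
        counters[key] = counters.get(key, 0) + 1

def exclusive_duckdb_section() -> None:
    # "duckdb" matches ^\w+$; a key like "my db" would raise ValueError.
    with worker_locks.lock("duckdb"):
        pass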


@ -782,6 +782,22 @@ typer = ">=0.12.3"
[package.extras]
standard = ["fastapi", "uvicorn[standard] (>=0.15.0)"]
[[package]]
name = "filelock"
version = "3.15.4"
description = "A platform independent file lock."
optional = false
python-versions = ">=3.8"
files = [
{file = "filelock-3.15.4-py3-none-any.whl", hash = "sha256:6ca1fffae96225dab4c6eaf1c4f4f28cd2568d3ec2a44e15a08520504de468e7"},
{file = "filelock-3.15.4.tar.gz", hash = "sha256:2207938cbc1844345cb01a5a95524dae30f0ce089eba5b00378295a17e3e90cb"},
]
[package.extras]
docs = ["furo (>=2023.9.10)", "sphinx (>=7.2.6)", "sphinx-autodoc-typehints (>=1.25.2)"]
testing = ["covdefaults (>=2.3)", "coverage (>=7.3.2)", "diff-cover (>=8.0.1)", "pytest (>=7.4.3)", "pytest-asyncio (>=0.21)", "pytest-cov (>=4.1)", "pytest-mock (>=3.12)", "pytest-timeout (>=2.2)", "virtualenv (>=20.26.2)"]
typing = ["typing-extensions (>=4.8)"]
[[package]]
name = "firecrawl-py"
version = "0.0.16"
@ -3581,4 +3597,4 @@ local = []
[metadata]
lock-version = "2.0"
python-versions = ">=3.10,<3.13"
content-hash = "21ad684d075f232c2398e9f1e067702c00f1bf157efc08e4de8daa4daa7c0d47"
content-hash = "0b82dcdf0b754b3c9fc0d7f20a7939305f866d49150f1b1fd4bde9f876c4e425"


@ -74,6 +74,7 @@ prometheus-client = "^0.20.0"
aiofiles = "^24.1.0"
setuptools = ">=70"
nanoid = "^2.0.0"
filelock = "^3.15.4"
[tool.poetry.extras]
deploy = ["celery", "redis", "flower"]


@ -10,8 +10,15 @@ from langflow.api.v1.schemas import FlowListCreate
from langflow.initial_setup.setup import load_starter_projects, load_flows_from_directory
from langflow.services.database.models.base import orjson_dumps
from langflow.services.database.models.flow import Flow, FlowCreate, FlowUpdate
from langflow.services.database.utils import session_getter
from langflow.services.deps import get_db_service
from langflow.services.database.models.transactions.crud import get_transactions_by_flow_id
from langflow.services.database.utils import session_getter, migrate_transactions_from_monitor_service_to_database
from langflow.services.deps import get_db_service, get_monitor_service, session_scope
from langflow.services.monitor.schema import TransactionModel
from langflow.services.monitor.utils import (
drop_and_create_table_if_schema_mismatch,
new_duckdb_locked_connection,
add_row_to_table,
)
@pytest.fixture(scope="module")
@ -281,3 +288,44 @@ def test_load_flows(client: TestClient, load_flows_dir):
response = client.get("api/v1/flows/c54f9130-f2fa-4a3e-b22a-3856d946351b")
assert response.status_code == 200
assert response.json()["name"] == "BasicExample"
@pytest.mark.load_flows
def test_migrate_transactions(client: TestClient):
monitor_service = get_monitor_service()
drop_and_create_table_if_schema_mismatch(str(monitor_service.db_path), "transactions", TransactionModel)
flow_id = "c54f9130-f2fa-4a3e-b22a-3856d946351b"
data = {
"vertex_id": "vid",
"target_id": "tid",
"inputs": {"input_value": True},
"outputs": {"output_value": True},
"timestamp": "2021-10-10T10:10:10",
"status": "success",
"error": None,
"flow_id": flow_id,
}
with new_duckdb_locked_connection(str(monitor_service.db_path), read_only=False) as conn:
add_row_to_table(conn, "transactions", TransactionModel, data)
assert 1 == len(monitor_service.get_transactions())
with session_scope() as session:
migrate_transactions_from_monitor_service_to_database(session)
new_trans = get_transactions_by_flow_id(session, UUID(flow_id))
assert 1 == len(new_trans)
t = new_trans[0]
assert t.error is None
assert t.inputs == data["inputs"]
assert t.outputs == data["outputs"]
assert t.status == data["status"]
assert str(t.timestamp) == "2021-10-10 10:10:10"
assert t.vertex_id == data["vertex_id"]
assert t.target_id == data["target_id"]
assert t.flow_id == UUID(flow_id)
assert 0 == len(monitor_service.get_transactions())
client.request("DELETE", f"api/v1/flows/{flow_id}")
with session_scope() as session:
new_trans = get_transactions_by_flow_id(session, UUID(flow_id))
assert 0 == len(new_trans)


@ -1042,18 +1042,3 @@ export async function multipleDeleteFlowsComponents(
// Return the responses after all requests are completed
return Promise.all(responses);
}
export async function getTransactionTable(
id: string,
mode: "intersection" | "union",
params = {},
): Promise<{ rows: Array<object>; columns: Array<ColDef | ColGroupDef> }> {
const config = {};
config["params"] = { flow_id: id };
if (params) {
config["params"] = { ...config["params"], ...params };
}
const rows = await api.get(`${BASE_URL_API}monitor/transactions`, config);
const columns = extractColumnsFromRows(rows.data, mode);
return { rows: rows.data, columns };
}