chore: drop duckdb usage and migrations (#3730)

* chore: drop duckdb usage and migrations

* [autofix.ci] apply automated fixes

* Add DefaultModel and MessageResponse classes with custom JSON serialization and validation

- Introduced `DefaultModel` class with custom JSON encoders and serialization methods.
- Added `MessageResponse` class inheriting from `DefaultModel` with fields for message details and custom validators/serializers.
- Enhanced file handling and timestamp formatting in `MessageResponse`.
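A minimal sketch of how the new serialization behaves, using only the field names from the class added in the diff below; the values and surrounding setup are illustrative:

```python
from langflow.schema.message import MessageResponse

# Illustrative values; only the field names come from the new class.
resp = MessageResponse(
    sender="Machine",
    sender_name="AI",
    session_id="session-1",
    text="Hello",
    files=["path/to/file.txt"],
)

data = resp.model_dump()
# The timestamp serializer drops microseconds and formats as "YYYY-MM-DD HH:MM:SS";
# the files serializer JSON-encodes the list back into a string.
print(data["timestamp"])  # e.g. "2024-09-26 11:48:35"
print(data["files"])      # '["path/to/file.txt"]'
```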

* Refactor: Replace `MessageModelResponse` with `MessageResponse` in monitor API

- Updated import statements to use `MessageResponse` from `langflow.schema.message`.
- Modified `/messages` endpoint to return `list[MessageResponse]` instead of `list[MessageModelResponse]`.
- Adjusted response model validation to use `MessageResponse`.
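A hedged sketch of what a client sees after this change; the base URL, port, and `flow_id` below are placeholders, not taken from the commit:

```python
import httpx

# Placeholder base URL and flow_id; adjust for your deployment.
response = httpx.get(
    "http://localhost:7860/api/v1/monitor/messages",
    params={"flow_id": "00000000-0000-0000-0000-000000000000"},
)
response.raise_for_status()

# Each element now validates against MessageResponse (sender, sender_name,
# session_id, text, files, timestamp, plus optional id/flow_id).
for message in response.json():
    print(message["sender_name"], message["text"])
```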

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
Nicolò Boschi 2024-09-26 13:48:35 +02:00 committed by GitHub
commit 616c01813e
17 changed files with 77 additions and 751 deletions

poetry.lock (generated)

@@ -5071,7 +5071,6 @@ crewai = "^0.36.0"
cryptography = ">=42.0.5,<44.0.0"
diskcache = "^5.6.3"
docstring-parser = "^0.16"
duckdb = "^1.0.0"
emoji = "^2.12.0"
fastapi = "^0.111.0"
filelock = "^3.15.4"


@@ -4,6 +4,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import delete
from sqlmodel import Session, col, select
from langflow.schema.message import MessageResponse
from langflow.services.auth.utils import get_current_active_user
from langflow.services.database.models.message.model import MessageRead, MessageTable, MessageUpdate
from langflow.services.database.models.transactions.crud import get_transactions_by_flow_id
@@ -15,7 +16,6 @@ from langflow.services.database.models.vertex_builds.crud import (
)
from langflow.services.database.models.vertex_builds.model import VertexBuildMapModel
from langflow.services.deps import get_session
from langflow.services.monitor.schema import MessageModelResponse
router = APIRouter(prefix="/monitor", tags=["Monitor"])
@@ -43,7 +43,7 @@ async def delete_vertex_builds(
raise HTTPException(status_code=500, detail=str(e))
@router.get("/messages", response_model=list[MessageModelResponse])
@router.get("/messages", response_model=list[MessageResponse])
async def get_messages(
flow_id: str | None = Query(None),
session_id: str | None = Query(None),
@@ -66,7 +66,7 @@ async def get_messages(
col = getattr(MessageTable, order_by).asc()
stmt = stmt.order_by(col)
messages = session.exec(stmt)
return [MessageModelResponse.model_validate(d, from_attributes=True) for d in messages]
return [MessageResponse.model_validate(d, from_attributes=True) for d in messages]
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))


@@ -1,4 +1,5 @@
import asyncio
import json
from collections.abc import AsyncIterator, Iterator
from datetime import datetime, timezone
from typing import Annotated, Any
@@ -11,7 +12,7 @@ from langchain_core.prompt_values import ImagePromptValue
from langchain_core.prompts import BaseChatPromptTemplate, ChatPromptTemplate, PromptTemplate
from langchain_core.prompts.image import ImagePromptTemplate
from loguru import logger
from pydantic import BeforeValidator, ConfigDict, Field, field_serializer, field_validator
from pydantic import BaseModel, BeforeValidator, ConfigDict, Field, field_serializer, field_validator
from langflow.base.prompts.utils import dict_values_to_string
from langflow.schema.data import Data
@@ -248,3 +249,68 @@ class Message(Data):
return asyncio.run(cls.from_template_and_variables(template, **variables))
else:
return loop.run_until_complete(cls.from_template_and_variables(template, **variables))
class DefaultModel(BaseModel):
class Config:
from_attributes = True
populate_by_name = True
json_encoders = {
datetime: lambda v: v.isoformat(),
}
def json(self, **kwargs):
# Use the custom serialization function
return super().model_dump_json(**kwargs, encoder=self.custom_encoder)
@staticmethod
def custom_encoder(obj):
if isinstance(obj, datetime):
return obj.isoformat()
raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")
class MessageResponse(DefaultModel):
id: str | UUID | None = Field(default=None)
flow_id: UUID | None = Field(default=None)
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
sender: str
sender_name: str
session_id: str
text: str
files: list[str] = []
@field_validator("files", mode="before")
@classmethod
def validate_files(cls, v):
if isinstance(v, str):
v = json.loads(v)
return v
@field_serializer("timestamp")
@classmethod
def serialize_timestamp(cls, v):
v = v.replace(microsecond=0)
return v.strftime("%Y-%m-%d %H:%M:%S")
@field_serializer("files")
@classmethod
def serialize_files(cls, v):
if isinstance(v, list):
return json.dumps(v)
return v
@classmethod
def from_message(cls, message: Message, flow_id: str | None = None):
# first check if the record has all the required fields
if message.text is None or not message.sender or not message.sender_name:
raise ValueError("The message does not have the required fields (text, sender, sender_name).")
return cls(
sender=message.sender,
sender_name=message.sender_name,
text=message.text,
session_id=message.session_id,
files=message.files or [],
timestamp=message.timestamp,
flow_id=flow_id,
)


@@ -19,8 +19,6 @@ from langflow.services.database.models.user.crud import get_user_by_username
from langflow.services.database.utils import (
Result,
TableResults,
migrate_messages_from_monitor_service_to_database,
migrate_transactions_from_monitor_service_to_database,
)
from langflow.services.deps import get_settings_service
from langflow.services.utils import teardown_superuser
@@ -215,14 +213,6 @@ class DatabaseService(Service):
logger.error(f"AutogenerateDiffsDetected: {exc}")
if not fix:
raise RuntimeError(f"There's a mismatch between the models and the database.\n{exc}")
try:
migrate_messages_from_monitor_service_to_database(session)
except Exception as exc:
logger.error(f"Error migrating messages from monitor service to database: {exc}")
try:
migrate_transactions_from_monitor_service_to_database(session)
except Exception as exc:
logger.error(f"Error migrating transactions from monitor service to database: {exc}")
if fix:
self.try_downgrade_upgrade_until_success(alembic_cfg)


@@ -1,91 +1,17 @@
from __future__ import annotations
import json
from contextlib import contextmanager
from dataclasses import dataclass
from typing import TYPE_CHECKING
from alembic.util.exc import CommandError
from loguru import logger
from sqlmodel import Session, select, text
from langflow.services.database.models import TransactionTable
from langflow.services.deps import get_monitor_service
from sqlmodel import Session, text
if TYPE_CHECKING:
from langflow.services.database.service import DatabaseService
def migrate_messages_from_monitor_service_to_database(session: Session) -> bool:
from langflow.schema.message import Message
from langflow.services.database.models.message import MessageTable
try:
monitor_service = get_monitor_service()
messages_df = monitor_service.get_messages()
except Exception as e:
if "Table with name messages does not exist" in str(e):
logger.debug(f"Error retrieving messages from monitor service: {e}")
else:
logger.warning(f"Error retrieving messages from monitor service: {e}")
return False
if messages_df.empty:
logger.info("No messages to migrate.")
return True
original_messages: list[dict] = messages_df.to_dict(orient="records")
db_messages = session.exec(select(MessageTable)).all()
db_messages = [msg[0] for msg in db_messages] # type: ignore
db_msg_dict = {(msg.text, msg.timestamp.isoformat(), str(msg.flow_id), msg.session_id): msg for msg in db_messages}
# Filter out messages that already exist in the database
original_messages_filtered = []
for message in original_messages:
key = (message["text"], message["timestamp"].isoformat(), str(message["flow_id"]), message["session_id"])
if key not in db_msg_dict:
original_messages_filtered.append(message)
if not original_messages_filtered:
logger.info("No messages to migrate.")
return True
try:
# Bulk insert messages
session.bulk_insert_mappings(
MessageTable, # type: ignore
[MessageTable.from_message(Message(**msg)).model_dump() for msg in original_messages_filtered],
)
session.commit()
except Exception as e:
logger.error(f"Error during message insertion: {str(e)}")
session.rollback()
return False
# Create a dictionary for faster lookup
all_ok = True
for orig_msg in original_messages_filtered:
key = (orig_msg["text"], orig_msg["timestamp"].isoformat(), str(orig_msg["flow_id"]), orig_msg["session_id"])
matching_db_msg = db_msg_dict.get(key)
if matching_db_msg is None:
logger.warning(f"Message not found in database: {orig_msg}")
all_ok = False
else:
# Validate other fields
if any(getattr(matching_db_msg, k) != v for k, v in orig_msg.items() if k != "index"):
logger.warning(f"Message mismatch in database: {orig_msg}")
all_ok = False
if all_ok:
messages_ids = [message["index"] for message in original_messages]
monitor_service.delete_messages(messages_ids)
logger.info("Migration completed successfully. Original messages deleted.")
else:
logger.warning("Migration completed with errors. Original messages not deleted.")
return all_ok
def initialize_database(fix_migration: bool = False):
logger.debug("Initializing database")
from langflow.services.deps import get_db_service
@@ -153,39 +79,3 @@ class Result:
class TableResults:
table_name: str
results: list[Result]
def migrate_transactions_from_monitor_service_to_database(session: Session) -> None:
try:
monitor_service = get_monitor_service()
batch = monitor_service.get_transactions()
except Exception as e:
if "Table with name transactions does not exist" in str(e):
logger.debug(f"Error retrieving transactions from monitor service: {e}")
else:
logger.warning(f"Error retrieving transactions from monitor service: {e}")
return
if not batch:
logger.debug("No transactions to migrate.")
return
to_delete = []
while batch:
logger.debug(f"Migrating {len(batch)} transactions")
for row in batch:
tt = TransactionTable(
flow_id=row["flow_id"],
status=row["status"],
error=row["error"],
timestamp=row["timestamp"],
vertex_id=row["vertex_id"],
inputs=json.loads(row["inputs"]) if row["inputs"] else None,
outputs=json.loads(row["outputs"]) if row["outputs"] else None,
target_id=row["target_id"],
)
to_delete.append(row["index"])
session.add(tt)
session.commit()
monitor_service.delete_transactions(to_delete)
batch = monitor_service.get_transactions()
logger.debug("Transactions migrations completed.")


@@ -12,7 +12,6 @@ if TYPE_CHECKING:
from langflow.services.cache.service import CacheService
from langflow.services.chat.service import ChatService
from langflow.services.database.service import DatabaseService
from langflow.services.monitor.service import MonitorService
from langflow.services.plugins.service import PluginService
from langflow.services.session.service import SessionService
from langflow.services.settings.service import SettingsService
@@ -220,18 +219,6 @@ def get_session_service() -> "SessionService":
return get_service(ServiceType.SESSION_SERVICE, SessionServiceFactory()) # type: ignore
def get_monitor_service() -> "MonitorService":
"""
Retrieves the MonitorService instance from the service manager.
Returns:
MonitorService: The MonitorService instance.
"""
from langflow.services.monitor.factory import MonitorServiceFactory
return get_service(ServiceType.MONITOR_SERVICE, MonitorServiceFactory()) # type: ignore
def get_task_service() -> "TaskService":
"""
Retrieves the TaskService instance from the service manager.


@@ -1,13 +0,0 @@
from langflow.services.factory import ServiceFactory
from langflow.services.monitor.service import MonitorService
from langflow.services.settings.service import SettingsService
class MonitorServiceFactory(ServiceFactory):
name = "monitor_service"
def __init__(self):
super().__init__(MonitorService)
def create(self, settings_service: SettingsService):
return self.service_class(settings_service)


@@ -1,258 +0,0 @@
import json
from datetime import datetime, timezone
from typing import Any
from uuid import UUID
from pydantic import BaseModel, Field, field_serializer, field_validator
from langflow.schema.message import Message
class DefaultModel(BaseModel):
class Config:
from_attributes = True
populate_by_name = True
json_encoders = {
datetime: lambda v: v.isoformat(),
}
def json(self, **kwargs):
# Use the custom serialization function
return super().model_dump_json(**kwargs, encoder=self.custom_encoder)
@staticmethod
def custom_encoder(obj):
if isinstance(obj, datetime):
return obj.isoformat()
raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")
class TransactionModel(DefaultModel):
index: int | None = Field(default=None)
timestamp: datetime | None = Field(default_factory=datetime.now, alias="timestamp")
vertex_id: str
target_id: str | None = None
inputs: dict
outputs: dict | None = None
status: str
error: str | None = None
flow_id: str | None = Field(default=None, alias="flow_id")
# validate target_args in case it is a JSON
@field_validator("outputs", "inputs", mode="before")
def validate_target_args(cls, v):
if isinstance(v, str):
return json.loads(v)
return v
@field_serializer("outputs", "inputs")
def serialize_target_args(v):
if isinstance(v, dict):
return json.dumps(v)
return v
class TransactionModelResponse(DefaultModel):
index: int | None = Field(default=None)
timestamp: datetime | None = Field(default_factory=datetime.now, alias="timestamp")
vertex_id: str
inputs: dict
outputs: dict | None = None
status: str
error: str | None = None
flow_id: str | None = Field(default=None, alias="flow_id")
source: str | None = None
target: str | None = None
# validate target_args in case it is a JSON
@field_validator("outputs", "inputs", mode="before")
def validate_target_args(cls, v):
if isinstance(v, str):
return json.loads(v)
return v
@field_validator("index", mode="before")
def validate_id(cls, v):
if isinstance(v, float):
try:
return int(v)
except ValueError:
return None
return v
class DuckDbMessageModel(DefaultModel):
index: int | None = Field(default=None, alias="index")
flow_id: str | None = Field(default=None, alias="flow_id")
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
sender: str
sender_name: str
session_id: str
text: str
files: list[str] = []
@field_validator("files", mode="before")
@classmethod
def validate_files(cls, v):
if isinstance(v, str):
v = json.loads(v)
return v
@field_serializer("timestamp")
@classmethod
def serialize_timestamp(cls, v):
v = v.replace(microsecond=0)
return v.strftime("%Y-%m-%d %H:%M:%S")
@field_serializer("files")
@classmethod
def serialize_files(cls, v):
if isinstance(v, list):
return json.dumps(v)
return v
@classmethod
def from_message(cls, message: Message, flow_id: str | None = None):
# first check if the record has all the required fields
if message.text is None or not message.sender or not message.sender_name:
raise ValueError("The message does not have the required fields (text, sender, sender_name).")
return cls(
sender=message.sender,
sender_name=message.sender_name,
text=message.text,
session_id=message.session_id,
files=message.files or [],
timestamp=message.timestamp,
flow_id=flow_id,
)
class MessageModel(DefaultModel):
id: str | UUID | None = Field(default=None)
flow_id: UUID | None = Field(default=None)
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
sender: str
sender_name: str
session_id: str
text: str
files: list[str] = []
@field_validator("files", mode="before")
@classmethod
def validate_files(cls, v):
if isinstance(v, str):
v = json.loads(v)
return v
@field_serializer("timestamp")
@classmethod
def serialize_timestamp(cls, v):
v = v.replace(microsecond=0)
return v.strftime("%Y-%m-%d %H:%M:%S")
@field_serializer("files")
@classmethod
def serialize_files(cls, v):
if isinstance(v, list):
return json.dumps(v)
return v
@classmethod
def from_message(cls, message: Message, flow_id: str | None = None):
# first check if the record has all the required fields
if message.text is None or not message.sender or not message.sender_name:
raise ValueError("The message does not have the required fields (text, sender, sender_name).")
return cls(
sender=message.sender,
sender_name=message.sender_name,
text=message.text,
session_id=message.session_id,
files=message.files or [],
timestamp=message.timestamp,
flow_id=flow_id,
)
class MessageModelResponse(MessageModel):
pass
class MessageModelRequest(MessageModel):
text: str = Field(default="")
sender: str = Field(default="")
sender_name: str = Field(default="")
session_id: str = Field(default="")
class VertexBuildModel(DefaultModel):
index: int | None = Field(default=None, alias="index", exclude=True)
id: str | None = Field(default=None, alias="id")
flow_id: str
valid: bool
params: Any
data: dict
artifacts: dict
timestamp: datetime = Field(default_factory=datetime.now)
@field_serializer("data", "artifacts")
def serialize_dict(v):
if isinstance(v, dict):
# check if the value of each key is a BaseModel or a list of BaseModels
for key, value in v.items():
if isinstance(value, BaseModel):
v[key] = value.model_dump()
elif isinstance(value, list) and all(isinstance(i, BaseModel) for i in value):
v[key] = [i.model_dump() for i in value]
return json.dumps(v, default=str)
elif isinstance(v, BaseModel):
return v.model_dump_json()
return v
@field_validator("params", mode="before")
def validate_params(cls, v):
if isinstance(v, str):
try:
return json.loads(v)
except json.JSONDecodeError:
return v
return v
@field_serializer("params")
def serialize_params(v):
if isinstance(v, list) and all(isinstance(i, BaseModel) for i in v):
return json.dumps([i.model_dump() for i in v])
return v
@field_validator("data", mode="before")
def validate_data(cls, v):
if isinstance(v, str):
return json.loads(v)
return v
@field_validator("artifacts", mode="before")
def validate_artifacts(cls, v):
if isinstance(v, str):
return json.loads(v)
elif isinstance(v, BaseModel):
return v.model_dump()
return v
class VertexBuildResponseModel(VertexBuildModel):
@field_serializer("data", "artifacts")
def serialize_dict(v):
return v
class VertexBuildMapModel(BaseModel):
vertex_builds: dict[str, list[VertexBuildResponseModel]]
@classmethod
def from_list_of_dicts(cls, vertex_build_dicts):
vertex_build_map = {}
for vertex_build_dict in vertex_build_dicts:
vertex_build = VertexBuildResponseModel(**vertex_build_dict)
if vertex_build.id not in vertex_build_map:
vertex_build_map[vertex_build.id] = []
vertex_build_map[vertex_build.id].append(vertex_build)
return cls(vertex_builds=vertex_build_map)


@@ -1,91 +0,0 @@
from pathlib import Path
from typing import TYPE_CHECKING
from platformdirs import user_cache_dir
from langflow.services.base import Service
from langflow.services.monitor.utils import (
new_duckdb_locked_connection,
)
if TYPE_CHECKING:
from langflow.services.settings.service import SettingsService
class MonitorService(Service):
"""
Deprecated. Still connecting to duckdb to migrate old installations.
"""
name = "monitor_service"
def __init__(self, settings_service: "SettingsService"):
self.settings_service = settings_service
self.base_cache_dir = Path(user_cache_dir("langflow"), ensure_exists=True)
self.db_path = self.base_cache_dir / "monitor.duckdb"
def exec_query(self, query: str, read_only: bool = False):
with new_duckdb_locked_connection(self.db_path, read_only=read_only) as conn:
return conn.execute(query).df()
def get_messages(
self,
flow_id: str | None = None,
sender: str | None = None,
sender_name: str | None = None,
session_id: str | None = None,
order_by: str | None = "timestamp",
order: str | None = "DESC",
limit: int | None = None,
):
query = "SELECT index, flow_id, sender_name, sender, session_id, text, files, timestamp FROM messages"
conditions = []
if sender:
conditions.append(f"sender = '{sender}'")
if sender_name:
conditions.append(f"sender_name = '{sender_name}'")
if session_id:
conditions.append(f"session_id = '{session_id}'")
if flow_id:
conditions.append(f"flow_id = '{flow_id}'")
if conditions:
query += " WHERE " + " AND ".join(conditions)
if order_by and order:
# Make sure the order is from newest to oldest
query += f" ORDER BY {order_by} {order.upper()}"
if limit is not None:
query += f" LIMIT {limit}"
with new_duckdb_locked_connection(self.db_path, read_only=True) as conn:
df = conn.execute(query).df()
return df
def delete_messages(self, message_ids: list[int] | str):
if isinstance(message_ids, list):
# If message_ids is a list, join the string representations of the integers
ids_str = ",".join(map(str, message_ids))
elif isinstance(message_ids, str):
# If message_ids is already a string, use it directly
ids_str = message_ids
else:
raise ValueError("message_ids must be a list of integers or a string")
query = f"DELETE FROM messages WHERE index IN ({ids_str})"
return self.exec_query(query, read_only=False)
def get_transactions(self, limit: int = 100):
query = f"SELECT index,flow_id, status, error, timestamp, vertex_id, inputs, outputs, target_id FROM transactions LIMIT {str(limit)}"
with new_duckdb_locked_connection(self.db_path, read_only=True) as conn:
df = conn.execute(query).df()
return df.to_dict(orient="records")
def delete_transactions(self, ids: list[int]) -> None:
with new_duckdb_locked_connection(self.db_path, read_only=False) as conn:
conn.execute(f"DELETE FROM transactions WHERE index in ({','.join(map(str, ids))})")
conn.commit()


@@ -1,125 +0,0 @@
from contextlib import contextmanager
from pathlib import Path
from typing import TYPE_CHECKING, Any
import duckdb
from loguru import logger
from pydantic import BaseModel
from langflow.utils.concurrency import KeyedWorkerLockManager
if TYPE_CHECKING:
pass
INDEX_KEY = "index"
worker_lock_manager = KeyedWorkerLockManager()
def get_table_schema_as_dict(conn: duckdb.DuckDBPyConnection, table_name: str) -> dict:
result = conn.execute(f"PRAGMA table_info('{table_name}')").fetchall()
schema = {row[1]: row[2].upper() for row in result}
return schema
def model_to_sql_column_definitions(model: type[BaseModel]) -> dict:
columns = {}
for field_name, field_type in model.model_fields.items():
if hasattr(field_type.annotation, "__args__") and field_type.annotation is not None:
field_args = field_type.annotation.__args__
else:
field_args = []
field_info = field_args[0] if field_args else field_type.annotation
if field_info.__name__ == "int":
sql_type = "INTEGER"
elif field_info.__name__ == "str":
sql_type = "VARCHAR"
elif field_info.__name__ == "datetime":
sql_type = "TIMESTAMP"
elif field_info.__name__ == "bool":
sql_type = "BOOLEAN"
elif field_info.__name__ == "dict":
sql_type = "VARCHAR"
elif field_info.__name__ == "Any":
sql_type = "VARCHAR"
else:
continue # Skip types we don't handle
columns[field_name] = sql_type
return columns
def drop_and_create_table_if_schema_mismatch(db_path: str, table_name: str, model: type[BaseModel]):
with new_duckdb_locked_connection(db_path) as conn:
# Get the current schema from the database
try:
current_schema = get_table_schema_as_dict(conn, table_name)
except duckdb.CatalogException:
current_schema = {}
# Get the desired schema from the model
desired_schema = model_to_sql_column_definitions(model)
# Compare the current and desired schemas
if current_schema != desired_schema:
# If they don't match, drop the existing table and create a new one
logger.warning(f"Schema mismatch for duckdb table {table_name}. Dropping and recreating table.")
logger.debug(f"Current schema: {str(current_schema)}")
logger.debug(f"Desired schema: {str(desired_schema)}")
conn.execute(f"DROP TABLE IF EXISTS {table_name}")
if INDEX_KEY in desired_schema.keys():
# Create a sequence for the id column
try:
conn.execute(f"CREATE SEQUENCE seq_{table_name} START 1;")
except duckdb.CatalogException:
pass
desired_schema[INDEX_KEY] = f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')"
columns_sql = ", ".join(f"{name} {data_type}" for name, data_type in desired_schema.items())
create_table_sql = f"CREATE TABLE {table_name} ({columns_sql})"
conn.execute(create_table_sql)
@contextmanager
def new_duckdb_locked_connection(db_path: str | Path, read_only=False):
with worker_lock_manager.lock("duckdb"):
with duckdb.connect(str(db_path), read_only=read_only) as conn:
yield conn
def add_row_to_table(
conn: duckdb.DuckDBPyConnection,
table_name: str,
model: type[BaseModel],
monitor_data: dict[str, Any] | BaseModel,
):
# Validate the data with the Pydantic model
if isinstance(monitor_data, model):
validated_data = monitor_data
else:
validated_data = model(**monitor_data)
# Extract data for the insert statement
validated_dict = validated_data.model_dump()
keys = [key for key in validated_dict.keys() if key != INDEX_KEY]
columns = ", ".join(keys)
values_placeholders = ", ".join(["?" for _ in keys])
values = [validated_dict[key] for key in keys]
# Create the insert statement
insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({values_placeholders})"
# Execute the insert statement
try:
conn.execute(insert_sql, values)
except Exception as e:
# Log values types
column_error_message = ""
for key, value in validated_dict.items():
logger.error(f"{key}: {type(value)}")
if str(value) in str(e):
column_error_message = f"Column: {key} Value: {value} Error: {e}"
if column_error_message:
logger.error(f"Error adding row to {table_name}: {column_error_message}")
else:
logger.error(f"Error adding row to {table_name}: {e}")


@@ -18,7 +18,6 @@ class ServiceType(str, Enum):
STORE_SERVICE = "store_service"
VARIABLE_SERVICE = "variable_service"
STORAGE_SERVICE = "storage_service"
MONITOR_SERVICE = "monitor_service"
# SOCKETIO_SERVICE = "socket_service"
STATE_SERVICE = "state_service"
TRACING_SERVICE = "tracing_service"


@@ -4,7 +4,6 @@ from langflow.services.factory import ServiceFactory
from langflow.services.tracing.service import TracingService
if TYPE_CHECKING:
from langflow.services.monitor.service import MonitorService
from langflow.services.settings.service import SettingsService
@@ -12,5 +11,5 @@ class TracingServiceFactory(ServiceFactory):
def __init__(self):
super().__init__(TracingService)
def create(self, settings_service: "SettingsService", monitor_service: "MonitorService"):
return TracingService(settings_service, monitor_service)
def create(self, settings_service: "SettingsService"):
return TracingService(settings_service)


@@ -16,7 +16,6 @@ if TYPE_CHECKING:
from langflow.custom.custom_component.component import Component
from langflow.graph.vertex.base import Vertex
from langflow.services.monitor.service import MonitorService
from langflow.services.settings.service import SettingsService
@@ -41,9 +40,8 @@ def _get_langfuse_tracer():
class TracingService(Service):
name = "tracing_service"
def __init__(self, settings_service: "SettingsService", monitor_service: "MonitorService"):
def __init__(self, settings_service: "SettingsService"):
self.settings_service = settings_service
self.monitor_service = monitor_service
self.inputs: dict[str, dict] = defaultdict(dict)
self.inputs_metadata: dict[str, dict] = defaultdict(dict)
self.outputs: dict[str, dict] = defaultdict(dict)

View file

@@ -1240,61 +1240,6 @@ files = [
{file = "docstring_parser-0.16.tar.gz", hash = "sha256:538beabd0af1e2db0146b6bd3caa526c35a34d61af9fd2887f3a8a27a739aa6e"},
]
[[package]]
name = "duckdb"
version = "1.0.0"
description = "DuckDB in-process database"
optional = false
python-versions = ">=3.7.0"
files = [
{file = "duckdb-1.0.0-cp310-cp310-macosx_12_0_arm64.whl", hash = "sha256:4a8ce2d1f9e1c23b9bab3ae4ca7997e9822e21563ff8f646992663f66d050211"},
{file = "duckdb-1.0.0-cp310-cp310-macosx_12_0_universal2.whl", hash = "sha256:19797670f20f430196e48d25d082a264b66150c264c1e8eae8e22c64c2c5f3f5"},
{file = "duckdb-1.0.0-cp310-cp310-macosx_12_0_x86_64.whl", hash = "sha256:b71c342090fe117b35d866a91ad6bffce61cd6ff3e0cff4003f93fc1506da0d8"},
{file = "duckdb-1.0.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:25dd69f44ad212c35ae2ea736b0e643ea2b70f204b8dff483af1491b0e2a4cec"},
{file = "duckdb-1.0.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8da5f293ecb4f99daa9a9352c5fd1312a6ab02b464653a0c3a25ab7065c45d4d"},
{file = "duckdb-1.0.0-cp310-cp310-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3207936da9967ddbb60644ec291eb934d5819b08169bc35d08b2dedbe7068c60"},
{file = "duckdb-1.0.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:1128d6c9c33e883b1f5df6b57c1eb46b7ab1baf2650912d77ee769aaa05111f9"},
{file = "duckdb-1.0.0-cp310-cp310-win_amd64.whl", hash = "sha256:02310d263474d0ac238646677feff47190ffb82544c018b2ff732a4cb462c6ef"},
{file = "duckdb-1.0.0-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:75586791ab2702719c284157b65ecefe12d0cca9041da474391896ddd9aa71a4"},
{file = "duckdb-1.0.0-cp311-cp311-macosx_12_0_universal2.whl", hash = "sha256:83bb415fc7994e641344f3489e40430ce083b78963cb1057bf714ac3a58da3ba"},
{file = "duckdb-1.0.0-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:bee2e0b415074e84c5a2cefd91f6b5ebeb4283e7196ba4ef65175a7cef298b57"},
{file = "duckdb-1.0.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fa5a4110d2a499312609544ad0be61e85a5cdad90e5b6d75ad16b300bf075b90"},
{file = "duckdb-1.0.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5fa389e6a382d4707b5f3d1bc2087895925ebb92b77e9fe3bfb23c9b98372fdc"},
{file = "duckdb-1.0.0-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:7ede6f5277dd851f1a4586b0c78dc93f6c26da45e12b23ee0e88c76519cbdbe0"},
{file = "duckdb-1.0.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:0b88cdbc0d5c3e3d7545a341784dc6cafd90fc035f17b2f04bf1e870c68456e5"},
{file = "duckdb-1.0.0-cp311-cp311-win_amd64.whl", hash = "sha256:fd1693cdd15375156f7fff4745debc14e5c54928589f67b87fb8eace9880c370"},
{file = "duckdb-1.0.0-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:c65a7fe8a8ce21b985356ee3ec0c3d3b3b2234e288e64b4cfb03356dbe6e5583"},
{file = "duckdb-1.0.0-cp312-cp312-macosx_12_0_universal2.whl", hash = "sha256:e5a8eda554379b3a43b07bad00968acc14dd3e518c9fbe8f128b484cf95e3d16"},
{file = "duckdb-1.0.0-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:a1b6acdd54c4a7b43bd7cb584975a1b2ff88ea1a31607a2b734b17960e7d3088"},
{file = "duckdb-1.0.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a677bb1b6a8e7cab4a19874249d8144296e6e39dae38fce66a80f26d15e670df"},
{file = "duckdb-1.0.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:752e9d412b0a2871bf615a2ede54be494c6dc289d076974eefbf3af28129c759"},
{file = "duckdb-1.0.0-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3aadb99d098c5e32d00dc09421bc63a47134a6a0de9d7cd6abf21780b678663c"},
{file = "duckdb-1.0.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:83b7091d4da3e9301c4f9378833f5ffe934fb1ad2b387b439ee067b2c10c8bb0"},
{file = "duckdb-1.0.0-cp312-cp312-win_amd64.whl", hash = "sha256:6a8058d0148b544694cb5ea331db44f6c2a00a7b03776cc4dd1470735c3d5ff7"},
{file = "duckdb-1.0.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e40cb20e5ee19d44bc66ec99969af791702a049079dc5f248c33b1c56af055f4"},
{file = "duckdb-1.0.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d7bce1bc0de9af9f47328e24e6e7e39da30093179b1c031897c042dd94a59c8e"},
{file = "duckdb-1.0.0-cp37-cp37m-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:8355507f7a04bc0a3666958f4414a58e06141d603e91c0fa5a7c50e49867fb6d"},
{file = "duckdb-1.0.0-cp37-cp37m-musllinux_1_2_x86_64.whl", hash = "sha256:39f1a46f5a45ad2886dc9b02ce5b484f437f90de66c327f86606d9ba4479d475"},
{file = "duckdb-1.0.0-cp37-cp37m-win_amd64.whl", hash = "sha256:a6d29ba477b27ae41676b62c8fae8d04ee7cbe458127a44f6049888231ca58fa"},
{file = "duckdb-1.0.0-cp38-cp38-macosx_12_0_arm64.whl", hash = "sha256:1bea713c1925918714328da76e79a1f7651b2b503511498ccf5e007a7e67d49e"},
{file = "duckdb-1.0.0-cp38-cp38-macosx_12_0_universal2.whl", hash = "sha256:bfe67f3bcf181edbf6f918b8c963eb060e6aa26697d86590da4edc5707205450"},
{file = "duckdb-1.0.0-cp38-cp38-macosx_12_0_x86_64.whl", hash = "sha256:dbc6093a75242f002be1d96a6ace3fdf1d002c813e67baff52112e899de9292f"},
{file = "duckdb-1.0.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ba1881a2b11c507cee18f8fd9ef10100be066fddaa2c20fba1f9a664245cd6d8"},
{file = "duckdb-1.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:445d0bb35087c522705c724a75f9f1c13f1eb017305b694d2686218d653c8142"},
{file = "duckdb-1.0.0-cp38-cp38-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:224553432e84432ffb9684f33206572477049b371ce68cc313a01e214f2fbdda"},
{file = "duckdb-1.0.0-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:d3914032e47c4e76636ad986d466b63fdea65e37be8a6dfc484ed3f462c4fde4"},
{file = "duckdb-1.0.0-cp38-cp38-win_amd64.whl", hash = "sha256:af9128a2eb7e1bb50cd2c2020d825fb2946fdad0a2558920cd5411d998999334"},
{file = "duckdb-1.0.0-cp39-cp39-macosx_12_0_arm64.whl", hash = "sha256:dd2659a5dbc0df0de68f617a605bf12fe4da85ba24f67c08730984a0892087e8"},
{file = "duckdb-1.0.0-cp39-cp39-macosx_12_0_universal2.whl", hash = "sha256:ac5a4afb0bc20725e734e0b2c17e99a274de4801aff0d4e765d276b99dad6d90"},
{file = "duckdb-1.0.0-cp39-cp39-macosx_12_0_x86_64.whl", hash = "sha256:2c5a53bee3668d6e84c0536164589d5127b23d298e4c443d83f55e4150fafe61"},
{file = "duckdb-1.0.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b980713244d7708b25ee0a73de0c65f0e5521c47a0e907f5e1b933d79d972ef6"},
{file = "duckdb-1.0.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:21cbd4f9fe7b7a56eff96c3f4d6778770dd370469ca2212eddbae5dd63749db5"},
{file = "duckdb-1.0.0-cp39-cp39-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ed228167c5d49888c5ef36f6f9cbf65011c2daf9dcb53ea8aa7a041ce567b3e4"},
{file = "duckdb-1.0.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:46d8395fbcea7231fd5032a250b673cc99352fef349b718a23dea2c0dd2b8dec"},
{file = "duckdb-1.0.0-cp39-cp39-win_amd64.whl", hash = "sha256:6ad1fc1a4d57e7616944166a5f9417bdbca1ea65c490797e3786e3a42e162d8a"},
{file = "duckdb-1.0.0.tar.gz", hash = "sha256:a2a059b77bc7d5b76ae9d88e267372deff19c291048d59450c431e166233d453"},
]
[[package]]
name = "ecdsa"
version = "0.19.0"
@@ -7307,4 +7252,4 @@ local = []
[metadata]
lock-version = "2.0"
python-versions = ">=3.10,<3.13"
content-hash = "9f69bf4c65c458a2d9c5aafb49010211e8624971f0a6b7af493fa80920f4b8de"
content-hash = "14036da5c48fac610cfba954acd19cb2e1356c593bbcee917b892a64084528ad"


@@ -57,7 +57,6 @@ docstring-parser = "^0.16"
python-jose = "^3.3.0"
pandas = "2.2.2"
multiprocess = "^0.70.14"
duckdb = "^1.0.0"
python-docx = "^1.1.0"
jq = { version = "^1.7.0", markers = "sys_platform != 'win32'" }
pypdf = "^4.2.0"


@@ -13,15 +13,8 @@ from langflow.graph.utils import log_transaction, log_vertex_build
from langflow.initial_setup.setup import load_flows_from_directory, load_starter_projects
from langflow.services.database.models.base import orjson_dumps
from langflow.services.database.models.flow import Flow, FlowCreate, FlowUpdate
from langflow.services.database.models.transactions.crud import get_transactions_by_flow_id
from langflow.services.database.utils import migrate_transactions_from_monitor_service_to_database, session_getter
from langflow.services.deps import get_db_service, get_monitor_service, session_scope
from langflow.services.monitor.schema import TransactionModel
from langflow.services.monitor.utils import (
add_row_to_table,
drop_and_create_table_if_schema_mismatch,
new_duckdb_locked_connection,
)
from langflow.services.database.utils import session_getter
from langflow.services.deps import get_db_service
@pytest.fixture(scope="module")
@@ -429,58 +422,6 @@ def test_load_flows(client: TestClient, load_flows_dir):
assert response.json()["name"] == "BasicExample"
@pytest.mark.load_flows
def test_migrate_transactions(client: TestClient):
monitor_service = get_monitor_service()
drop_and_create_table_if_schema_mismatch(str(monitor_service.db_path), "transactions", TransactionModel)
flow_id = "c54f9130-f2fa-4a3e-b22a-3856d946351b"
data = {
"vertex_id": "vid",
"target_id": "tid",
"inputs": {"input_value": True},
"outputs": {"output_value": True},
"timestamp": "2021-10-10T10:10:10",
"status": "success",
"error": None,
"flow_id": flow_id,
}
with new_duckdb_locked_connection(str(monitor_service.db_path), read_only=False) as conn:
add_row_to_table(conn, "transactions", TransactionModel, data)
assert 1 == len(monitor_service.get_transactions())
with session_scope() as session:
migrate_transactions_from_monitor_service_to_database(session)
new_trans = get_transactions_by_flow_id(session, UUID(flow_id))
assert 1 == len(new_trans)
t = new_trans[0]
assert t.error is None
assert t.inputs == data["inputs"]
assert t.outputs == data["outputs"]
assert t.status == data["status"]
assert str(t.timestamp) == "2021-10-10 10:10:10"
assert t.vertex_id == data["vertex_id"]
assert t.target_id == data["target_id"]
assert t.flow_id == UUID(flow_id)
assert 0 == len(monitor_service.get_transactions())
client.request("DELETE", f"api/v1/flows/{flow_id}")
with session_scope() as session:
new_trans = get_transactions_by_flow_id(session, UUID(flow_id))
assert 0 == len(new_trans)
@pytest.mark.load_flows
def test_migrate_transactions_no_duckdb(client: TestClient):
flow_id = "c54f9130-f2fa-4a3e-b22a-3856d946351b"
get_monitor_service()
with session_scope() as session:
migrate_transactions_from_monitor_service_to_database(session)
new_trans = get_transactions_by_flow_id(session, UUID(flow_id))
assert 0 == len(new_trans)
def test_sqlite_pragmas():
db_service = get_db_service()