From 616c01813e533da02b3a684e55ef6c868f74c208 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Thu, 26 Sep 2024 13:48:35 +0200 Subject: [PATCH] chore: drop duckdb usage and migrations (#3730) * chore: drop duckdb usage and migrations * [autofix.ci] apply automated fixes * Add DefaultModel and MessageResponse classes with custom JSON serialization and validation - Introduced `DefaultModel` class with custom JSON encoders and serialization methods. - Added `MessageResponse` class inheriting from `DefaultModel` with fields for message details and custom validators/serializers. - Enhanced file handling and timestamp formatting in `MessageResponse`. * Refactor: Replace `MessageModelResponse` with `MessageResponse` in monitor API - Updated import statements to use `MessageResponse` from `langflow.schema.message`. - Modified `/messages` endpoint to return `list[MessageResponse]` instead of `list[MessageModelResponse]`. - Adjusted response model validation to use `MessageResponse`. --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Gabriel Luiz Freitas Almeida --- poetry.lock | 1 - src/backend/base/langflow/api/v1/monitor.py | 6 +- src/backend/base/langflow/schema/message.py | 68 ++++- .../langflow/services/database/service.py | 10 - .../base/langflow/services/database/utils.py | 112 +------- src/backend/base/langflow/services/deps.py | 13 - .../langflow/services/monitor/__init__.py | 0 .../base/langflow/services/monitor/factory.py | 13 - .../base/langflow/services/monitor/schema.py | 258 ------------------ .../base/langflow/services/monitor/service.py | 91 ------ .../base/langflow/services/monitor/utils.py | 125 --------- src/backend/base/langflow/services/schema.py | 1 - .../base/langflow/services/tracing/factory.py | 5 +- .../base/langflow/services/tracing/service.py | 4 +- src/backend/base/poetry.lock | 57 +--- src/backend/base/pyproject.toml | 1 - src/backend/tests/unit/test_database.py | 63 +---- 17 files changed, 77 insertions(+), 751 deletions(-) delete mode 100644 src/backend/base/langflow/services/monitor/__init__.py delete mode 100644 src/backend/base/langflow/services/monitor/factory.py delete mode 100644 src/backend/base/langflow/services/monitor/schema.py delete mode 100644 src/backend/base/langflow/services/monitor/service.py delete mode 100644 src/backend/base/langflow/services/monitor/utils.py diff --git a/poetry.lock b/poetry.lock index a01b6d8ea..eac01585e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -5071,7 +5071,6 @@ crewai = "^0.36.0" cryptography = ">=42.0.5,<44.0.0" diskcache = "^5.6.3" docstring-parser = "^0.16" -duckdb = "^1.0.0" emoji = "^2.12.0" fastapi = "^0.111.0" filelock = "^3.15.4" diff --git a/src/backend/base/langflow/api/v1/monitor.py b/src/backend/base/langflow/api/v1/monitor.py index 75689b057..a5daa61e5 100644 --- a/src/backend/base/langflow/api/v1/monitor.py +++ b/src/backend/base/langflow/api/v1/monitor.py @@ -4,6 +4,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy import delete from sqlmodel import Session, col, select +from langflow.schema.message import MessageResponse from langflow.services.auth.utils import get_current_active_user from langflow.services.database.models.message.model import MessageRead, MessageTable, MessageUpdate from langflow.services.database.models.transactions.crud import get_transactions_by_flow_id @@ -15,7 +16,6 @@ from langflow.services.database.models.vertex_builds.crud import ( ) from langflow.services.database.models.vertex_builds.model import VertexBuildMapModel from langflow.services.deps import get_session -from langflow.services.monitor.schema import MessageModelResponse router = APIRouter(prefix="/monitor", tags=["Monitor"]) @@ -43,7 +43,7 @@ async def delete_vertex_builds( raise HTTPException(status_code=500, detail=str(e)) -@router.get("/messages", response_model=list[MessageModelResponse]) +@router.get("/messages", response_model=list[MessageResponse]) async def get_messages( flow_id: str | None = Query(None), session_id: str | None = Query(None), @@ -66,7 +66,7 @@ async def get_messages( col = getattr(MessageTable, order_by).asc() stmt = stmt.order_by(col) messages = session.exec(stmt) - return [MessageModelResponse.model_validate(d, from_attributes=True) for d in messages] + return [MessageResponse.model_validate(d, from_attributes=True) for d in messages] except Exception as e: raise HTTPException(status_code=500, detail=str(e)) diff --git a/src/backend/base/langflow/schema/message.py b/src/backend/base/langflow/schema/message.py index be7c49f09..76ad950ef 100644 --- a/src/backend/base/langflow/schema/message.py +++ b/src/backend/base/langflow/schema/message.py @@ -1,4 +1,5 @@ import asyncio +import json from collections.abc import AsyncIterator, Iterator from datetime import datetime, timezone from typing import Annotated, Any @@ -11,7 +12,7 @@ from langchain_core.prompt_values import ImagePromptValue from langchain_core.prompts import BaseChatPromptTemplate, ChatPromptTemplate, PromptTemplate from langchain_core.prompts.image import ImagePromptTemplate from loguru import logger -from pydantic import BeforeValidator, ConfigDict, Field, field_serializer, field_validator +from pydantic import BaseModel, BeforeValidator, ConfigDict, Field, field_serializer, field_validator from langflow.base.prompts.utils import dict_values_to_string from langflow.schema.data import Data @@ -248,3 +249,68 @@ class Message(Data): return asyncio.run(cls.from_template_and_variables(template, **variables)) else: return loop.run_until_complete(cls.from_template_and_variables(template, **variables)) + + +class DefaultModel(BaseModel): + class Config: + from_attributes = True + populate_by_name = True + json_encoders = { + datetime: lambda v: v.isoformat(), + } + + def json(self, **kwargs): + # Usa a função de serialização personalizada + return super().model_dump_json(**kwargs, encoder=self.custom_encoder) + + @staticmethod + def custom_encoder(obj): + if isinstance(obj, datetime): + return obj.isoformat() + raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable") + + +class MessageResponse(DefaultModel): + id: str | UUID | None = Field(default=None) + flow_id: UUID | None = Field(default=None) + timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + sender: str + sender_name: str + session_id: str + text: str + files: list[str] = [] + + @field_validator("files", mode="before") + @classmethod + def validate_files(cls, v): + if isinstance(v, str): + v = json.loads(v) + return v + + @field_serializer("timestamp") + @classmethod + def serialize_timestamp(cls, v): + v = v.replace(microsecond=0) + return v.strftime("%Y-%m-%d %H:%M:%S") + + @field_serializer("files") + @classmethod + def serialize_files(cls, v): + if isinstance(v, list): + return json.dumps(v) + return v + + @classmethod + def from_message(cls, message: Message, flow_id: str | None = None): + # first check if the record has all the required fields + if message.text is None or not message.sender or not message.sender_name: + raise ValueError("The message does not have the required fields (text, sender, sender_name).") + return cls( + sender=message.sender, + sender_name=message.sender_name, + text=message.text, + session_id=message.session_id, + files=message.files or [], + timestamp=message.timestamp, + flow_id=flow_id, + ) diff --git a/src/backend/base/langflow/services/database/service.py b/src/backend/base/langflow/services/database/service.py index b3ac7c637..84c865953 100644 --- a/src/backend/base/langflow/services/database/service.py +++ b/src/backend/base/langflow/services/database/service.py @@ -19,8 +19,6 @@ from langflow.services.database.models.user.crud import get_user_by_username from langflow.services.database.utils import ( Result, TableResults, - migrate_messages_from_monitor_service_to_database, - migrate_transactions_from_monitor_service_to_database, ) from langflow.services.deps import get_settings_service from langflow.services.utils import teardown_superuser @@ -215,14 +213,6 @@ class DatabaseService(Service): logger.error(f"AutogenerateDiffsDetected: {exc}") if not fix: raise RuntimeError(f"There's a mismatch between the models and the database.\n{exc}") - try: - migrate_messages_from_monitor_service_to_database(session) - except Exception as exc: - logger.error(f"Error migrating messages from monitor service to database: {exc}") - try: - migrate_transactions_from_monitor_service_to_database(session) - except Exception as exc: - logger.error(f"Error migrating transactions from monitor service to database: {exc}") if fix: self.try_downgrade_upgrade_until_success(alembic_cfg) diff --git a/src/backend/base/langflow/services/database/utils.py b/src/backend/base/langflow/services/database/utils.py index 550da7fda..47eeb6b8f 100644 --- a/src/backend/base/langflow/services/database/utils.py +++ b/src/backend/base/langflow/services/database/utils.py @@ -1,91 +1,17 @@ from __future__ import annotations -import json from contextlib import contextmanager from dataclasses import dataclass from typing import TYPE_CHECKING from alembic.util.exc import CommandError from loguru import logger -from sqlmodel import Session, select, text - -from langflow.services.database.models import TransactionTable -from langflow.services.deps import get_monitor_service +from sqlmodel import Session, text if TYPE_CHECKING: from langflow.services.database.service import DatabaseService -def migrate_messages_from_monitor_service_to_database(session: Session) -> bool: - from langflow.schema.message import Message - from langflow.services.database.models.message import MessageTable - - try: - monitor_service = get_monitor_service() - messages_df = monitor_service.get_messages() - except Exception as e: - if "Table with name messages does not exist" in str(e): - logger.debug(f"Error retrieving messages from monitor service: {e}") - else: - logger.warning(f"Error retrieving messages from monitor service: {e}") - return False - - if messages_df.empty: - logger.info("No messages to migrate.") - return True - - original_messages: list[dict] = messages_df.to_dict(orient="records") - - db_messages = session.exec(select(MessageTable)).all() - db_messages = [msg[0] for msg in db_messages] # type: ignore - db_msg_dict = {(msg.text, msg.timestamp.isoformat(), str(msg.flow_id), msg.session_id): msg for msg in db_messages} - # Filter out messages that already exist in the database - original_messages_filtered = [] - for message in original_messages: - key = (message["text"], message["timestamp"].isoformat(), str(message["flow_id"]), message["session_id"]) - if key not in db_msg_dict: - original_messages_filtered.append(message) - if not original_messages_filtered: - logger.info("No messages to migrate.") - return True - try: - # Bulk insert messages - session.bulk_insert_mappings( - MessageTable, # type: ignore - [MessageTable.from_message(Message(**msg)).model_dump() for msg in original_messages_filtered], - ) - session.commit() - except Exception as e: - logger.error(f"Error during message insertion: {str(e)}") - session.rollback() - return False - - # Create a dictionary for faster lookup - - all_ok = True - for orig_msg in original_messages_filtered: - key = (orig_msg["text"], orig_msg["timestamp"].isoformat(), str(orig_msg["flow_id"]), orig_msg["session_id"]) - matching_db_msg = db_msg_dict.get(key) - - if matching_db_msg is None: - logger.warning(f"Message not found in database: {orig_msg}") - all_ok = False - else: - # Validate other fields - if any(getattr(matching_db_msg, k) != v for k, v in orig_msg.items() if k != "index"): - logger.warning(f"Message mismatch in database: {orig_msg}") - all_ok = False - - if all_ok: - messages_ids = [message["index"] for message in original_messages] - monitor_service.delete_messages(messages_ids) - logger.info("Migration completed successfully. Original messages deleted.") - else: - logger.warning("Migration completed with errors. Original messages not deleted.") - - return all_ok - - def initialize_database(fix_migration: bool = False): logger.debug("Initializing database") from langflow.services.deps import get_db_service @@ -153,39 +79,3 @@ class Result: class TableResults: table_name: str results: list[Result] - - -def migrate_transactions_from_monitor_service_to_database(session: Session) -> None: - try: - monitor_service = get_monitor_service() - batch = monitor_service.get_transactions() - except Exception as e: - if "Table with name transactions does not exist" in str(e): - logger.debug(f"Error retrieving transactions from monitor service: {e}") - else: - logger.warning(f"Error retrieving transactions from monitor service: {e}") - return - - if not batch: - logger.debug("No transactions to migrate.") - return - to_delete = [] - while batch: - logger.debug(f"Migrating {len(batch)} transactions") - for row in batch: - tt = TransactionTable( - flow_id=row["flow_id"], - status=row["status"], - error=row["error"], - timestamp=row["timestamp"], - vertex_id=row["vertex_id"], - inputs=json.loads(row["inputs"]) if row["inputs"] else None, - outputs=json.loads(row["outputs"]) if row["outputs"] else None, - target_id=row["target_id"], - ) - to_delete.append(row["index"]) - session.add(tt) - session.commit() - monitor_service.delete_transactions(to_delete) - batch = monitor_service.get_transactions() - logger.debug("Transactions migrations completed.") diff --git a/src/backend/base/langflow/services/deps.py b/src/backend/base/langflow/services/deps.py index ebea3a35a..f2670445e 100644 --- a/src/backend/base/langflow/services/deps.py +++ b/src/backend/base/langflow/services/deps.py @@ -12,7 +12,6 @@ if TYPE_CHECKING: from langflow.services.cache.service import CacheService from langflow.services.chat.service import ChatService from langflow.services.database.service import DatabaseService - from langflow.services.monitor.service import MonitorService from langflow.services.plugins.service import PluginService from langflow.services.session.service import SessionService from langflow.services.settings.service import SettingsService @@ -220,18 +219,6 @@ def get_session_service() -> "SessionService": return get_service(ServiceType.SESSION_SERVICE, SessionServiceFactory()) # type: ignore -def get_monitor_service() -> "MonitorService": - """ - Retrieves the MonitorService instance from the service manager. - - Returns: - MonitorService: The MonitorService instance. - """ - from langflow.services.monitor.factory import MonitorServiceFactory - - return get_service(ServiceType.MONITOR_SERVICE, MonitorServiceFactory()) # type: ignore - - def get_task_service() -> "TaskService": """ Retrieves the TaskService instance from the service manager. diff --git a/src/backend/base/langflow/services/monitor/__init__.py b/src/backend/base/langflow/services/monitor/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/backend/base/langflow/services/monitor/factory.py b/src/backend/base/langflow/services/monitor/factory.py deleted file mode 100644 index 054e6bb3f..000000000 --- a/src/backend/base/langflow/services/monitor/factory.py +++ /dev/null @@ -1,13 +0,0 @@ -from langflow.services.factory import ServiceFactory -from langflow.services.monitor.service import MonitorService -from langflow.services.settings.service import SettingsService - - -class MonitorServiceFactory(ServiceFactory): - name = "monitor_service" - - def __init__(self): - super().__init__(MonitorService) - - def create(self, settings_service: SettingsService): - return self.service_class(settings_service) diff --git a/src/backend/base/langflow/services/monitor/schema.py b/src/backend/base/langflow/services/monitor/schema.py deleted file mode 100644 index 003e704c7..000000000 --- a/src/backend/base/langflow/services/monitor/schema.py +++ /dev/null @@ -1,258 +0,0 @@ -import json -from datetime import datetime, timezone -from typing import Any -from uuid import UUID - -from pydantic import BaseModel, Field, field_serializer, field_validator - -from langflow.schema.message import Message - - -class DefaultModel(BaseModel): - class Config: - from_attributes = True - populate_by_name = True - json_encoders = { - datetime: lambda v: v.isoformat(), - } - - def json(self, **kwargs): - # Usa a função de serialização personalizada - return super().model_dump_json(**kwargs, encoder=self.custom_encoder) - - @staticmethod - def custom_encoder(obj): - if isinstance(obj, datetime): - return obj.isoformat() - raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable") - - -class TransactionModel(DefaultModel): - index: int | None = Field(default=None) - timestamp: datetime | None = Field(default_factory=datetime.now, alias="timestamp") - vertex_id: str - target_id: str | None = None - inputs: dict - outputs: dict | None = None - status: str - error: str | None = None - flow_id: str | None = Field(default=None, alias="flow_id") - - # validate target_args in case it is a JSON - @field_validator("outputs", "inputs", mode="before") - def validate_target_args(cls, v): - if isinstance(v, str): - return json.loads(v) - return v - - @field_serializer("outputs", "inputs") - def serialize_target_args(v): - if isinstance(v, dict): - return json.dumps(v) - return v - - -class TransactionModelResponse(DefaultModel): - index: int | None = Field(default=None) - timestamp: datetime | None = Field(default_factory=datetime.now, alias="timestamp") - vertex_id: str - inputs: dict - outputs: dict | None = None - status: str - error: str | None = None - flow_id: str | None = Field(default=None, alias="flow_id") - source: str | None = None - target: str | None = None - - # validate target_args in case it is a JSON - @field_validator("outputs", "inputs", mode="before") - def validate_target_args(cls, v): - if isinstance(v, str): - return json.loads(v) - return v - - @field_validator("index", mode="before") - def validate_id(cls, v): - if isinstance(v, float): - try: - return int(v) - except ValueError: - return None - return v - - -class DuckDbMessageModel(DefaultModel): - index: int | None = Field(default=None, alias="index") - flow_id: str | None = Field(default=None, alias="flow_id") - timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - sender: str - sender_name: str - session_id: str - text: str - files: list[str] = [] - - @field_validator("files", mode="before") - @classmethod - def validate_files(cls, v): - if isinstance(v, str): - v = json.loads(v) - return v - - @field_serializer("timestamp") - @classmethod - def serialize_timestamp(cls, v): - v = v.replace(microsecond=0) - return v.strftime("%Y-%m-%d %H:%M:%S") - - @field_serializer("files") - @classmethod - def serialize_files(cls, v): - if isinstance(v, list): - return json.dumps(v) - return v - - @classmethod - def from_message(cls, message: Message, flow_id: str | None = None): - # first check if the record has all the required fields - if message.text is None or not message.sender or not message.sender_name: - raise ValueError("The message does not have the required fields (text, sender, sender_name).") - return cls( - sender=message.sender, - sender_name=message.sender_name, - text=message.text, - session_id=message.session_id, - files=message.files or [], - timestamp=message.timestamp, - flow_id=flow_id, - ) - - -class MessageModel(DefaultModel): - id: str | UUID | None = Field(default=None) - flow_id: UUID | None = Field(default=None) - timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - sender: str - sender_name: str - session_id: str - text: str - files: list[str] = [] - - @field_validator("files", mode="before") - @classmethod - def validate_files(cls, v): - if isinstance(v, str): - v = json.loads(v) - return v - - @field_serializer("timestamp") - @classmethod - def serialize_timestamp(cls, v): - v = v.replace(microsecond=0) - return v.strftime("%Y-%m-%d %H:%M:%S") - - @field_serializer("files") - @classmethod - def serialize_files(cls, v): - if isinstance(v, list): - return json.dumps(v) - return v - - @classmethod - def from_message(cls, message: Message, flow_id: str | None = None): - # first check if the record has all the required fields - if message.text is None or not message.sender or not message.sender_name: - raise ValueError("The message does not have the required fields (text, sender, sender_name).") - return cls( - sender=message.sender, - sender_name=message.sender_name, - text=message.text, - session_id=message.session_id, - files=message.files or [], - timestamp=message.timestamp, - flow_id=flow_id, - ) - - -class MessageModelResponse(MessageModel): - pass - - -class MessageModelRequest(MessageModel): - text: str = Field(default="") - sender: str = Field(default="") - sender_name: str = Field(default="") - session_id: str = Field(default="") - - -class VertexBuildModel(DefaultModel): - index: int | None = Field(default=None, alias="index", exclude=True) - id: str | None = Field(default=None, alias="id") - flow_id: str - valid: bool - params: Any - data: dict - artifacts: dict - timestamp: datetime = Field(default_factory=datetime.now) - - @field_serializer("data", "artifacts") - def serialize_dict(v): - if isinstance(v, dict): - # check if the value of each key is a BaseModel or a list of BaseModels - for key, value in v.items(): - if isinstance(value, BaseModel): - v[key] = value.model_dump() - elif isinstance(value, list) and all(isinstance(i, BaseModel) for i in value): - v[key] = [i.model_dump() for i in value] - return json.dumps(v, default=str) - elif isinstance(v, BaseModel): - return v.model_dump_json() - return v - - @field_validator("params", mode="before") - def validate_params(cls, v): - if isinstance(v, str): - try: - return json.loads(v) - except json.JSONDecodeError: - return v - return v - - @field_serializer("params") - def serialize_params(v): - if isinstance(v, list) and all(isinstance(i, BaseModel) for i in v): - return json.dumps([i.model_dump() for i in v]) - return v - - @field_validator("data", mode="before") - def validate_data(cls, v): - if isinstance(v, str): - return json.loads(v) - return v - - @field_validator("artifacts", mode="before") - def validate_artifacts(cls, v): - if isinstance(v, str): - return json.loads(v) - elif isinstance(v, BaseModel): - return v.model_dump() - return v - - -class VertexBuildResponseModel(VertexBuildModel): - @field_serializer("data", "artifacts") - def serialize_dict(v): - return v - - -class VertexBuildMapModel(BaseModel): - vertex_builds: dict[str, list[VertexBuildResponseModel]] - - @classmethod - def from_list_of_dicts(cls, vertex_build_dicts): - vertex_build_map = {} - for vertex_build_dict in vertex_build_dicts: - vertex_build = VertexBuildResponseModel(**vertex_build_dict) - if vertex_build.id not in vertex_build_map: - vertex_build_map[vertex_build.id] = [] - vertex_build_map[vertex_build.id].append(vertex_build) - return cls(vertex_builds=vertex_build_map) diff --git a/src/backend/base/langflow/services/monitor/service.py b/src/backend/base/langflow/services/monitor/service.py deleted file mode 100644 index 6a00e9e38..000000000 --- a/src/backend/base/langflow/services/monitor/service.py +++ /dev/null @@ -1,91 +0,0 @@ -from pathlib import Path -from typing import TYPE_CHECKING - -from platformdirs import user_cache_dir - -from langflow.services.base import Service -from langflow.services.monitor.utils import ( - new_duckdb_locked_connection, -) - -if TYPE_CHECKING: - from langflow.services.settings.service import SettingsService - - -class MonitorService(Service): - """ - Deprecated. Still connecting to duckdb to migrate old installations. - """ - - name = "monitor_service" - - def __init__(self, settings_service: "SettingsService"): - self.settings_service = settings_service - self.base_cache_dir = Path(user_cache_dir("langflow"), ensure_exists=True) - self.db_path = self.base_cache_dir / "monitor.duckdb" - - def exec_query(self, query: str, read_only: bool = False): - with new_duckdb_locked_connection(self.db_path, read_only=read_only) as conn: - return conn.execute(query).df() - - def get_messages( - self, - flow_id: str | None = None, - sender: str | None = None, - sender_name: str | None = None, - session_id: str | None = None, - order_by: str | None = "timestamp", - order: str | None = "DESC", - limit: int | None = None, - ): - query = "SELECT index, flow_id, sender_name, sender, session_id, text, files, timestamp FROM messages" - conditions = [] - if sender: - conditions.append(f"sender = '{sender}'") - if sender_name: - conditions.append(f"sender_name = '{sender_name}'") - if session_id: - conditions.append(f"session_id = '{session_id}'") - if flow_id: - conditions.append(f"flow_id = '{flow_id}'") - - if conditions: - query += " WHERE " + " AND ".join(conditions) - - if order_by and order: - # Make sure the order is from newest to oldest - query += f" ORDER BY {order_by} {order.upper()}" - - if limit is not None: - query += f" LIMIT {limit}" - - with new_duckdb_locked_connection(self.db_path, read_only=True) as conn: - df = conn.execute(query).df() - - return df - - def delete_messages(self, message_ids: list[int] | str): - if isinstance(message_ids, list): - # If message_ids is a list, join the string representations of the integers - ids_str = ",".join(map(str, message_ids)) - elif isinstance(message_ids, str): - # If message_ids is already a string, use it directly - ids_str = message_ids - else: - raise ValueError("message_ids must be a list of integers or a string") - - query = f"DELETE FROM messages WHERE index IN ({ids_str})" - - return self.exec_query(query, read_only=False) - - def get_transactions(self, limit: int = 100): - query = f"SELECT index,flow_id, status, error, timestamp, vertex_id, inputs, outputs, target_id FROM transactions LIMIT {str(limit)}" - with new_duckdb_locked_connection(self.db_path, read_only=True) as conn: - df = conn.execute(query).df() - - return df.to_dict(orient="records") - - def delete_transactions(self, ids: list[int]) -> None: - with new_duckdb_locked_connection(self.db_path, read_only=False) as conn: - conn.execute(f"DELETE FROM transactions WHERE index in ({','.join(map(str, ids))})") - conn.commit() diff --git a/src/backend/base/langflow/services/monitor/utils.py b/src/backend/base/langflow/services/monitor/utils.py deleted file mode 100644 index 21d5d24a9..000000000 --- a/src/backend/base/langflow/services/monitor/utils.py +++ /dev/null @@ -1,125 +0,0 @@ -from contextlib import contextmanager -from pathlib import Path -from typing import TYPE_CHECKING, Any - -import duckdb -from loguru import logger -from pydantic import BaseModel - -from langflow.utils.concurrency import KeyedWorkerLockManager - -if TYPE_CHECKING: - pass - - -INDEX_KEY = "index" -worker_lock_manager = KeyedWorkerLockManager() - - -def get_table_schema_as_dict(conn: duckdb.DuckDBPyConnection, table_name: str) -> dict: - result = conn.execute(f"PRAGMA table_info('{table_name}')").fetchall() - schema = {row[1]: row[2].upper() for row in result} - return schema - - -def model_to_sql_column_definitions(model: type[BaseModel]) -> dict: - columns = {} - for field_name, field_type in model.model_fields.items(): - if hasattr(field_type.annotation, "__args__") and field_type.annotation is not None: - field_args = field_type.annotation.__args__ - else: - field_args = [] - field_info = field_args[0] if field_args else field_type.annotation - if field_info.__name__ == "int": - sql_type = "INTEGER" - elif field_info.__name__ == "str": - sql_type = "VARCHAR" - elif field_info.__name__ == "datetime": - sql_type = "TIMESTAMP" - elif field_info.__name__ == "bool": - sql_type = "BOOLEAN" - elif field_info.__name__ == "dict": - sql_type = "VARCHAR" - elif field_info.__name__ == "Any": - sql_type = "VARCHAR" - else: - continue # Skip types we don't handle - columns[field_name] = sql_type - return columns - - -def drop_and_create_table_if_schema_mismatch(db_path: str, table_name: str, model: type[BaseModel]): - with new_duckdb_locked_connection(db_path) as conn: - # Get the current schema from the database - try: - current_schema = get_table_schema_as_dict(conn, table_name) - except duckdb.CatalogException: - current_schema = {} - # Get the desired schema from the model - desired_schema = model_to_sql_column_definitions(model) - - # Compare the current and desired schemas - - if current_schema != desired_schema: - # If they don't match, drop the existing table and create a new one - logger.warning(f"Schema mismatch for duckdb table {table_name}. Dropping and recreating table.") - logger.debug(f"Current schema: {str(current_schema)}") - logger.debug(f"Desired schema: {str(desired_schema)}") - conn.execute(f"DROP TABLE IF EXISTS {table_name}") - if INDEX_KEY in desired_schema.keys(): - # Create a sequence for the id column - try: - conn.execute(f"CREATE SEQUENCE seq_{table_name} START 1;") - except duckdb.CatalogException: - pass - desired_schema[INDEX_KEY] = f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')" - columns_sql = ", ".join(f"{name} {data_type}" for name, data_type in desired_schema.items()) - create_table_sql = f"CREATE TABLE {table_name} ({columns_sql})" - conn.execute(create_table_sql) - - -@contextmanager -def new_duckdb_locked_connection(db_path: str | Path, read_only=False): - with worker_lock_manager.lock("duckdb"): - with duckdb.connect(str(db_path), read_only=read_only) as conn: - yield conn - - -def add_row_to_table( - conn: duckdb.DuckDBPyConnection, - table_name: str, - model: type[BaseModel], - monitor_data: dict[str, Any] | BaseModel, -): - # Validate the data with the Pydantic model - if isinstance(monitor_data, model): - validated_data = monitor_data - else: - validated_data = model(**monitor_data) - - # Extract data for the insert statement - validated_dict = validated_data.model_dump() - keys = [key for key in validated_dict.keys() if key != INDEX_KEY] - columns = ", ".join(keys) - - values_placeholders = ", ".join(["?" for _ in keys]) - values = [validated_dict[key] for key in keys] - - # Create the insert statement - insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({values_placeholders})" - - # Execute the insert statement - try: - conn.execute(insert_sql, values) - except Exception as e: - # Log values types - column_error_message = "" - for key, value in validated_dict.items(): - logger.error(f"{key}: {type(value)}") - if str(value) in str(e): - column_error_message = f"Column: {key} Value: {value} Error: {e}" - - if column_error_message: - logger.error(f"Error adding row to {table_name}: {column_error_message}") - else: - logger.error(f"Error adding row to {table_name}: {e}") diff --git a/src/backend/base/langflow/services/schema.py b/src/backend/base/langflow/services/schema.py index eafc0c085..173b80750 100644 --- a/src/backend/base/langflow/services/schema.py +++ b/src/backend/base/langflow/services/schema.py @@ -18,7 +18,6 @@ class ServiceType(str, Enum): STORE_SERVICE = "store_service" VARIABLE_SERVICE = "variable_service" STORAGE_SERVICE = "storage_service" - MONITOR_SERVICE = "monitor_service" # SOCKETIO_SERVICE = "socket_service" STATE_SERVICE = "state_service" TRACING_SERVICE = "tracing_service" diff --git a/src/backend/base/langflow/services/tracing/factory.py b/src/backend/base/langflow/services/tracing/factory.py index 776edcf1e..e179d2118 100644 --- a/src/backend/base/langflow/services/tracing/factory.py +++ b/src/backend/base/langflow/services/tracing/factory.py @@ -4,7 +4,6 @@ from langflow.services.factory import ServiceFactory from langflow.services.tracing.service import TracingService if TYPE_CHECKING: - from langflow.services.monitor.service import MonitorService from langflow.services.settings.service import SettingsService @@ -12,5 +11,5 @@ class TracingServiceFactory(ServiceFactory): def __init__(self): super().__init__(TracingService) - def create(self, settings_service: "SettingsService", monitor_service: "MonitorService"): - return TracingService(settings_service, monitor_service) + def create(self, settings_service: "SettingsService"): + return TracingService(settings_service) diff --git a/src/backend/base/langflow/services/tracing/service.py b/src/backend/base/langflow/services/tracing/service.py index 6e78591f9..c7799a996 100644 --- a/src/backend/base/langflow/services/tracing/service.py +++ b/src/backend/base/langflow/services/tracing/service.py @@ -16,7 +16,6 @@ if TYPE_CHECKING: from langflow.custom.custom_component.component import Component from langflow.graph.vertex.base import Vertex - from langflow.services.monitor.service import MonitorService from langflow.services.settings.service import SettingsService @@ -41,9 +40,8 @@ def _get_langfuse_tracer(): class TracingService(Service): name = "tracing_service" - def __init__(self, settings_service: "SettingsService", monitor_service: "MonitorService"): + def __init__(self, settings_service: "SettingsService"): self.settings_service = settings_service - self.monitor_service = monitor_service self.inputs: dict[str, dict] = defaultdict(dict) self.inputs_metadata: dict[str, dict] = defaultdict(dict) self.outputs: dict[str, dict] = defaultdict(dict) diff --git a/src/backend/base/poetry.lock b/src/backend/base/poetry.lock index 81c74b924..a40c24e6b 100644 --- a/src/backend/base/poetry.lock +++ b/src/backend/base/poetry.lock @@ -1240,61 +1240,6 @@ files = [ {file = "docstring_parser-0.16.tar.gz", hash = "sha256:538beabd0af1e2db0146b6bd3caa526c35a34d61af9fd2887f3a8a27a739aa6e"}, ] -[[package]] -name = "duckdb" -version = "1.0.0" -description = "DuckDB in-process database" -optional = false -python-versions = ">=3.7.0" -files = [ - {file = "duckdb-1.0.0-cp310-cp310-macosx_12_0_arm64.whl", hash = "sha256:4a8ce2d1f9e1c23b9bab3ae4ca7997e9822e21563ff8f646992663f66d050211"}, - {file = "duckdb-1.0.0-cp310-cp310-macosx_12_0_universal2.whl", hash = "sha256:19797670f20f430196e48d25d082a264b66150c264c1e8eae8e22c64c2c5f3f5"}, - {file = "duckdb-1.0.0-cp310-cp310-macosx_12_0_x86_64.whl", hash = "sha256:b71c342090fe117b35d866a91ad6bffce61cd6ff3e0cff4003f93fc1506da0d8"}, - {file = "duckdb-1.0.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:25dd69f44ad212c35ae2ea736b0e643ea2b70f204b8dff483af1491b0e2a4cec"}, - {file = "duckdb-1.0.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8da5f293ecb4f99daa9a9352c5fd1312a6ab02b464653a0c3a25ab7065c45d4d"}, - {file = "duckdb-1.0.0-cp310-cp310-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3207936da9967ddbb60644ec291eb934d5819b08169bc35d08b2dedbe7068c60"}, - {file = "duckdb-1.0.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:1128d6c9c33e883b1f5df6b57c1eb46b7ab1baf2650912d77ee769aaa05111f9"}, - {file = "duckdb-1.0.0-cp310-cp310-win_amd64.whl", hash = "sha256:02310d263474d0ac238646677feff47190ffb82544c018b2ff732a4cb462c6ef"}, - {file = "duckdb-1.0.0-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:75586791ab2702719c284157b65ecefe12d0cca9041da474391896ddd9aa71a4"}, - {file = "duckdb-1.0.0-cp311-cp311-macosx_12_0_universal2.whl", hash = "sha256:83bb415fc7994e641344f3489e40430ce083b78963cb1057bf714ac3a58da3ba"}, - {file = "duckdb-1.0.0-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:bee2e0b415074e84c5a2cefd91f6b5ebeb4283e7196ba4ef65175a7cef298b57"}, - {file = "duckdb-1.0.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fa5a4110d2a499312609544ad0be61e85a5cdad90e5b6d75ad16b300bf075b90"}, - {file = "duckdb-1.0.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5fa389e6a382d4707b5f3d1bc2087895925ebb92b77e9fe3bfb23c9b98372fdc"}, - {file = "duckdb-1.0.0-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:7ede6f5277dd851f1a4586b0c78dc93f6c26da45e12b23ee0e88c76519cbdbe0"}, - {file = "duckdb-1.0.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:0b88cdbc0d5c3e3d7545a341784dc6cafd90fc035f17b2f04bf1e870c68456e5"}, - {file = "duckdb-1.0.0-cp311-cp311-win_amd64.whl", hash = "sha256:fd1693cdd15375156f7fff4745debc14e5c54928589f67b87fb8eace9880c370"}, - {file = "duckdb-1.0.0-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:c65a7fe8a8ce21b985356ee3ec0c3d3b3b2234e288e64b4cfb03356dbe6e5583"}, - {file = "duckdb-1.0.0-cp312-cp312-macosx_12_0_universal2.whl", hash = "sha256:e5a8eda554379b3a43b07bad00968acc14dd3e518c9fbe8f128b484cf95e3d16"}, - {file = "duckdb-1.0.0-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:a1b6acdd54c4a7b43bd7cb584975a1b2ff88ea1a31607a2b734b17960e7d3088"}, - {file = "duckdb-1.0.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a677bb1b6a8e7cab4a19874249d8144296e6e39dae38fce66a80f26d15e670df"}, - {file = "duckdb-1.0.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:752e9d412b0a2871bf615a2ede54be494c6dc289d076974eefbf3af28129c759"}, - {file = "duckdb-1.0.0-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3aadb99d098c5e32d00dc09421bc63a47134a6a0de9d7cd6abf21780b678663c"}, - {file = "duckdb-1.0.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:83b7091d4da3e9301c4f9378833f5ffe934fb1ad2b387b439ee067b2c10c8bb0"}, - {file = "duckdb-1.0.0-cp312-cp312-win_amd64.whl", hash = "sha256:6a8058d0148b544694cb5ea331db44f6c2a00a7b03776cc4dd1470735c3d5ff7"}, - {file = "duckdb-1.0.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e40cb20e5ee19d44bc66ec99969af791702a049079dc5f248c33b1c56af055f4"}, - {file = "duckdb-1.0.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d7bce1bc0de9af9f47328e24e6e7e39da30093179b1c031897c042dd94a59c8e"}, - {file = "duckdb-1.0.0-cp37-cp37m-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:8355507f7a04bc0a3666958f4414a58e06141d603e91c0fa5a7c50e49867fb6d"}, - {file = "duckdb-1.0.0-cp37-cp37m-musllinux_1_2_x86_64.whl", hash = "sha256:39f1a46f5a45ad2886dc9b02ce5b484f437f90de66c327f86606d9ba4479d475"}, - {file = "duckdb-1.0.0-cp37-cp37m-win_amd64.whl", hash = "sha256:a6d29ba477b27ae41676b62c8fae8d04ee7cbe458127a44f6049888231ca58fa"}, - {file = "duckdb-1.0.0-cp38-cp38-macosx_12_0_arm64.whl", hash = "sha256:1bea713c1925918714328da76e79a1f7651b2b503511498ccf5e007a7e67d49e"}, - {file = "duckdb-1.0.0-cp38-cp38-macosx_12_0_universal2.whl", hash = "sha256:bfe67f3bcf181edbf6f918b8c963eb060e6aa26697d86590da4edc5707205450"}, - {file = "duckdb-1.0.0-cp38-cp38-macosx_12_0_x86_64.whl", hash = "sha256:dbc6093a75242f002be1d96a6ace3fdf1d002c813e67baff52112e899de9292f"}, - {file = "duckdb-1.0.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ba1881a2b11c507cee18f8fd9ef10100be066fddaa2c20fba1f9a664245cd6d8"}, - {file = "duckdb-1.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:445d0bb35087c522705c724a75f9f1c13f1eb017305b694d2686218d653c8142"}, - {file = "duckdb-1.0.0-cp38-cp38-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:224553432e84432ffb9684f33206572477049b371ce68cc313a01e214f2fbdda"}, - {file = "duckdb-1.0.0-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:d3914032e47c4e76636ad986d466b63fdea65e37be8a6dfc484ed3f462c4fde4"}, - {file = "duckdb-1.0.0-cp38-cp38-win_amd64.whl", hash = "sha256:af9128a2eb7e1bb50cd2c2020d825fb2946fdad0a2558920cd5411d998999334"}, - {file = "duckdb-1.0.0-cp39-cp39-macosx_12_0_arm64.whl", hash = "sha256:dd2659a5dbc0df0de68f617a605bf12fe4da85ba24f67c08730984a0892087e8"}, - {file = "duckdb-1.0.0-cp39-cp39-macosx_12_0_universal2.whl", hash = "sha256:ac5a4afb0bc20725e734e0b2c17e99a274de4801aff0d4e765d276b99dad6d90"}, - {file = "duckdb-1.0.0-cp39-cp39-macosx_12_0_x86_64.whl", hash = "sha256:2c5a53bee3668d6e84c0536164589d5127b23d298e4c443d83f55e4150fafe61"}, - {file = "duckdb-1.0.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b980713244d7708b25ee0a73de0c65f0e5521c47a0e907f5e1b933d79d972ef6"}, - {file = "duckdb-1.0.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:21cbd4f9fe7b7a56eff96c3f4d6778770dd370469ca2212eddbae5dd63749db5"}, - {file = "duckdb-1.0.0-cp39-cp39-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ed228167c5d49888c5ef36f6f9cbf65011c2daf9dcb53ea8aa7a041ce567b3e4"}, - {file = "duckdb-1.0.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:46d8395fbcea7231fd5032a250b673cc99352fef349b718a23dea2c0dd2b8dec"}, - {file = "duckdb-1.0.0-cp39-cp39-win_amd64.whl", hash = "sha256:6ad1fc1a4d57e7616944166a5f9417bdbca1ea65c490797e3786e3a42e162d8a"}, - {file = "duckdb-1.0.0.tar.gz", hash = "sha256:a2a059b77bc7d5b76ae9d88e267372deff19c291048d59450c431e166233d453"}, -] - [[package]] name = "ecdsa" version = "0.19.0" @@ -7307,4 +7252,4 @@ local = [] [metadata] lock-version = "2.0" python-versions = ">=3.10,<3.13" -content-hash = "9f69bf4c65c458a2d9c5aafb49010211e8624971f0a6b7af493fa80920f4b8de" +content-hash = "14036da5c48fac610cfba954acd19cb2e1356c593bbcee917b892a64084528ad" diff --git a/src/backend/base/pyproject.toml b/src/backend/base/pyproject.toml index adc38fb87..a0a234816 100644 --- a/src/backend/base/pyproject.toml +++ b/src/backend/base/pyproject.toml @@ -57,7 +57,6 @@ docstring-parser = "^0.16" python-jose = "^3.3.0" pandas = "2.2.2" multiprocess = "^0.70.14" -duckdb = "^1.0.0" python-docx = "^1.1.0" jq = { version = "^1.7.0", markers = "sys_platform != 'win32'" } pypdf = "^4.2.0" diff --git a/src/backend/tests/unit/test_database.py b/src/backend/tests/unit/test_database.py index 1ea976506..9695268a9 100644 --- a/src/backend/tests/unit/test_database.py +++ b/src/backend/tests/unit/test_database.py @@ -13,15 +13,8 @@ from langflow.graph.utils import log_transaction, log_vertex_build from langflow.initial_setup.setup import load_flows_from_directory, load_starter_projects from langflow.services.database.models.base import orjson_dumps from langflow.services.database.models.flow import Flow, FlowCreate, FlowUpdate -from langflow.services.database.models.transactions.crud import get_transactions_by_flow_id -from langflow.services.database.utils import migrate_transactions_from_monitor_service_to_database, session_getter -from langflow.services.deps import get_db_service, get_monitor_service, session_scope -from langflow.services.monitor.schema import TransactionModel -from langflow.services.monitor.utils import ( - add_row_to_table, - drop_and_create_table_if_schema_mismatch, - new_duckdb_locked_connection, -) +from langflow.services.database.utils import session_getter +from langflow.services.deps import get_db_service @pytest.fixture(scope="module") @@ -429,58 +422,6 @@ def test_load_flows(client: TestClient, load_flows_dir): assert response.json()["name"] == "BasicExample" -@pytest.mark.load_flows -def test_migrate_transactions(client: TestClient): - monitor_service = get_monitor_service() - drop_and_create_table_if_schema_mismatch(str(monitor_service.db_path), "transactions", TransactionModel) - flow_id = "c54f9130-f2fa-4a3e-b22a-3856d946351b" - data = { - "vertex_id": "vid", - "target_id": "tid", - "inputs": {"input_value": True}, - "outputs": {"output_value": True}, - "timestamp": "2021-10-10T10:10:10", - "status": "success", - "error": None, - "flow_id": flow_id, - } - with new_duckdb_locked_connection(str(monitor_service.db_path), read_only=False) as conn: - add_row_to_table(conn, "transactions", TransactionModel, data) - assert 1 == len(monitor_service.get_transactions()) - - with session_scope() as session: - migrate_transactions_from_monitor_service_to_database(session) - new_trans = get_transactions_by_flow_id(session, UUID(flow_id)) - assert 1 == len(new_trans) - t = new_trans[0] - assert t.error is None - assert t.inputs == data["inputs"] - assert t.outputs == data["outputs"] - assert t.status == data["status"] - assert str(t.timestamp) == "2021-10-10 10:10:10" - assert t.vertex_id == data["vertex_id"] - assert t.target_id == data["target_id"] - assert t.flow_id == UUID(flow_id) - - assert 0 == len(monitor_service.get_transactions()) - - client.request("DELETE", f"api/v1/flows/{flow_id}") - with session_scope() as session: - new_trans = get_transactions_by_flow_id(session, UUID(flow_id)) - assert 0 == len(new_trans) - - -@pytest.mark.load_flows -def test_migrate_transactions_no_duckdb(client: TestClient): - flow_id = "c54f9130-f2fa-4a3e-b22a-3856d946351b" - get_monitor_service() - - with session_scope() as session: - migrate_transactions_from_monitor_service_to_database(session) - new_trans = get_transactions_by_flow_id(session, UUID(flow_id)) - assert 0 == len(new_trans) - - def test_sqlite_pragmas(): db_service = get_db_service()