# NOTE(review): SOURCE was a whitespace-mangled `git diff` adding five new
# files. Reconstructed below as properly formatted Python, one section per
# added file. Concrete fixes relative to the patch:
#   1. add_row_to_table: the column list excluded "id" but the value list
#      did not, so an explicitly-set id desynchronized the INSERT ->
#      values are now built from the same filtered key list.
#   2. MessageModel's artifacts validator was copy-pasted under the
#      misleading name "validate_target_args" -> renamed validate_artifacts.
#   3. dict payloads (target_args / artifacts) are JSON-encoded before
#      binding, since their target columns are declared JSON.

# --- src/backend/langflow/services/monitor/__init__.py (new, empty) ---


# --- src/backend/langflow/services/monitor/factory.py ---
from langflow.services.factory import ServiceFactory
from langflow.services.monitor.service import MonitorService


class MonitorServiceFactory(ServiceFactory):
    """Factory that builds the MonitorService from the settings service."""

    name = "monitor_service"

    def __init__(self):
        super().__init__(MonitorService)

    def create(self, settings_service):
        # service_class is MonitorService (set by __init__ above).
        return self.service_class(settings_service)


# --- src/backend/langflow/services/monitor/schema.py ---
import json
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, Field, validator


class TransactionModel(BaseModel):
    """Row schema for the `transactions` monitor table."""

    id: Optional[int] = Field(default=None, alias="id")
    timestamp: Optional[datetime] = Field(default_factory=datetime.now, alias="timestamp")
    source: str
    target: str
    target_args: dict
    status: str
    error: Optional[str] = None

    class Config:
        from_attributes = True
        populate_by_name = True

    @validator("target_args", pre=True)
    def validate_target_args(cls, v):
        # Rows read back from DuckDB carry target_args as a JSON string;
        # decode it so the model always exposes a dict.
        if isinstance(v, str):
            return json.loads(v)
        return v


class MessageModel(BaseModel):
    """Row schema for the `messages` monitor table."""

    id: Optional[int] = Field(default=None, alias="id")
    timestamp: datetime = Field(default_factory=datetime.now)
    sender_type: str
    sender_name: str
    session_id: str
    message: str
    artifacts: dict

    class Config:
        from_attributes = True
        populate_by_name = True

    @validator("artifacts", pre=True)
    def validate_artifacts(cls, v):  # fixed: was misnamed "validate_target_args"
        # Same JSON-string round-trip handling as TransactionModel.target_args.
        if isinstance(v, str):
            return json.loads(v)
        return v


# --- src/backend/langflow/services/monitor/service.py ---
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING

import duckdb
from langflow.services.base import Service
from langflow.services.monitor.schema import MessageModel, TransactionModel
from langflow.services.monitor.utils import (
    add_row_to_table,
    drop_and_create_table_if_schema_mismatch,
)
from loguru import logger
from platformdirs import user_cache_dir

if TYPE_CHECKING:
    from langflow.services.settings.manager import SettingsService


class MonitorService(Service):
    """Persists transaction/message monitoring rows in a per-user DuckDB file."""

    name = "monitor_service"

    def __init__(self, settings_service: "SettingsService"):
        self.settings_service = settings_service
        # DB lives in the OS cache dir (e.g. ~/.cache/langflow/monitor.duckdb).
        self.base_cache_dir = Path(user_cache_dir("langflow"))
        self.db_path = self.base_cache_dir / "monitor.duckdb"
        try:
            self.ensure_tables_exist()
        except Exception as e:
            # Monitoring is best-effort: never block service startup.
            logger.error(f"Error initializing monitor service: {e}")

    def to_df(self, table_name):
        """Convenience alias for load_table_as_dataframe."""
        return self.load_table_as_dataframe(table_name)

    def ensure_tables_exist(self):
        """(Re)create any table whose stored schema drifted from its model."""
        drop_and_create_table_if_schema_mismatch(str(self.db_path), "transactions", TransactionModel)
        drop_and_create_table_if_schema_mismatch(str(self.db_path), "messages", MessageModel)

    def add_row(self, table_name: str, data: dict):
        """Validate `data` against the table's model and insert it.

        Raises:
            ValueError: if `table_name` is not a known monitor table.
        """
        # Make sure the model passed matches the table.
        if table_name == "transactions":
            model = TransactionModel
        elif table_name == "messages":
            model = MessageModel
        else:
            raise ValueError(f"Unknown table name: {table_name}")

        # A short-lived connection per insert; DuckDB opens are cheap.
        with duckdb.connect(str(self.db_path)) as conn:
            add_row_to_table(conn, table_name, model, data)

    def load_table_as_dataframe(self, table_name):
        """Return the whole table as a pandas DataFrame."""
        with duckdb.connect(str(self.db_path)) as conn:
            return conn.table(table_name).df()

    @staticmethod
    def get_timestamp():
        """Current local time formatted for display ("YYYY-MM-DD HH:MM:SS")."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


# --- src/backend/langflow/services/monitor/utils.py ---
import json
from typing import Any, Dict, Type

import duckdb
from pydantic import BaseModel


def get_table_schema_as_dict(conn: duckdb.DuckDBPyConnection, table_name: str) -> dict:
    """Return {column_name: SQL_TYPE_UPPERCASE} for an existing table."""
    # PRAGMA rows are (cid, name, type, notnull, dflt_value, pk).
    result = conn.execute(f"PRAGMA table_info('{table_name}')").fetchall()
    return {row[1]: row[2].upper() for row in result}


def model_to_sql_column_definitions(model: Type[BaseModel]) -> dict:
    """Map a pydantic model's fields to DuckDB column types.

    Unhandled Python types are silently skipped, so tables only ever
    contain columns this module knows how to store.
    """
    columns = {}
    for field_name, field_type in model.__fields__.items():
        # pydantic v1 API: .type_ unwraps Optional[...] to the inner type.
        field_info = field_type.type_
        if field_info.__name__ == "int":
            sql_type = "INTEGER"
        elif field_info.__name__ == "str":
            sql_type = "VARCHAR"
        elif field_info.__name__ == "datetime":
            sql_type = "TIMESTAMP"
        elif field_info.__name__ == "bool":
            sql_type = "BOOLEAN"
        elif field_info.__name__ == "dict":
            sql_type = "JSON"
        else:
            continue  # Skip types we don't handle
        columns[field_name] = sql_type
    return columns


def drop_and_create_table_if_schema_mismatch(db_path: str, table_name: str, model: Type[BaseModel]):
    """Drop and recreate `table_name` when its schema differs from `model`.

    Destructive on mismatch: existing rows are discarded. NOTE(review):
    `table_name` is interpolated directly into SQL, so it must only ever
    come from trusted code, never from user input.
    """
    with duckdb.connect(db_path) as conn:
        try:
            current_schema = get_table_schema_as_dict(conn, table_name)
        except duckdb.CatalogException:
            current_schema = {}  # table does not exist yet
        desired_schema = model_to_sql_column_definitions(model)

        if current_schema != desired_schema:
            conn.execute(f"DROP TABLE IF EXISTS {table_name}")
            if "id" in desired_schema:
                # Auto-increment id via a per-table sequence.
                try:
                    conn.execute(f"CREATE SEQUENCE seq_{table_name} START 1;")
                except duckdb.CatalogException:
                    pass  # sequence survives DROP TABLE; reuse it
                desired_schema["id"] = f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')"
            columns_sql = ", ".join(f"{name} {data_type}" for name, data_type in desired_schema.items())
            conn.execute(f"CREATE TABLE {table_name} ({columns_sql})")


def add_row_to_table(
    conn: duckdb.DuckDBPyConnection,
    table_name: str,
    model: Type[BaseModel],
    monitor_data: Dict[str, Any],
):
    """Validate `monitor_data` with `model` and INSERT it into `table_name`.

    The `id` column is always omitted so the table's sequence default
    assigns it.
    """
    # Validate the data with the Pydantic model.
    validated_data = model(**monitor_data)
    validated_dict = validated_data.dict(exclude_unset=True)

    # Fix: build the column list AND the value list from the SAME filtered
    # keys. The original excluded "id" from the columns but not from the
    # values, desynchronizing the INSERT whenever an id was supplied.
    keys = [key for key in validated_dict if key != "id"]
    # JSON columns (target_args / artifacts) need a JSON string, not a raw
    # Python dict, when bound as a prepared-statement parameter.
    values = [
        json.dumps(validated_dict[key]) if isinstance(validated_dict[key], dict) else validated_dict[key]
        for key in keys
    ]

    columns = ", ".join(keys)
    placeholders = ", ".join("?" for _ in keys)
    conn.execute(f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})", values)