Add monitor service and related files

This commit is contained in:
anovazzi1 2024-01-19 21:54:21 -03:00
commit 9933d1fec0
5 changed files with 192 additions and 0 deletions

View file

@ -0,0 +1,12 @@
from langflow.services.factory import ServiceFactory
from langflow.services.monitor.service import MonitorService
class MonitorServiceFactory(ServiceFactory):
name = "monitor_service"
def __init__(self):
super().__init__(MonitorService)
def create(self, settings_service):
return self.service_class(settings_service)

View file

@ -0,0 +1,46 @@
import json
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field, validator
class TransactionModel(BaseModel):
id: Optional[int] = Field(default=None, alias="id")
timestamp: Optional[datetime] = Field(default_factory=datetime.now, alias="timestamp")
source: str
target: str
target_args: dict
status: str
error: Optional[str] = None
class Config:
from_attributes = True
populate_by_name = True
# validate target_args in case it is a JSON
@validator("target_args", pre=True)
def validate_target_args(cls, v):
if isinstance(v, str):
return json.loads(v)
return v
class MessageModel(BaseModel):
id: Optional[int] = Field(default=None, alias="id")
timestamp: datetime = Field(default_factory=datetime.now)
sender_type: str
sender_name: str
session_id: str
message: str
artifacts: dict
class Config:
from_attributes = True
populate_by_name = True
@validator("artifacts", pre=True)
def validate_target_args(cls, v):
if isinstance(v, str):
return json.loads(v)
return v

View file

@ -0,0 +1,55 @@
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING
import duckdb
from langflow.services.base import Service
from langflow.services.monitor.schema import MessageModel, TransactionModel
from langflow.services.monitor.utils import (
add_row_to_table, drop_and_create_table_if_schema_mismatch)
from loguru import logger
from platformdirs import user_cache_dir
if TYPE_CHECKING:
from langflow.services.settings.manager import SettingsService
class MonitorService(Service):
name = "monitor_service"
def __init__(self, settings_service: "SettingsService"):
self.settings_service = settings_service
self.base_cache_dir = Path(user_cache_dir("langflow"))
self.db_path = self.base_cache_dir / "monitor.duckdb"
try:
self.ensure_tables_exist()
except Exception as e:
logger.error(f"Error initializing monitor service: {e}")
def to_df(self, table_name):
return self.load_table_as_dataframe(table_name)
def ensure_tables_exist(self):
drop_and_create_table_if_schema_mismatch(str(self.db_path), "transactions", TransactionModel)
drop_and_create_table_if_schema_mismatch(str(self.db_path), "messages", MessageModel)
def add_row(self, table_name: str, data: dict):
# Make sure the model passed matches the table
if table_name == "transactions":
model = TransactionModel
elif table_name == "messages":
model = MessageModel
else:
raise ValueError(f"Unknown table name: {table_name}")
# Connect to DuckDB and add the row
with duckdb.connect(str(self.db_path)) as conn:
add_row_to_table(conn, table_name, model, data)
def load_table_as_dataframe(self, table_name):
with duckdb.connect(str(self.db_path)) as conn:
return conn.table(table_name).df()
@staticmethod
def get_timestamp():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

View file

@ -0,0 +1,79 @@
from typing import Any, Dict, Type
import duckdb
from pydantic import BaseModel
def get_table_schema_as_dict(conn: duckdb.DuckDBPyConnection, table_name: str) -> dict:
result = conn.execute(f"PRAGMA table_info('{table_name}')").fetchall()
return {row[1]: row[2].upper() for row in result}
def model_to_sql_column_definitions(model: Type[BaseModel]) -> dict:
columns = {}
for field_name, field_type in model.__fields__.items():
field_info = field_type.type_
if field_info.__name__ == "int":
sql_type = "INTEGER"
elif field_info.__name__ == "str":
sql_type = "VARCHAR"
elif field_info.__name__ == "datetime":
sql_type = "TIMESTAMP"
elif field_info.__name__ == "bool":
sql_type = "BOOLEAN"
elif field_info.__name__ == "dict":
sql_type = "JSON"
else:
continue # Skip types we don't handle
columns[field_name] = sql_type
return columns
def drop_and_create_table_if_schema_mismatch(db_path: str, table_name: str, model: Type[BaseModel]):
with duckdb.connect(db_path) as conn:
# Get the current schema from the database
try:
current_schema = get_table_schema_as_dict(conn, table_name)
except duckdb.CatalogException:
current_schema = {}
# Get the desired schema from the model
desired_schema = model_to_sql_column_definitions(model)
# Compare the current and desired schemas
if current_schema != desired_schema:
# If they don't match, drop the existing table and create a new one
conn.execute(f"DROP TABLE IF EXISTS {table_name}")
if "id" in desired_schema.keys():
# Create a sequence for the id column
try:
conn.execute(f"CREATE SEQUENCE seq_{table_name} START 1;")
except duckdb.CatalogException:
pass
desired_schema["id"] = f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')"
columns_sql = ", ".join(f"{name} {data_type}" for name, data_type in desired_schema.items())
create_table_sql = f"CREATE TABLE {table_name} ({columns_sql})"
conn.execute(create_table_sql)
def add_row_to_table(
conn: duckdb.DuckDBPyConnection,
table_name: str,
model: Type[BaseModel],
monitor_data: Dict[str, Any],
):
# Validate the data with the Pydantic model
validated_data = model(**monitor_data)
# Extract data for the insert statement
validated_dict = validated_data.dict(exclude_unset=True)
keys = [key for key in validated_dict.keys() if key != "id"]
columns = ", ".join(keys)
values_placeholders = ", ".join(["?" for _ in keys])
values = list(validated_dict.values())
# Create the insert statement
insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({values_placeholders})"
# Execute the insert statement
conn.execute(insert_sql, values)