From 45bac3714a3ad7899834ed719b0cbee05e670c63 Mon Sep 17 00:00:00 2001 From: Jordan Frazier <122494242+jordanrfrazier@users.noreply.github.com> Date: Tue, 18 Feb 2025 09:03:07 -0800 Subject: [PATCH] feat: add db driver env variable (#5967) * Add option to disable prepared statements * [autofix.ci] apply automated fixes * use generic connection driver env var * simplify and allow usage for sqlite * allow pool class to be set to kwargs * simplify pool class selection * [autofix.ci] apply automated fixes * use reflection to get poolclass * rebase fixes * [autofix.ci] apply automated fixes * [autofix.ci] apply automated fixes (attempt 2/3) --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .../Financial Report Parser.json | 6 ++-- .../starter_projects/Meeting Summary.json | 6 ++-- .../starter_projects/Youtube Analysis.json | 28 +++++++++++++++---- .../langflow/services/database/service.py | 27 ++++++++++++------ .../base/langflow/services/settings/base.py | 3 ++ 5 files changed, 53 insertions(+), 17 deletions(-) diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Financial Report Parser.json b/src/backend/base/langflow/initial_setup/starter_projects/Financial Report Parser.json index 2c281e27a..0fe4a2f82 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Financial Report Parser.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Financial Report Parser.json @@ -960,7 +960,9 @@ "key": "ParseData", "legacy": false, "lf_version": "1.1.5", - "metadata": {}, + "metadata": { + "legacy_name": "Parse Data" + }, "minimized": false, "output_types": [], "outputs": [ @@ -1011,7 +1013,7 @@ "show": true, "title_case": false, "type": "code", - "value": "from langflow.custom import Component\nfrom langflow.helpers.data import data_to_text, data_to_text_list\nfrom langflow.io import DataInput, MultilineInput, Output, StrInput\nfrom langflow.schema import Data\nfrom langflow.schema.message import Message\n\n\nclass ParseDataComponent(Component):\n display_name = \"Data to Message\"\n description = \"Convert Data objects into Messages using any {field_name} from input data.\"\n icon = \"message-square\"\n name = \"ParseData\"\n\n inputs = [\n DataInput(name=\"data\", display_name=\"Data\", info=\"The data to convert to text.\", is_list=True, required=True),\n MultilineInput(\n name=\"template\",\n display_name=\"Template\",\n info=\"The template to use for formatting the data. \"\n \"It can contain the keys {text}, {data} or any other key in the Data.\",\n value=\"{text}\",\n required=True,\n ),\n StrInput(name=\"sep\", display_name=\"Separator\", advanced=True, value=\"\\n\"),\n ]\n\n outputs = [\n Output(\n display_name=\"Message\",\n name=\"text\",\n info=\"Data as a single Message, with each input Data separated by Separator\",\n method=\"parse_data\",\n ),\n Output(\n display_name=\"Data List\",\n name=\"data_list\",\n info=\"Data as a list of new Data, each having `text` formatted by Template\",\n method=\"parse_data_as_list\",\n ),\n ]\n\n def _clean_args(self) -> tuple[list[Data], str, str]:\n data = self.data if isinstance(self.data, list) else [self.data]\n template = self.template\n sep = self.sep\n return data, template, sep\n\n def parse_data(self) -> Message:\n data, template, sep = self._clean_args()\n result_string = data_to_text(template, data, sep)\n self.status = result_string\n return Message(text=result_string)\n\n def parse_data_as_list(self) -> list[Data]:\n data, template, _ = self._clean_args()\n text_list, data_list = data_to_text_list(template, data)\n for item, text in zip(data_list, text_list, strict=True):\n item.set_text(text)\n self.status = data_list\n return data_list\n" + "value": "from langflow.custom import Component\nfrom langflow.helpers.data import data_to_text, data_to_text_list\nfrom langflow.io import DataInput, MultilineInput, Output, StrInput\nfrom langflow.schema import Data\nfrom langflow.schema.message import Message\n\n\nclass ParseDataComponent(Component):\n display_name = \"Data to Message\"\n description = \"Convert Data objects into Messages using any {field_name} from input data.\"\n icon = \"message-square\"\n name = \"ParseData\"\n metadata = {\n \"legacy_name\": \"Parse Data\",\n }\n\n inputs = [\n DataInput(\n name=\"data\",\n display_name=\"Data\",\n info=\"The data to convert to text.\",\n is_list=True,\n required=True,\n ),\n MultilineInput(\n name=\"template\",\n display_name=\"Template\",\n info=\"The template to use for formatting the data. \"\n \"It can contain the keys {text}, {data} or any other key in the Data.\",\n value=\"{text}\",\n required=True,\n ),\n StrInput(name=\"sep\", display_name=\"Separator\", advanced=True, value=\"\\n\"),\n ]\n\n outputs = [\n Output(\n display_name=\"Message\",\n name=\"text\",\n info=\"Data as a single Message, with each input Data separated by Separator\",\n method=\"parse_data\",\n ),\n Output(\n display_name=\"Data List\",\n name=\"data_list\",\n info=\"Data as a list of new Data, each having `text` formatted by Template\",\n method=\"parse_data_as_list\",\n ),\n ]\n\n def _clean_args(self) -> tuple[list[Data], str, str]:\n data = self.data if isinstance(self.data, list) else [self.data]\n template = self.template\n sep = self.sep\n return data, template, sep\n\n def parse_data(self) -> Message:\n data, template, sep = self._clean_args()\n result_string = data_to_text(template, data, sep)\n self.status = result_string\n return Message(text=result_string)\n\n def parse_data_as_list(self) -> list[Data]:\n data, template, _ = self._clean_args()\n text_list, data_list = data_to_text_list(template, data)\n for item, text in zip(data_list, text_list, strict=True):\n item.set_text(text)\n self.status = data_list\n return data_list\n" }, "data": { "_input_type": "DataInput", diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json b/src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json index b2812a874..3ef9aa89f 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json @@ -452,7 +452,9 @@ "icon": "message-square", "legacy": false, "lf_version": "1.1.5", - "metadata": {}, + "metadata": { + "legacy_name": "Parse Data" + }, "minimized": false, "output_types": [], "outputs": [ @@ -502,7 +504,7 @@ "show": true, "title_case": false, "type": "code", - "value": "from langflow.custom import Component\nfrom langflow.helpers.data import data_to_text, data_to_text_list\nfrom langflow.io import DataInput, MultilineInput, Output, StrInput\nfrom langflow.schema import Data\nfrom langflow.schema.message import Message\n\n\nclass ParseDataComponent(Component):\n display_name = \"Data to Message\"\n description = \"Convert Data objects into Messages using any {field_name} from input data.\"\n icon = \"message-square\"\n name = \"ParseData\"\n\n inputs = [\n DataInput(name=\"data\", display_name=\"Data\", info=\"The data to convert to text.\", is_list=True, required=True),\n MultilineInput(\n name=\"template\",\n display_name=\"Template\",\n info=\"The template to use for formatting the data. \"\n \"It can contain the keys {text}, {data} or any other key in the Data.\",\n value=\"{text}\",\n required=True,\n ),\n StrInput(name=\"sep\", display_name=\"Separator\", advanced=True, value=\"\\n\"),\n ]\n\n outputs = [\n Output(\n display_name=\"Message\",\n name=\"text\",\n info=\"Data as a single Message, with each input Data separated by Separator\",\n method=\"parse_data\",\n ),\n Output(\n display_name=\"Data List\",\n name=\"data_list\",\n info=\"Data as a list of new Data, each having `text` formatted by Template\",\n method=\"parse_data_as_list\",\n ),\n ]\n\n def _clean_args(self) -> tuple[list[Data], str, str]:\n data = self.data if isinstance(self.data, list) else [self.data]\n template = self.template\n sep = self.sep\n return data, template, sep\n\n def parse_data(self) -> Message:\n data, template, sep = self._clean_args()\n result_string = data_to_text(template, data, sep)\n self.status = result_string\n return Message(text=result_string)\n\n def parse_data_as_list(self) -> list[Data]:\n data, template, _ = self._clean_args()\n text_list, data_list = data_to_text_list(template, data)\n for item, text in zip(data_list, text_list, strict=True):\n item.set_text(text)\n self.status = data_list\n return data_list\n" + "value": "from langflow.custom import Component\nfrom langflow.helpers.data import data_to_text, data_to_text_list\nfrom langflow.io import DataInput, MultilineInput, Output, StrInput\nfrom langflow.schema import Data\nfrom langflow.schema.message import Message\n\n\nclass ParseDataComponent(Component):\n display_name = \"Data to Message\"\n description = \"Convert Data objects into Messages using any {field_name} from input data.\"\n icon = \"message-square\"\n name = \"ParseData\"\n metadata = {\n \"legacy_name\": \"Parse Data\",\n }\n\n inputs = [\n DataInput(\n name=\"data\",\n display_name=\"Data\",\n info=\"The data to convert to text.\",\n is_list=True,\n required=True,\n ),\n MultilineInput(\n name=\"template\",\n display_name=\"Template\",\n info=\"The template to use for formatting the data. \"\n \"It can contain the keys {text}, {data} or any other key in the Data.\",\n value=\"{text}\",\n required=True,\n ),\n StrInput(name=\"sep\", display_name=\"Separator\", advanced=True, value=\"\\n\"),\n ]\n\n outputs = [\n Output(\n display_name=\"Message\",\n name=\"text\",\n info=\"Data as a single Message, with each input Data separated by Separator\",\n method=\"parse_data\",\n ),\n Output(\n display_name=\"Data List\",\n name=\"data_list\",\n info=\"Data as a list of new Data, each having `text` formatted by Template\",\n method=\"parse_data_as_list\",\n ),\n ]\n\n def _clean_args(self) -> tuple[list[Data], str, str]:\n data = self.data if isinstance(self.data, list) else [self.data]\n template = self.template\n sep = self.sep\n return data, template, sep\n\n def parse_data(self) -> Message:\n data, template, sep = self._clean_args()\n result_string = data_to_text(template, data, sep)\n self.status = result_string\n return Message(text=result_string)\n\n def parse_data_as_list(self) -> list[Data]:\n data, template, _ = self._clean_args()\n text_list, data_list = data_to_text_list(template, data)\n for item, text in zip(data_list, text_list, strict=True):\n item.set_text(text)\n self.status = data_list\n return data_list\n" }, "data": { "_input_type": "DataInput", diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Youtube Analysis.json b/src/backend/base/langflow/initial_setup/starter_projects/Youtube Analysis.json index 7441fd150..e72552f6f 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Youtube Analysis.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Youtube Analysis.json @@ -375,11 +375,11 @@ "show": true, "title_case": false, "type": "code", - "value": "from __future__ import annotations\n\nfrom typing import TYPE_CHECKING\n\nfrom langflow.custom import Component\nfrom langflow.io import DataFrameInput, HandleInput, MultilineInput, Output, StrInput\nfrom langflow.schema import DataFrame\n\nif TYPE_CHECKING:\n from langchain_core.runnables import Runnable\n\n\nclass BatchRunComponent(Component):\n display_name = \"Batch Run\"\n description = (\n \"Runs a language model over each row of a DataFrame's text column and returns a new \"\n \"DataFrame with two columns: 'text_input' (the original text) and 'model_response' \"\n \"containing the model's response.\"\n )\n icon = \"List\"\n beta = True\n\n inputs = [\n HandleInput(\n name=\"model\",\n display_name=\"Language Model\",\n info=\"Connect the 'Language Model' output from your LLM component here.\",\n input_types=[\"LanguageModel\"],\n ),\n MultilineInput(\n name=\"system_message\",\n display_name=\"System Message\",\n info=\"Multi-line system instruction for all rows in the DataFrame.\",\n required=False,\n ),\n DataFrameInput(\n name=\"df\",\n display_name=\"DataFrame\",\n info=\"The DataFrame whose column (specified by 'column_name') we'll treat as text messages.\",\n ),\n StrInput(\n name=\"column_name\",\n display_name=\"Column Name\",\n info=\"The name of the DataFrame column to treat as text messages. Default='text'.\",\n value=\"text\",\n ),\n ]\n\n outputs = [\n Output(\n display_name=\"Batch Results\",\n name=\"batch_results\",\n method=\"run_batch\",\n info=\"A DataFrame with two columns: 'text_input' and 'model_response'.\",\n ),\n ]\n\n async def run_batch(self) -> DataFrame:\n \"\"\"For each row in df[column_name], combine that text with system_message, then invoke the model asynchronously.\n\n Returns a new DataFrame of the same length, with columns 'text_input' and 'model_response'.\n \"\"\"\n model: Runnable = self.model\n system_msg = self.system_message or \"\"\n df: DataFrame = self.df\n col_name = self.column_name or \"text\"\n\n if col_name not in df.columns:\n msg = f\"Column '{col_name}' not found in the DataFrame.\"\n raise ValueError(msg)\n\n # Convert the specified column to a list of strings\n user_texts = df[col_name].astype(str).tolist()\n\n # Prepare the batch of conversations\n conversations = [\n [{\"role\": \"system\", \"content\": system_msg}, {\"role\": \"user\", \"content\": text}]\n if system_msg\n else [{\"role\": \"user\", \"content\": text}]\n for text in user_texts\n ]\n model = model.with_config(\n {\n \"run_name\": self.display_name,\n \"project_name\": self.get_project_name(),\n \"callbacks\": self.get_langchain_callbacks(),\n }\n )\n\n responses = await model.abatch(conversations)\n\n # Build the final data, each row has 'text_input' + 'model_response'\n rows = []\n for original_text, response in zip(user_texts, responses, strict=False):\n resp_text = response.content if hasattr(response, \"content\") else str(response)\n\n row = {\"text_input\": original_text, \"model_response\": resp_text}\n rows.append(row)\n\n # Convert to a new DataFrame\n return DataFrame(rows) # Langflow DataFrame from a list of dicts\n" + "value": "from __future__ import annotations\n\nfrom typing import TYPE_CHECKING, Any\n\nfrom loguru import logger\n\nfrom langflow.custom import Component\nfrom langflow.io import (\n BoolInput,\n DataFrameInput,\n HandleInput,\n MessageTextInput,\n MultilineInput,\n Output,\n)\nfrom langflow.schema import DataFrame\n\nif TYPE_CHECKING:\n from langchain_core.runnables import Runnable\n\n\nclass BatchRunComponent(Component):\n display_name = \"Batch Run\"\n description = (\n \"Runs a language model over each row of a DataFrame's text column and returns a new \"\n \"DataFrame with three columns: '**text_input**' (the original text), \"\n \"'**model_response**' (the model's response),and '**batch_index**' (the processing order).\"\n )\n icon = \"List\"\n beta = True\n\n inputs = [\n HandleInput(\n name=\"model\",\n display_name=\"Language Model\",\n info=\"Connect the 'Language Model' output from your LLM component here.\",\n input_types=[\"LanguageModel\"],\n required=True,\n ),\n MultilineInput(\n name=\"system_message\",\n display_name=\"System Message\",\n info=\"Multi-line system instruction for all rows in the DataFrame.\",\n required=False,\n ),\n DataFrameInput(\n name=\"df\",\n display_name=\"DataFrame\",\n info=\"The DataFrame whose column (specified by 'column_name') we'll treat as text messages.\",\n required=True,\n ),\n MessageTextInput(\n name=\"column_name\",\n display_name=\"Column Name\",\n info=\"The name of the DataFrame column to treat as text messages. Default='text'.\",\n value=\"text\",\n required=True,\n advanced=True,\n ),\n BoolInput(\n name=\"enable_metadata\",\n display_name=\"Enable Metadata\",\n info=\"If True, add metadata to the output DataFrame.\",\n value=True,\n required=False,\n advanced=True,\n ),\n ]\n\n outputs = [\n Output(\n display_name=\"Batch Results\",\n name=\"batch_results\",\n method=\"run_batch\",\n info=\"A DataFrame with columns: 'text_input', 'model_response', 'batch_index', and 'metadata'.\",\n ),\n ]\n\n def _create_base_row(self, text_input: str = \"\", model_response: str = \"\", batch_index: int = -1) -> dict[str, Any]:\n \"\"\"Create a base row with optional metadata.\"\"\"\n return {\n \"text_input\": text_input,\n \"model_response\": model_response,\n \"batch_index\": batch_index,\n }\n\n def _add_metadata(\n self, row: dict[str, Any], *, success: bool = True, system_msg: str = \"\", error: str | None = None\n ) -> None:\n \"\"\"Add metadata to a row if enabled.\"\"\"\n if not self.enable_metadata:\n return\n\n if success:\n row[\"metadata\"] = {\n \"has_system_message\": bool(system_msg),\n \"input_length\": len(row[\"text_input\"]),\n \"response_length\": len(row[\"model_response\"]),\n \"processing_status\": \"success\",\n }\n else:\n row[\"metadata\"] = {\n \"error\": error,\n \"processing_status\": \"failed\",\n }\n\n async def run_batch(self) -> DataFrame:\n \"\"\"Process each row in df[column_name] with the language model asynchronously.\n\n Returns:\n DataFrame: A new DataFrame containing:\n - text_input: The original input text\n - model_response: The model's response\n - batch_index: The processing order\n - metadata: Additional processing information\n\n Raises:\n ValueError: If the specified column is not found in the DataFrame\n TypeError: If the model is not compatible or input types are wrong\n \"\"\"\n model: Runnable = self.model\n system_msg = self.system_message or \"\"\n df: DataFrame = self.df\n col_name = self.column_name or \"text\"\n\n # Validate inputs first\n if not isinstance(df, DataFrame):\n msg = f\"Expected DataFrame input, got {type(df)}\"\n raise TypeError(msg)\n\n if col_name not in df.columns:\n msg = f\"Column '{col_name}' not found in the DataFrame. Available columns: {', '.join(df.columns)}\"\n raise ValueError(msg)\n\n try:\n # Convert the specified column to a list of strings\n user_texts = df[col_name].astype(str).tolist()\n total_rows = len(user_texts)\n\n logger.info(f\"Processing {total_rows} rows with batch run\")\n\n # Prepare the batch of conversations\n conversations = [\n [{\"role\": \"system\", \"content\": system_msg}, {\"role\": \"user\", \"content\": text}]\n if system_msg\n else [{\"role\": \"user\", \"content\": text}]\n for text in user_texts\n ]\n\n # Configure the model with project info and callbacks\n model = model.with_config(\n {\n \"run_name\": self.display_name,\n \"project_name\": self.get_project_name(),\n \"callbacks\": self.get_langchain_callbacks(),\n }\n )\n\n # Process batches and track progress\n responses_with_idx = [\n (idx, response)\n for idx, response in zip(\n range(len(conversations)), await model.abatch(list(conversations)), strict=True\n )\n ]\n\n # Sort by index to maintain order\n responses_with_idx.sort(key=lambda x: x[0])\n\n # Build the final data with enhanced metadata\n rows: list[dict[str, Any]] = []\n for idx, response in responses_with_idx:\n resp_text = response.content if hasattr(response, \"content\") else str(response)\n row = self._create_base_row(\n text_input=user_texts[idx],\n model_response=resp_text,\n batch_index=idx,\n )\n self._add_metadata(row, success=True, system_msg=system_msg)\n rows.append(row)\n\n # Log progress\n if (idx + 1) % max(1, total_rows // 10) == 0:\n logger.info(f\"Processed {idx + 1}/{total_rows} rows\")\n\n logger.info(\"Batch processing completed successfully\")\n return DataFrame(rows)\n\n except (KeyError, AttributeError) as e:\n # Handle data structure and attribute access errors\n logger.error(f\"Data processing error: {e!s}\")\n error_row = self._create_base_row()\n self._add_metadata(error_row, success=False, error=str(e))\n return DataFrame([error_row])\n" }, "column_name": { "_input_type": "StrInput", - "advanced": false, + "advanced": true, "display_name": "Column Name", "dynamic": false, "info": "The name of the DataFrame column to treat as text messages. Default='text'.", @@ -388,7 +388,7 @@ "load_from_db": false, "name": "column_name", "placeholder": "", - "required": false, + "required": true, "show": true, "title_case": false, "tool_mode": false, @@ -409,7 +409,7 @@ "list_add_label": "Add More", "name": "df", "placeholder": "", - "required": false, + "required": true, "show": true, "title_case": false, "tool_mode": false, @@ -418,6 +418,24 @@ "type": "other", "value": "" }, + "enable_metadata": { + "_input_type": "BoolInput", + "advanced": true, + "display_name": "Enable Metadata", + "dynamic": false, + "info": "If True, add metadata to the output DataFrame.", + "list": false, + "list_add_label": "Add More", + "name": "enable_metadata", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "bool", + "value": true + }, "model": { "_input_type": "HandleInput", "advanced": false, @@ -431,7 +449,7 @@ "list_add_label": "Add More", "name": "model", "placeholder": "", - "required": false, + "required": true, "show": true, "title_case": false, "trace_as_metadata": true, diff --git a/src/backend/base/langflow/services/database/service.py b/src/backend/base/langflow/services/database/service.py index 32140c361..2facacf7a 100644 --- a/src/backend/base/langflow/services/database/service.py +++ b/src/backend/base/langflow/services/database/service.py @@ -124,6 +124,15 @@ class DatabaseService(Service): # if the user specifies an empty dict, we allow it. kwargs = self._build_connection_kwargs() + poolclass_key = kwargs.get("poolclass") + if poolclass_key is not None: + pool_class = getattr(sa, poolclass_key, None) + if pool_class and isinstance(pool_class(), sa.pool.Pool): + logger.debug(f"Using poolclass: {poolclass_key}.") + kwargs["poolclass"] = pool_class + else: + logger.error(f"Invalid poolclass '{poolclass_key}' specified. Using default pool class.") + return create_async_engine( self.database_url, connect_args=self._get_connect_args(), @@ -136,16 +145,18 @@ class DatabaseService(Service): return self._create_engine() def _get_connect_args(self): - if self.settings_service.settings.database_url and self.settings_service.settings.database_url.startswith( - "sqlite" - ): - connect_args = { + settings = self.settings_service.settings + + if settings.db_driver_connection_settings is not None: + return settings.db_driver_connection_settings + + if settings.database_url and settings.database_url.startswith("sqlite"): + return { "check_same_thread": False, - "timeout": self.settings_service.settings.db_connect_timeout, + "timeout": settings.db_connect_timeout, } - else: - connect_args = {} - return connect_args + + return {} def on_connection(self, dbapi_connection, _connection_record) -> None: if isinstance(dbapi_connection, sqlite3.Connection | dialect_sqlite.aiosqlite.AsyncAdapt_aiosqlite_connection): diff --git a/src/backend/base/langflow/services/settings/base.py b/src/backend/base/langflow/services/settings/base.py index bb023775f..e716b3cf6 100644 --- a/src/backend/base/langflow/services/settings/base.py +++ b/src/backend/base/langflow/services/settings/base.py @@ -90,6 +90,9 @@ class Settings(BaseSettings): sqlite_pragmas: dict | None = {"synchronous": "NORMAL", "journal_mode": "WAL"} """SQLite pragmas to use when connecting to the database.""" + db_driver_connection_settings: dict | None = None + """Database driver connection settings.""" + db_connection_settings: dict | None = { "pool_size": 20, # Match the pool_size above "max_overflow": 30, # Match the max_overflow above