From b8e7a77d78a1148c783e80b2347ab5c843acfbb8 Mon Sep 17 00:00:00 2001
From: Cezar Vasconcelos <97035956+vasconceloscezar@users.noreply.github.com>
Date: Thu, 3 Oct 2024 12:48:57 -0300
Subject: [PATCH] feat: Add new Data utility components for CSV/JSON parsing, routing, and filtering (#3776)

* feat: Add CurrentDateComponent for timezone-based date

* feat: Add DataConditionalRouter component

* feat: Add DataFilterComponent for filtering data

* feat(components): Add beta and name attributes to components

* feat: Add JSON to Data component

* feat: Add CSV to Data component

* feat(helpers): Add ExtractKey component for key extraction

* feat: Add list processing to DataConditionalRouter

* [autofix.ci] apply automated fixes

* feat: add MessageToData component

* feat(CSVtoData, JSONtoData): Add file input support

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* Refactor error messages and improve code readability in data components utilities

* fix(review): JSONtoData reports input/file errors separately from JSON syntax errors

* fix(review): DataConditionalRouter tolerates a cleared (None) compare value

* fix(review): FilterDataValues surfaces its skipped-items warning and skips non-Data items

* refactor(review): share file-reading helpers in CSVtoData/JSONtoData and add docstrings

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Gabriel Luiz Freitas Almeida
---
Review note: the added-file contents below were amended during review; the
blob ids on the "index" lines refer to the originally submitted contents.

 .../langflow/components/helpers/CSVtoData.py  | 104 ++++++++++++
 .../components/helpers/CurrentDate.py         |  80 ++++++++++
 .../helpers/DataConditionalRouter.py          | 150 ++++++++++++++++++
 .../langflow/components/helpers/ExtractKey.py |  63 ++++++++
 .../components/helpers/FilterDataValues.py    | 101 ++++++++++++
 .../langflow/components/helpers/JSONtoData.py | 120 ++++++++++++++
 .../components/helpers/MessageToData.py       |  47 ++++++
 7 files changed, 665 insertions(+)
 create mode 100644 src/backend/base/langflow/components/helpers/CSVtoData.py
 create mode 100644 src/backend/base/langflow/components/helpers/CurrentDate.py
 create mode 100644 src/backend/base/langflow/components/helpers/DataConditionalRouter.py
 create mode 100644 src/backend/base/langflow/components/helpers/ExtractKey.py
 create mode 100644 src/backend/base/langflow/components/helpers/FilterDataValues.py
 create mode 100644 src/backend/base/langflow/components/helpers/JSONtoData.py
 create mode 100644 src/backend/base/langflow/components/helpers/MessageToData.py

diff --git a/src/backend/base/langflow/components/helpers/CSVtoData.py b/src/backend/base/langflow/components/helpers/CSVtoData.py
new file mode 100644
index 000000000..7356133dc
--- /dev/null
+++ b/src/backend/base/langflow/components/helpers/CSVtoData.py
@@ -0,0 +1,104 @@
+import csv
+import io
+from pathlib import Path
+
+from langflow.custom import Component
+from langflow.io import FileInput, MessageTextInput, MultilineInput, Output
+from langflow.schema import Data
+
+
+class CSVToDataComponent(Component):
+    """Convert CSV content into a list of Data objects, one per CSV row.
+
+    Exactly one of the three inputs (uploaded file, file path or raw CSV
+    string) must be provided.
+    """
+
+    display_name = "CSV to Data List"
+    description = "Load a CSV file, CSV from a file path, or a valid CSV string and convert it to a list of Data"
+    icon = "file-spreadsheet"
+    beta = True
+    name = "CSVtoData"
+
+    inputs = [
+        FileInput(
+            name="csv_file",
+            display_name="CSV File",
+            file_types=["csv"],
+            info="Upload a CSV file to convert to a list of Data objects",
+        ),
+        MessageTextInput(
+            name="csv_path",
+            display_name="CSV File Path",
+            info="Provide the path to the CSV file as pure text",
+        ),
+        MultilineInput(
+            name="csv_string",
+            display_name="CSV String",
+            info="Paste a CSV string directly to convert to a list of Data objects",
+        ),
+    ]
+
+    outputs = [
+        Output(name="data_list", display_name="Data List", method="load_csv_to_data"),
+    ]
+
+    def _read_csv_file(self, path: str) -> str:
+        """Return the text of the ``.csv`` file at *path*.
+
+        Raises:
+            ValueError: If *path* does not have a ``.csv`` suffix.
+        """
+        file_path = Path(path)
+        if file_path.suffix.lower() != ".csv":
+            msg = "The provided file must be a CSV file."
+            raise ValueError(msg)
+        # newline="" disables newline translation, as the csv module recommends.
+        with file_path.open(newline="", encoding="utf-8") as csvfile:
+            return csvfile.read()
+
+    def load_csv_to_data(self) -> list[Data]:
+        """Parse the provided CSV source into one Data object per row.
+
+        Returns:
+            The parsed rows, or an empty list when the CSV has no data rows.
+
+        Raises:
+            ValueError: If zero or several inputs are provided, the file is
+                not a ``.csv`` file, no CSV text is available, or reading or
+                parsing fails for any other reason.
+        """
+        try:
+            if sum(bool(field) for field in [self.csv_file, self.csv_path, self.csv_string]) != 1:
+                msg = "Please provide exactly one of: CSV file, file path, or CSV string."
+                raise ValueError(msg)
+
+            if self.csv_file:
+                csv_data = self._read_csv_file(self.resolve_path(self.csv_file))
+            elif self.csv_path:
+                csv_data = self._read_csv_file(self.csv_path)
+            else:
+                csv_data = self.csv_string
+
+            if not csv_data:
+                msg = "No CSV data provided."
+                raise ValueError(msg)
+
+            result = [Data(data=row) for row in csv.DictReader(io.StringIO(csv_data))]
+
+            if not result:
+                self.status = "The CSV data is empty."
+                return []
+
+            self.status = result
+            return result
+
+        except csv.Error as e:
+            error_message = f"CSV parsing error: {e}"
+            self.status = error_message
+            raise ValueError(error_message) from e
+
+        except Exception as e:
+            error_message = f"An error occurred: {e}"
+            self.status = error_message
+            raise ValueError(error_message) from e
diff --git a/src/backend/base/langflow/components/helpers/CurrentDate.py b/src/backend/base/langflow/components/helpers/CurrentDate.py
new file mode 100644
index 000000000..915ae07c2
--- /dev/null
+++ b/src/backend/base/langflow/components/helpers/CurrentDate.py
@@ -0,0 +1,80 @@
+from datetime import datetime
+from zoneinfo import ZoneInfo
+
+from langflow.custom import Component
+from langflow.io import DropdownInput, Output
+from langflow.schema.message import Message
+
+
+class CurrentDateComponent(Component):
+    """Report the current date and time in a user-selected timezone."""
+
+    display_name = "Current Date"
+    description = "Returns the current date and time in the selected timezone."
+    icon = "clock"
+    beta = True
+    name = "CurrentDate"
+
+    inputs = [
+        DropdownInput(
+            name="timezone",
+            display_name="Timezone",
+            options=[
+                "UTC",
+                "US/Eastern",
+                "US/Central",
+                "US/Mountain",
+                "US/Pacific",
+                "Europe/London",
+                "Europe/Paris",
+                "Europe/Berlin",
+                "Europe/Moscow",
+                "Asia/Tokyo",
+                "Asia/Shanghai",
+                "Asia/Singapore",
+                "Asia/Dubai",
+                "Australia/Sydney",
+                "Australia/Melbourne",
+                "Pacific/Auckland",
+                "America/Sao_Paulo",
+                "America/Mexico_City",
+                "America/Toronto",
+                "America/Vancouver",
+                "Africa/Cairo",
+                "Africa/Johannesburg",
+                "Atlantic/Reykjavik",
+                "Indian/Maldives",
+                "America/Bogota",
+                "America/Lima",
+                "America/Santiago",
+                "America/Buenos_Aires",
+                "America/Caracas",
+                "America/La_Paz",
+                "America/Montevideo",
+                "America/Asuncion",
+                "America/Cuiaba",
+            ],
+            value="UTC",
+            info="Select the timezone for the current date and time.",
+        ),
+    ]
+    outputs = [
+        Output(display_name="Current Date", name="current_date", method="get_current_date"),
+    ]
+
+    def get_current_date(self) -> Message:
+        """Return the current date and time in the selected timezone as a Message.
+
+        Failures (for example an unknown timezone key) are not raised; the
+        returned Message carries the error text instead.
+        """
+        try:
+            tz = ZoneInfo(self.timezone)
+            current_date = datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S %Z")
+            result = f"Current date and time in {self.timezone}: {current_date}"
+            self.status = result
+            return Message(text=result)
+        except Exception as e:
+            error_message = f"Error: {e}"
+            self.status = error_message
+            return Message(text=error_message)
diff --git a/src/backend/base/langflow/components/helpers/DataConditionalRouter.py b/src/backend/base/langflow/components/helpers/DataConditionalRouter.py
new file mode 100644
index 000000000..e77809c20
--- /dev/null
+++ b/src/backend/base/langflow/components/helpers/DataConditionalRouter.py
@@ -0,0 +1,150 @@
+from typing import Any
+
+from langflow.custom import Component
+from langflow.io import DataInput, DropdownInput, MessageTextInput, Output
+from langflow.schema import Data, dotdict
+
+
+class DataConditionalRouterComponent(Component):
+    """Route Data object(s) to a true or false output based on one key's value."""
+
+    display_name = "Data Conditional Router"
+    description = "Route Data object(s) based on a condition applied to a specified key, including boolean validation."
+    icon = "split"
+    beta = True
+    name = "DataConditionalRouter"
+
+    inputs = [
+        DataInput(
+            name="data_input",
+            display_name="Data Input",
+            info="The Data object or list of Data objects to process",
+            is_list=True,
+        ),
+        MessageTextInput(
+            name="key_name",
+            display_name="Key Name",
+            info="The name of the key in the Data object(s) to check",
+        ),
+        DropdownInput(
+            name="operator",
+            display_name="Comparison Operator",
+            options=["equals", "not equals", "contains", "starts with", "ends with", "boolean validator"],
+            info="The operator to apply for comparing the values. 'boolean validator' treats the value as a boolean.",
+            value="equals",
+        ),
+        MessageTextInput(
+            name="compare_value",
+            display_name="Compare Value",
+            info="The value to compare against (not used for boolean validator)",
+        ),
+    ]
+
+    outputs = [
+        Output(display_name="True Output", name="true_output", method="process_data"),
+        Output(display_name="False Output", name="false_output", method="process_data"),
+    ]
+
+    def compare_values(self, item_value: str, compare_value: str, operator: str) -> bool:
+        """Return whether *item_value* satisfies *operator* against *compare_value*.
+
+        Unknown operators evaluate to False.
+        """
+        if operator == "equals":
+            return item_value == compare_value
+        if operator == "not equals":
+            return item_value != compare_value
+        if operator == "contains":
+            return compare_value in item_value
+        if operator == "starts with":
+            return item_value.startswith(compare_value)
+        if operator == "ends with":
+            return item_value.endswith(compare_value)
+        if operator == "boolean validator":
+            return self.parse_boolean(item_value)
+        return False
+
+    def parse_boolean(self, value: Any) -> bool:
+        """Interpret *value* as a boolean.
+
+        Strings are true only when they equal (case-insensitively) one of
+        "true", "1", "yes", "y" or "on"; other values use ``bool()``.
+        """
+        if isinstance(value, bool):
+            return value
+        if isinstance(value, str):
+            return value.lower() in ["true", "1", "yes", "y", "on"]
+        return bool(value)
+
+    def validate_input(self, data_item: Data) -> bool:
+        """Return True if *data_item* is a Data object containing ``key_name``.
+
+        On failure the reason is written to ``self.status``.
+        """
+        if not isinstance(data_item, Data):
+            self.status = "Input is not a Data object"
+            return False
+        if self.key_name not in data_item.data:
+            self.status = f"Key '{self.key_name}' not found in Data"
+            return False
+        return True
+
+    def process_data(self) -> Data | list[Data]:
+        """Evaluate the condition, call ``self.stop`` on the opposite output and return the routed data.
+
+        For a list input, invalid items are skipped. The matching items are
+        returned when at least one item matches; otherwise the non-matching
+        items are returned. For a single Data input, the input itself is
+        returned, or an error Data object when validation fails.
+        """
+        if isinstance(self.data_input, list):
+            true_output = []
+            false_output = []
+            for item in self.data_input:
+                if self.validate_input(item):
+                    # Partition valid items by whether they meet the condition.
+                    bucket = true_output if self.process_single_data(item) else false_output
+                    bucket.append(item)
+            # NOTE(review): when some items match, the non-matching ones are
+            # dropped rather than sent to the false output - confirm intended.
+            self.stop("false_output" if true_output else "true_output")
+            return true_output if true_output else false_output
+        if not self.validate_input(self.data_input):
+            return Data(data={"error": self.status})
+        result = self.process_single_data(self.data_input)
+        self.stop("false_output" if result else "true_output")
+        return self.data_input
+
+    def process_single_data(self, data_item: Data) -> bool:
+        """Apply the configured operator to *data_item* and record the outcome in status."""
+        item_value = data_item.data[self.key_name]
+        operator = self.operator
+
+        if operator == "boolean validator":
+            condition_met = self.parse_boolean(item_value)
+            condition_description = f"Boolean validation of '{self.key_name}'"
+        else:
+            # compare_value is reset to None when the boolean validator is selected
+            # (see update_build_config); coerce so string operators never get None.
+            compare_value = "" if self.compare_value is None else str(self.compare_value)
+            condition_met = self.compare_values(str(item_value), compare_value, operator)
+            condition_description = f"{self.key_name} {operator} {compare_value}"
+
+        if condition_met:
+            self.status = f"Condition met: {condition_description}"
+            return True
+        self.status = f"Condition not met: {condition_description}"
+        return False
+
+    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
+        """Hide and clear ``compare_value`` while the boolean validator is selected."""
+        if field_name == "operator":
+            if field_value == "boolean validator":
+                build_config["compare_value"]["show"] = False
+                build_config["compare_value"]["advanced"] = True
+                build_config["compare_value"]["value"] = None
+            else:
+                build_config["compare_value"]["show"] = True
+                build_config["compare_value"]["advanced"] = False
+
+        return build_config
diff --git a/src/backend/base/langflow/components/helpers/ExtractKey.py b/src/backend/base/langflow/components/helpers/ExtractKey.py
new file mode 100644
index 000000000..46f18b962
--- /dev/null
+++ b/src/backend/base/langflow/components/helpers/ExtractKey.py
@@ -0,0 +1,63 @@
+from langflow.custom import Component
+from langflow.io import DataInput, Output, StrInput
+from langflow.schema import Data
+
+
+class ExtractDataKeyComponent(Component):
+    """Extract a single key from one Data object or from each Data object in a list."""
+
+    display_name = "Extract Key"
+    description = (
+        "Extract a specific key from a Data object or a list of "
+        "Data objects and return the extracted value(s) as Data object(s)."
+    )
+    icon = "key"
+    beta = True
+    # NOTE(review): "ExtractaKey" looks like a typo for "ExtractKey"; left unchanged
+    # because this value may be used as the component identifier - confirm.
+    name = "ExtractaKey"
+
+    inputs = [
+        DataInput(
+            name="data_input",
+            display_name="Data Input",
+            info="The Data object or list of Data objects to extract the key from.",
+        ),
+        StrInput(
+            name="key",
+            display_name="Key to Extract",
+            info="The key in the Data object(s) to extract.",
+        ),
+    ]
+
+    outputs = [
+        Output(display_name="Extracted Data", name="extracted_data", method="extract_key"),
+    ]
+
+    def extract_key(self) -> Data | list[Data]:
+        """Return Data object(s) holding only the requested key.
+
+        For a list input, items that are not Data objects or lack the key are
+        skipped. For a single Data input without the key, or for any other
+        input type, a Data object with an "error" entry is returned.
+        """
+        key = self.key
+
+        if isinstance(self.data_input, list):
+            result = [
+                Data(data={key: item.data[key]})
+                for item in self.data_input
+                if isinstance(item, Data) and key in item.data
+            ]
+            self.status = result
+            return result
+        if isinstance(self.data_input, Data):
+            if key in self.data_input.data:
+                result = Data(data={key: self.data_input.data[key]})
+                self.status = result
+                return result
+            error_message = f"Key '{key}' not found in Data object."
+        else:
+            error_message = "Invalid input. Expected Data object or list of Data objects."
+        self.status = error_message
+        return Data(data={"error": error_message})
diff --git a/src/backend/base/langflow/components/helpers/FilterDataValues.py b/src/backend/base/langflow/components/helpers/FilterDataValues.py
new file mode 100644
index 000000000..19e88367f
--- /dev/null
+++ b/src/backend/base/langflow/components/helpers/FilterDataValues.py
@@ -0,0 +1,101 @@
+from typing import Any
+
+from langflow.custom import Component
+from langflow.io import DataInput, DropdownInput, MessageInput, Output
+from langflow.schema import Data
+
+
+class DataFilterComponent(Component):
+    """Keep the Data items whose value under a key satisfies a string comparison."""
+
+    display_name = "Filter Data Values"
+    description = (
+        "Filter a list of data items based on a specified key, filter value,"
+        " and comparison operator. Check advanced options to select match comparison."
+    )
+    icon = "filter"
+    beta = True
+    name = "FilterDataValues"
+
+    inputs = [
+        DataInput(name="input_data", display_name="Input Data", info="The list of data items to filter.", is_list=True),
+        MessageInput(
+            name="filter_key", display_name="Filter Key", info="The key to filter on (e.g., 'route').", value="route"
+        ),
+        MessageInput(
+            name="filter_value",
+            display_name="Filter Value",
+            info="The value to filter by (e.g., 'CMIP').",
+            value="CMIP",
+        ),
+        DropdownInput(
+            name="operator",
+            display_name="Comparison Operator",
+            options=["equals", "not equals", "contains", "starts with", "ends with"],
+            info="The operator to apply for comparing the values.",
+            value="equals",
+            advanced=True,
+        ),
+    ]
+
+    outputs = [
+        Output(display_name="Filtered Data", name="filtered_data", method="filter_data"),
+    ]
+
+    def compare_values(self, item_value: Any, filter_value: str, operator: str) -> bool:
+        """Return whether ``str(item_value)`` satisfies *operator* against *filter_value*.
+
+        Unknown operators evaluate to False.
+        """
+        item_text = str(item_value)
+        if operator == "equals":
+            return item_text == filter_value
+        if operator == "not equals":
+            return item_text != filter_value
+        if operator == "contains":
+            return filter_value in item_text
+        if operator == "starts with":
+            return item_text.startswith(filter_value)
+        if operator == "ends with":
+            return item_text.endswith(filter_value)
+        return False
+
+    def filter_data(self) -> list[Data]:
+        """Return the input items whose ``filter_key`` value matches ``filter_value``.
+
+        An empty input yields an empty list, and a missing key or value returns
+        the input unfiltered. Items that are not Data objects, whose data is
+        not a dict, or that lack the key are skipped.
+        """
+        # Extract inputs
+        input_data: list[Data] = self.input_data
+        filter_key: str = self.filter_key.text
+        filter_value: str = self.filter_value.text
+        operator: str = self.operator
+
+        # Validate inputs
+        if not input_data:
+            self.status = "Input data is empty."
+            return []
+
+        if not filter_key or not filter_value:
+            self.status = "Filter key or value is missing."
+            return input_data
+
+        # Filter the data
+        filtered_data = []
+        skipped = False
+        for item in input_data:
+            if isinstance(item, Data) and isinstance(item.data, dict) and filter_key in item.data:
+                if self.compare_values(item.data[filter_key], filter_value, operator):
+                    filtered_data.append(item)
+            else:
+                skipped = True
+
+        # Report skipped items only when nothing matched, so the warning explains
+        # the empty result instead of being overwritten by it.
+        if skipped and not filtered_data:
+            self.status = f"Warning: Some items don't have the key '{filter_key}' or are not dictionaries."
+        else:
+            self.status = filtered_data
+        return filtered_data
diff --git a/src/backend/base/langflow/components/helpers/JSONtoData.py b/src/backend/base/langflow/components/helpers/JSONtoData.py
new file mode 100644
index 000000000..62482501e
--- /dev/null
+++ b/src/backend/base/langflow/components/helpers/JSONtoData.py
@@ -0,0 +1,120 @@
+import json
+from pathlib import Path
+
+from json_repair import repair_json
+
+from langflow.custom import Component
+from langflow.io import FileInput, MessageTextInput, MultilineInput, Output
+from langflow.schema import Data
+
+
+class JSONToDataComponent(Component):
+    """Convert JSON content into a Data object or a list of Data objects.
+
+    Exactly one of the three inputs (uploaded file, file path or raw JSON
+    string) must be provided.
+    """
+
+    display_name = "JSON to Data"
+    description = (
+        "Convert a JSON file, JSON from a file path, or a JSON string to a Data object or a list of Data objects"
+    )
+    icon = "braces"
+    beta = True
+    name = "JSONtoData"
+
+    inputs = [
+        FileInput(
+            name="json_file",
+            display_name="JSON File",
+            file_types=["json"],
+            info="Upload a JSON file to convert to a Data object or list of Data objects",
+        ),
+        MessageTextInput(
+            name="json_path",
+            display_name="JSON File Path",
+            info="Provide the path to the JSON file as pure text",
+        ),
+        MultilineInput(
+            name="json_string",
+            display_name="JSON String",
+            info="Enter a valid JSON string (object or array) to convert to a Data object or list of Data objects",
+        ),
+    ]
+
+    outputs = [
+        Output(name="data", display_name="Data", method="convert_json_to_data"),
+    ]
+
+    def _read_json_file(self, path: str) -> str:
+        """Return the text of the ``.json`` file at *path*.
+
+        Raises:
+            ValueError: If *path* does not have a ``.json`` suffix.
+        """
+        file_path = Path(path)
+        if file_path.suffix.lower() != ".json":
+            msg = "The provided file must be a JSON file."
+            raise ValueError(msg)
+        return file_path.read_text(encoding="utf-8")
+
+    def convert_json_to_data(self) -> Data | list[Data]:
+        """Parse the provided JSON source, repairing malformed JSON when possible.
+
+        Returns:
+            A list of Data objects for a JSON array, otherwise a single Data object.
+
+        Raises:
+            ValueError: If zero or several inputs are provided, the file is
+                not a ``.json`` file, no JSON text is available, or reading or
+                parsing fails.
+        """
+        try:
+            if sum(bool(field) for field in [self.json_file, self.json_path, self.json_string]) != 1:
+                msg = "Please provide exactly one of: JSON file, file path, or JSON string."
+                raise ValueError(msg)
+
+            if self.json_file:
+                json_data = self._read_json_file(self.resolve_path(self.json_file))
+            elif self.json_path:
+                json_data = self._read_json_file(self.json_path)
+            else:
+                json_data = self.json_string
+
+            if not json_data:
+                msg = "No JSON data provided."
+                raise ValueError(msg)
+        except Exception as e:
+            # Input and file problems are reported as such, not as JSON syntax errors.
+            error_message = f"An error occurred: {e}"
+            self.status = error_message
+            raise ValueError(error_message) from e
+
+        try:
+            # Try to parse the JSON string
+            try:
+                parsed_data = json.loads(json_data)
+            except json.JSONDecodeError:
+                # If JSON parsing fails, try to repair the JSON string
+                repaired_json_string = repair_json(json_data)
+                parsed_data = json.loads(repaired_json_string)
+
+            # Check if the parsed data is a list
+            if isinstance(parsed_data, list):
+                result = [Data(data=item) for item in parsed_data]
+            else:
+                result = Data(data=parsed_data)
+
+            self.status = result
+            return result
+
+        except ValueError as e:
+            # Covers json.JSONDecodeError, which subclasses ValueError.
+            error_message = f"Invalid JSON: {e}"
+            self.status = error_message
+            raise ValueError(error_message) from e
+
+        except Exception as e:
+            error_message = f"An error occurred: {e}"
+            self.status = error_message
+            raise ValueError(error_message) from e
diff --git a/src/backend/base/langflow/components/helpers/MessageToData.py b/src/backend/base/langflow/components/helpers/MessageToData.py
new file mode 100644
index 000000000..bc13f9fbc
--- /dev/null
+++ b/src/backend/base/langflow/components/helpers/MessageToData.py
@@ -0,0 +1,47 @@
+from langflow.custom import Component
+from langflow.io import MessageInput, Output
+from langflow.schema import Data
+from langflow.schema.message import Message
+
+
+class MessageToDataComponent(Component):
+    """Convert a Message object into a Data object carrying the message's data."""
+
+    display_name = "Message to Data"
+    description = "Convert a Message object to a Data object"
+    icon = "message-square-share"
+    beta = True
+    name = "MessagetoData"
+
+    inputs = [
+        MessageInput(
+            name="message",
+            display_name="Message",
+            info="The Message object to convert to a Data object",
+        ),
+    ]
+
+    outputs = [
+        Output(display_name="Data", name="data", method="convert_message_to_data"),
+    ]
+
+    def convert_message_to_data(self) -> Data:
+        """Return a Data object built from ``self.message.data``.
+
+        Errors are not raised: on failure the status is set and a Data object
+        with an "error" entry is returned instead.
+        """
+        try:
+            if not isinstance(self.message, Message):
+                msg = "Input must be a Message object"
+                raise ValueError(msg)
+
+            # Convert Message to Data
+            data = Data(data=self.message.data)
+
+            self.status = "Successfully converted Message to Data"
+            return data
+        except Exception as e:
+            error_message = f"Error converting Message to Data: {e}"
+            self.status = error_message
+            return Data(data={"error": error_message})