feat: Add new Data utility components for CSV/JSON parsing, routing, and filtering (#3776)

* feat: Add CurrentDateComponent for timezone-based date

* feat: Add DataConditionalRouter component

* feat: Add DataFilterComponent for filtering data

* feat(components): Add beta and name attributes to components

* feat: Add JSON to Data component

* feat: Add CSV to Data component

* feat(helpers): Add ExtractKey component for key extraction

* feat: Add list processing to DataConditionalRouter

* [autofix.ci] apply automated fixes

* feat: add MessageToData component

* feat(CSVtoData, JSONtoData): Add file input support

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* Refactor error messages and improve code readability in data components utilities

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
This commit is contained in:
Cezar Vasconcelos 2024-10-03 12:48:57 -03:00 committed by GitHub
commit b8e7a77d78
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 564 additions and 0 deletions

View file

@ -0,0 +1,92 @@
import csv
import io
from pathlib import Path
from langflow.custom import Component
from langflow.io import FileInput, MessageTextInput, MultilineInput, Output
from langflow.schema import Data
class CSVToDataComponent(Component):
    """Convert CSV content into a list of ``Data`` objects, one per row.

    Exactly one of the three inputs (uploaded file, file path, raw CSV
    string) must be provided. Rows are parsed with ``csv.DictReader``, so the
    first line of the CSV supplies the keys of every resulting ``Data``.
    """

    display_name = "CSV to Data List"
    description = "Load a CSV file, CSV from a file path, or a valid CSV string and convert it to a list of Data"
    icon = "file-spreadsheet"
    beta = True
    name = "CSVtoData"

    inputs = [
        FileInput(
            name="csv_file",
            display_name="CSV File",
            file_types=["csv"],
            info="Upload a CSV file to convert to a list of Data objects",
        ),
        MessageTextInput(
            name="csv_path",
            display_name="CSV File Path",
            info="Provide the path to the CSV file as pure text",
        ),
        MultilineInput(
            name="csv_string",
            display_name="CSV String",
            info="Paste a CSV string directly to convert to a list of Data objects",
        ),
    ]

    outputs = [
        Output(name="data_list", display_name="Data List", method="load_csv_to_data"),
    ]

    def _read_csv_file(self, path) -> str:
        """Return the full text of the ``.csv`` file located at *path*.

        Args:
            path: A string or path-like object pointing at the file.

        Raises:
            ValueError: If the path does not end in ``.csv`` (case-insensitive).
        """
        file_path = Path(path)
        if file_path.suffix.lower() != ".csv":
            msg = "The provided file must be a CSV file."
            raise ValueError(msg)
        # "utf-8-sig" reads plain UTF-8 unchanged but drops a leading byte-order
        # mark, which would otherwise become part of the first column name.
        with open(file_path, newline="", encoding="utf-8-sig") as csvfile:
            return csvfile.read()

    def load_csv_to_data(self) -> list[Data]:
        """Parse the configured CSV source into a list of ``Data`` rows.

        Returns:
            One ``Data`` per CSV row, or an empty list when the CSV contains
            no data rows.

        Raises:
            ValueError: If zero or several sources are provided, the file is
                not a ``.csv`` file, no content is available, or reading or
                parsing fails. Every failure is re-raised as ``ValueError``
                with the original exception chained, and the message is also
                stored in ``self.status``.
        """
        try:
            if sum(bool(field) for field in [self.csv_file, self.csv_path, self.csv_string]) != 1:
                msg = "Please provide exactly one of: CSV file, file path, or CSV string."
                raise ValueError(msg)

            csv_data = None
            if self.csv_file:
                # The uploaded file reference is turned into a path by resolve_path first.
                csv_data = self._read_csv_file(self.resolve_path(self.csv_file))
            elif self.csv_path:
                csv_data = self._read_csv_file(self.csv_path)
            elif self.csv_string:
                csv_data = self.csv_string

            # Also reached when the selected file exists but is empty.
            if not csv_data:
                msg = "No CSV data provided."
                raise ValueError(msg)

            result = [Data(data=row) for row in csv.DictReader(io.StringIO(csv_data))]

            if not result:
                self.status = "The CSV data is empty."
                return []

            self.status = result
            return result

        except csv.Error as e:
            error_message = f"CSV parsing error: {e}"
            self.status = error_message
            raise ValueError(error_message) from e

        except Exception as e:
            error_message = f"An error occurred: {e}"
            self.status = error_message
            raise ValueError(error_message) from e

View file

@ -0,0 +1,73 @@
from datetime import datetime
from zoneinfo import ZoneInfo
from langflow.custom import Component
from langflow.io import DropdownInput, Output
from langflow.schema.message import Message
class CurrentDateComponent(Component):
    """Report the current date and time for a timezone chosen from a dropdown."""

    display_name = "Current Date"
    description = "Returns the current date and time in the selected timezone."
    icon = "clock"
    beta = True
    name = "CurrentDate"

    inputs = [
        DropdownInput(
            name="timezone",
            display_name="Timezone",
            options=[
                "UTC",
                "US/Eastern",
                "US/Central",
                "US/Mountain",
                "US/Pacific",
                "Europe/London",
                "Europe/Paris",
                "Europe/Berlin",
                "Europe/Moscow",
                "Asia/Tokyo",
                "Asia/Shanghai",
                "Asia/Singapore",
                "Asia/Dubai",
                "Australia/Sydney",
                "Australia/Melbourne",
                "Pacific/Auckland",
                "America/Sao_Paulo",
                "America/Mexico_City",
                "America/Toronto",
                "America/Vancouver",
                "Africa/Cairo",
                "Africa/Johannesburg",
                "Atlantic/Reykjavik",
                "Indian/Maldives",
                "America/Bogota",
                "America/Lima",
                "America/Santiago",
                "America/Buenos_Aires",
                "America/Caracas",
                "America/La_Paz",
                "America/Montevideo",
                "America/Asuncion",
                "America/Cuiaba",
            ],
            value="UTC",
            info="Select the timezone for the current date and time.",
        ),
    ]

    outputs = [
        Output(display_name="Current Date", name="current_date", method="get_current_date"),
    ]

    def get_current_date(self) -> Message:
        """Build a message containing the current time in ``self.timezone``.

        Returns:
            A ``Message`` whose text is the formatted timestamp, or an
            ``"Error: ..."`` text if anything goes wrong. No exception is
            propagated to the caller; the same text is stored in
            ``self.status``.
        """
        try:
            zone = ZoneInfo(self.timezone)
            now_in_zone = datetime.now(zone)
            stamp = now_in_zone.strftime("%Y-%m-%d %H:%M:%S %Z")
            text = f"Current date and time in {self.timezone}: {stamp}"
            self.status = text
            return Message(text=text)
        except Exception as exc:
            failure_text = f"Error: {exc}"
            self.status = failure_text
            return Message(text=failure_text)

View file

@ -0,0 +1,124 @@
from typing import Any
from langflow.custom import Component
from langflow.io import DataInput, DropdownInput, MessageTextInput, Output
from langflow.schema import Data, dotdict
class DataConditionalRouterComponent(Component):
display_name = "Data Conditional Router"
description = "Route Data object(s) based on a condition applied to a specified key, including boolean validation."
icon = "split"
beta = True
name = "DataConditionalRouter"
inputs = [
DataInput(
name="data_input",
display_name="Data Input",
info="The Data object or list of Data objects to process",
is_list=True,
),
MessageTextInput(
name="key_name",
display_name="Key Name",
info="The name of the key in the Data object(s) to check",
),
DropdownInput(
name="operator",
display_name="Comparison Operator",
options=["equals", "not equals", "contains", "starts with", "ends with", "boolean validator"],
info="The operator to apply for comparing the values. 'boolean validator' treats the value as a boolean.",
value="equals",
),
MessageTextInput(
name="compare_value",
display_name="Compare Value",
info="The value to compare against (not used for boolean validator)",
),
]
outputs = [
Output(display_name="True Output", name="true_output", method="process_data"),
Output(display_name="False Output", name="false_output", method="process_data"),
]
def compare_values(self, item_value: str, compare_value: str, operator: str) -> bool:
if operator == "equals":
return item_value == compare_value
if operator == "not equals":
return item_value != compare_value
if operator == "contains":
return compare_value in item_value
if operator == "starts with":
return item_value.startswith(compare_value)
if operator == "ends with":
return item_value.endswith(compare_value)
if operator == "boolean validator":
return self.parse_boolean(item_value)
return False
def parse_boolean(self, value):
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.lower() in ["true", "1", "yes", "y", "on"]
return bool(value)
def validate_input(self, data_item: Data) -> bool:
if not isinstance(data_item, Data):
self.status = "Input is not a Data object"
return False
if self.key_name not in data_item.data:
self.status = f"Key '{self.key_name}' not found in Data"
return False
return True
def process_data(self) -> Data | list[Data]:
if isinstance(self.data_input, list):
true_output = []
false_output = []
for item in self.data_input:
if self.validate_input(item):
result = self.process_single_data(item)
if result:
true_output.append(item)
else:
false_output.append(item)
self.stop("false_output" if true_output else "true_output")
return true_output if true_output else false_output
if not self.validate_input(self.data_input):
return Data(data={"error": self.status})
result = self.process_single_data(self.data_input)
self.stop("false_output" if result else "true_output")
return self.data_input
def process_single_data(self, data_item: Data) -> bool:
item_value = data_item.data[self.key_name]
operator = self.operator
if operator == "boolean validator":
condition_met = self.parse_boolean(item_value)
condition_description = f"Boolean validation of '{self.key_name}'"
else:
compare_value = self.compare_value
condition_met = self.compare_values(str(item_value), compare_value, operator)
condition_description = f"{self.key_name} {operator} {compare_value}"
if condition_met:
self.status = f"Condition met: {condition_description}"
return True
self.status = f"Condition not met: {condition_description}"
return False
def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
if field_name == "operator":
if field_value == "boolean validator":
build_config["compare_value"]["show"] = False
build_config["compare_value"]["advanced"] = True
build_config["compare_value"]["value"] = None
else:
build_config["compare_value"]["show"] = True
build_config["compare_value"]["advanced"] = False
return build_config

View file

@ -0,0 +1,53 @@
from langflow.custom import Component
from langflow.io import DataInput, Output, StrInput
from langflow.schema import Data
class ExtractDataKeyComponent(Component):
    """Pull a single key out of a ``Data`` object or a list of ``Data`` objects."""

    display_name = "Extract Key"
    description = (
        "Extract a specific key from a Data object or a list of "
        "Data objects and return the extracted value(s) as Data object(s)."
    )
    icon = "key"
    beta = True
    name = "ExtractaKey"

    inputs = [
        DataInput(
            name="data_input",
            display_name="Data Input",
            info="The Data object or list of Data objects to extract the key from.",
        ),
        StrInput(
            name="key",
            display_name="Key to Extract",
            info="The key in the Data object(s) to extract.",
        ),
    ]

    outputs = [
        Output(display_name="Extracted Data", name="extracted_data", method="extract_key"),
    ]

    def extract_key(self) -> Data | list[Data]:
        """Return new ``Data`` object(s) holding only ``self.key``.

        For a list input, items that are not ``Data`` or lack the key are
        skipped. For a single ``Data``, a missing key yields an error
        ``Data``. Any other input type also yields an error ``Data``. The
        returned value, or the error text, is stored in ``self.status``.
        """
        key = self.key
        source = self.data_input

        if isinstance(source, list):
            extracted = [
                Data(data={key: item.data[key]}) for item in source if isinstance(item, Data) and key in item.data
            ]
            self.status = extracted
            return extracted

        if not isinstance(source, Data):
            problem = "Invalid input. Expected Data object or list of Data objects."
            self.status = problem
            return Data(data={"error": problem})

        if key not in source.data:
            problem = f"Key '{key}' not found in Data object."
            self.status = problem
            return Data(data={"error": problem})

        single = Data(data={key: source.data[key]})
        self.status = single
        return single

View file

@ -0,0 +1,82 @@
from typing import Any
from langflow.custom import Component
from langflow.io import DataInput, DropdownInput, MessageInput, Output
from langflow.schema import Data
class DataFilterComponent(Component):
    """Keep only the ``Data`` items whose value under a key satisfies a comparison.

    The value found under ``filter_key`` is converted to ``str`` and compared
    with ``filter_value`` using the selected operator.
    """

    display_name = "Filter Data Values"
    description = (
        "Filter a list of data items based on a specified key, filter value,"
        " and comparison operator. Check advanced options to select match comparison."
    )
    icon = "filter"
    beta = True
    name = "FilterDataValues"

    inputs = [
        DataInput(name="input_data", display_name="Input Data", info="The list of data items to filter.", is_list=True),
        MessageInput(
            name="filter_key", display_name="Filter Key", info="The key to filter on (e.g., 'route').", value="route"
        ),
        MessageInput(
            name="filter_value",
            display_name="Filter Value",
            info="The value to filter by (e.g., 'CMIP').",
            value="CMIP",
        ),
        DropdownInput(
            name="operator",
            display_name="Comparison Operator",
            options=["equals", "not equals", "contains", "starts with", "ends with"],
            info="The operator to apply for comparing the values.",
            value="equals",
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="Filtered Data", name="filtered_data", method="filter_data"),
    ]

    def compare_values(self, item_value: Any, filter_value: str, operator: str) -> bool:
        """Compare ``str(item_value)`` with ``filter_value`` using ``operator``.

        Args:
            item_value: The value read from a data item; stringified before comparing.
            filter_value: The text to compare against.
            operator: One of "equals", "not equals", "contains", "starts with", "ends with".

        Returns:
            The result of the comparison; ``False`` for an unrecognised operator.
        """
        text = str(item_value)
        if operator == "equals":
            return text == filter_value
        if operator == "not equals":
            return text != filter_value
        if operator == "contains":
            return filter_value in text
        if operator == "starts with":
            return text.startswith(filter_value)
        if operator == "ends with":
            return text.endswith(filter_value)
        return False

    def filter_data(self) -> list[Data]:
        """Return the input items that pass the configured comparison.

        Returns:
            An empty list when there is no input, the unfiltered input when
            the filter key or value is empty, otherwise the matching items.
            Items that lack the key, or whose ``data`` is not a dict, are
            excluded from the result.
        """
        # Extract inputs; the key and value arrive as messages and only their text is used.
        input_data: list[Data] = self.input_data
        filter_key: str = self.filter_key.text
        filter_value: str = self.filter_value.text
        operator: str = self.operator

        # Validate inputs
        if not input_data:
            self.status = "Input data is empty."
            return []

        if not filter_key or not filter_value:
            self.status = "Filter key or value is missing."
            return input_data

        # Filter the data
        filtered_data = []
        for item in input_data:
            if isinstance(item.data, dict) and filter_key in item.data:
                if self.compare_values(item.data[filter_key], filter_value, operator):
                    filtered_data.append(item)
            else:
                # NOTE(review): this warning is always replaced by the assignment
                # after the loop, so it is never the final status.
                self.status = f"Warning: Some items don't have the key '{filter_key}' or are not dictionaries."

        self.status = filtered_data
        return filtered_data

View file

@ -0,0 +1,100 @@
import json
from pathlib import Path
from json_repair import repair_json
from langflow.custom import Component
from langflow.io import FileInput, MessageTextInput, MultilineInput, Output
from langflow.schema import Data
class JSONToDataComponent(Component):
    """Convert JSON content into a ``Data`` object or a list of ``Data`` objects.

    Exactly one of the three inputs (uploaded file, file path, raw JSON
    string) must be provided. A top-level JSON array becomes a list with one
    ``Data`` per element; any other top-level value becomes a single ``Data``.
    """

    display_name = "JSON to Data"
    description = (
        "Convert a JSON file, JSON from a file path, or a JSON string to a Data object or a list of Data objects"
    )
    icon = "braces"
    beta = True
    name = "JSONtoData"

    inputs = [
        FileInput(
            name="json_file",
            display_name="JSON File",
            file_types=["json"],
            info="Upload a JSON file to convert to a Data object or list of Data objects",
        ),
        MessageTextInput(
            name="json_path",
            display_name="JSON File Path",
            info="Provide the path to the JSON file as pure text",
        ),
        MultilineInput(
            name="json_string",
            display_name="JSON String",
            info="Enter a valid JSON string (object or array) to convert to a Data object or list of Data objects",
        ),
    ]

    outputs = [
        Output(name="data", display_name="Data", method="convert_json_to_data"),
    ]

    def _read_json_file(self, path) -> str:
        """Return the full text of the ``.json`` file located at *path*.

        Args:
            path: A string or path-like object pointing at the file.

        Raises:
            ValueError: If the path does not end in ``.json`` (case-insensitive).
        """
        file_path = Path(path)
        if file_path.suffix.lower() != ".json":
            msg = "The provided file must be a JSON file."
            raise ValueError(msg)
        # "utf-8-sig" reads plain UTF-8 unchanged but drops a leading byte-order
        # mark; json.loads rejects text that still starts with one.
        with open(file_path, encoding="utf-8-sig") as jsonfile:
            return jsonfile.read()

    def convert_json_to_data(self) -> Data | list[Data]:
        """Parse the configured JSON source into ``Data`` object(s).

        Text that fails strict parsing is passed through ``repair_json`` and
        parsed again before giving up.

        Returns:
            A list of ``Data`` for a top-level array, otherwise one ``Data``.

        Raises:
            ValueError: If zero or several sources are provided, the file is
                not a ``.json`` file, no content is available, or reading,
                parsing or conversion fails. Every failure is re-raised as
                ``ValueError`` with the original exception chained, and the
                message is also stored in ``self.status``.
        """
        try:
            if sum(bool(field) for field in [self.json_file, self.json_path, self.json_string]) != 1:
                msg = "Please provide exactly one of: JSON file, file path, or JSON string."
                raise ValueError(msg)

            json_data = None
            if self.json_file:
                # The uploaded file reference is turned into a path by resolve_path first.
                json_data = self._read_json_file(self.resolve_path(self.json_file))
            elif self.json_path:
                json_data = self._read_json_file(self.json_path)
            elif self.json_string:
                json_data = self.json_string

            # Also reached when the selected file exists but is empty.
            if not json_data:
                msg = "No JSON data provided."
                raise ValueError(msg)

            # Try to parse the JSON string
            try:
                parsed_data = json.loads(json_data)
            except json.JSONDecodeError:
                # If JSON parsing fails, try to repair the JSON string
                repaired_json_string = repair_json(json_data)
                parsed_data = json.loads(repaired_json_string)

            # A top-level array yields one Data per element
            if isinstance(parsed_data, list):
                result = [Data(data=item) for item in parsed_data]
            else:
                result = Data(data=parsed_data)

            self.status = result
            return result

        except (json.JSONDecodeError, SyntaxError, ValueError) as e:
            error_message = f"Invalid JSON or Python literal: {e}"
            self.status = error_message
            raise ValueError(error_message) from e

        except Exception as e:
            error_message = f"An error occurred: {e}"
            self.status = error_message
            raise ValueError(error_message) from e

View file

@ -0,0 +1,40 @@
from langflow.custom import Component
from langflow.io import MessageInput, Output
from langflow.schema import Data
from langflow.schema.message import Message
class MessageToDataComponent(Component):
    """Wrap the ``data`` of a ``Message`` in a new ``Data`` object."""

    display_name = "Message to Data"
    description = "Convert a Message object to a Data object"
    icon = "message-square-share"
    beta = True
    name = "MessagetoData"

    inputs = [
        MessageInput(
            name="message",
            display_name="Message",
            info="The Message object to convert to a Data object",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="convert_message_to_data"),
    ]

    def convert_message_to_data(self) -> Data:
        """Convert ``self.message`` into a ``Data`` object.

        Returns:
            A ``Data`` built from the message's ``data`` attribute. If the
            input is not a ``Message``, or the conversion raises, a ``Data``
            with a single ``"error"`` entry is returned instead. No exception
            is propagated to the caller; ``self.status`` records the outcome.
        """
        try:
            incoming = self.message
            if isinstance(incoming, Message):
                converted = Data(data=incoming.data)
                self.status = "Successfully converted Message to Data"
                return converted
            # Raised inside the try so it is reported through the handler below.
            msg = "Input must be a Message object"
            raise ValueError(msg)
        except Exception as exc:
            failure_text = f"Error converting Message to Data: {exc}"
            self.status = failure_text
            return Data(data={"error": failure_text})