feat: add convert component with dynamic output support (#7773)

* Create convertor.py

* [autofix.ci] apply automated fixes

* convert component

* [autofix.ci] apply automated fixes

* add Type_conversion base class with dispatchers for performance based conversion

* fix lint issues

* add type_convertor test

* [autofix.ci] apply automated fixes

* update tests

* fix tests

* update with auto conversion methods

* update function to component file

* feat: enhance input validation for Data, DataFrame, and Message types

* test: add unit tests for DataInput, MessageInput, and DataFrameInput data conversion

* updated changes to use type classes

* [autofix.ci] apply automated fixes

* add convert logic

* update converts

* Update converter.py

* [autofix.ci] apply automated fixes

* revert converter.py

* Update inputs.py

* Update test_inputs.py

* update to logic

* Update test_type_convertor_component.py

* update converter

* [autofix.ci] apply automated fixes

* refactor: rename conversion functions for clarity

Updated function names for converting inputs to Message, Data, and DataFrame types to improve readability and consistency. The changes include renaming `get_message_converter` to `convert_to_message`, `get_data_converter` to `convert_to_data`, and `get_dataframe_converter` to `convert_to_dataframe`. Additionally, added a check for dictionary input in the data conversion function.

* fix: add TYPE_CHECKING for conditional imports in message.py

Introduced TYPE_CHECKING to optimize imports for the DataFrame type, ensuring that the import only occurs during type checking. This change enhances performance and maintains compatibility with static type checkers.

* refactor: simplify data conversion methods in Message class

Removed unnecessary parameters from the `to_data` and `to_dataframe` methods in the Message class, enhancing clarity and reducing complexity. The methods now directly use instance attributes, improving code readability and maintainability.

* refactor: enhance DataFrame methods for clarity and type safety

Updated the `to_data` and `to_message` methods in the DataFrame class to improve clarity and type safety. The `to_data` method now directly converts the DataFrame to a Data object without parameters, and the `to_message` method uses the instance's data directly. Added TYPE_CHECKING for conditional imports to optimize performance and maintain compatibility with static type checkers.

* refactor: streamline Data class methods for improved clarity

Refactored the `to_message` and `to_dataframe` methods in the Data class to eliminate unnecessary parameters and directly utilize instance attributes. This change enhances code readability and maintainability while ensuring type safety with the appropriate imports for Message and DataFrame. Additionally, updated the logic to access instance data more intuitively.

* refactor: simplify conversion method calls by removing redundant arguments

* rename test file

* refactor: remove obsolete test file for data conversion

* refactor: add support for converting dictionary to DataFrame

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
Co-authored-by: Carlos Coelho <80289056+carlosrcoelho@users.noreply.github.com>
Co-authored-by: Yuqi Tang <yuqi.tang@datastax.com>
This commit is contained in:
Edwin Jose 2025-06-02 16:06:56 -04:00 committed by GitHub
commit 360836800d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 293 additions and 6 deletions

View file

@ -1,5 +1,6 @@
from .alter_metadata import AlterMetadataComponent
from .combine_text import CombineTextComponent
from .converter import TypeConverterComponent
from .create_data import CreateDataComponent
from .data_operations import DataOperationsComponent
from .extract_key import ExtractDataKeyComponent
@ -38,5 +39,6 @@ __all__ = [
"RegexExtractorComponent",
"SelectDataComponent",
"SplitTextComponent",
"TypeConverterComponent",
"UpdateDataComponent",
]

View file

@ -0,0 +1,107 @@
from typing import Any
from langflow.custom import Component
from langflow.io import HandleInput, Output, TabInput
from langflow.schema import Data, DataFrame, Message
def convert_to_message(v) -> Message:
"""Convert input to Message type.
Args:
v: Input to convert (Message, Data, DataFrame, or dict)
Returns:
Message: Converted Message object
"""
return v if isinstance(v, Message) else v.to_message()
def convert_to_data(v: DataFrame | Data | Message | dict) -> Data:
"""Convert input to Data type.
Args:
v: Input to convert (Message, Data, DataFrame, or dict)
Returns:
Data: Converted Data object
"""
if isinstance(v, dict):
return Data(v)
return v if isinstance(v, Data) else v.to_data()
def convert_to_dataframe(v: DataFrame | Data | Message | dict) -> DataFrame:
"""Convert input to DataFrame type.
Args:
v: Input to convert (Message, Data, DataFrame, or dict)
Returns:
DataFrame: Converted DataFrame object
"""
if isinstance(v, dict):
return DataFrame([v])
return v if isinstance(v, DataFrame) else v.to_dataframe()
class TypeConverterComponent(Component):
display_name = "Type Convert"
description = "Convert between different types (Message, Data, DataFrame)"
icon = "repeat"
inputs = [
HandleInput(
name="input_data",
display_name="Input",
input_types=["Message", "Data", "DataFrame"],
info="Accept Message, Data or DataFrame as input",
required=True,
),
TabInput(
name="output_type",
display_name="Output Type",
options=["Message", "Data", "DataFrame"],
info="Select the desired output data type",
real_time_refresh=True,
value="Message",
),
]
outputs = [Output(display_name="Message Output", name="message_output", method="convert_to_message")]
def update_outputs(self, frontend_node: dict, field_name: str, field_value: Any) -> dict:
"""Dynamically show only the relevant output based on the selected output type."""
if field_name == "output_type":
# Start with empty outputs
frontend_node["outputs"] = []
# Add only the selected output type
if field_value == "Message":
frontend_node["outputs"].append(
Output(display_name="Message Output", name="message_output", method="convert_to_message").to_dict()
)
elif field_value == "Data":
frontend_node["outputs"].append(
Output(display_name="Data Output", name="data_output", method="convert_to_data").to_dict()
)
elif field_value == "DataFrame":
frontend_node["outputs"].append(
Output(
display_name="DataFrame Output", name="dataframe_output", method="convert_to_dataframe"
).to_dict()
)
return frontend_node
def convert_to_message(self) -> Message:
"""Convert input to Message type."""
return convert_to_message(self.input_data[0] if isinstance(self.input_data, list) else self.input_data)
def convert_to_data(self) -> Data:
"""Convert input to Data type."""
return convert_to_data(self.input_data[0] if isinstance(self.input_data, list) else self.input_data)
def convert_to_dataframe(self) -> DataFrame:
"""Convert input to DataFrame type."""
return convert_to_dataframe(self.input_data[0] if isinstance(self.input_data, list) else self.input_data)

View file

@ -1,8 +1,10 @@
from __future__ import annotations
import copy
import json
from datetime import datetime, timezone
from decimal import Decimal
from typing import cast
from typing import TYPE_CHECKING, cast
from uuid import UUID
from langchain_core.documents import Document
@ -13,6 +15,10 @@ from pydantic import BaseModel, ConfigDict, model_serializer, model_validator
from langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_USER
from langflow.utils.image import create_data_url
if TYPE_CHECKING:
from langflow.schema.dataframe import DataFrame
from langflow.schema.message import Message
class Data(BaseModel):
"""Represents a record with text and optional data.
@ -81,7 +87,7 @@ class Data(BaseModel):
return new_text
@classmethod
def from_document(cls, document: Document) -> "Data":
def from_document(cls, document: Document) -> Data:
"""Converts a Document to a Data.
Args:
@ -95,7 +101,7 @@ class Data(BaseModel):
return cls(data=data, text_key="text")
@classmethod
def from_lc_message(cls, message: BaseMessage) -> "Data":
def from_lc_message(cls, message: BaseMessage) -> Data:
"""Converts a BaseMessage to a Data.
Args:
@ -108,7 +114,7 @@ class Data(BaseModel):
data["metadata"] = cast("dict", message.to_json())
return cls(data=data, text_key="text")
def __add__(self, other: "Data") -> "Data":
def __add__(self, other: Data) -> Data:
"""Combines the data of two data by attempting to add values for overlapping keys.
Combines the data of two data by attempting to add values for overlapping keys
@ -235,7 +241,7 @@ class Data(BaseModel):
def __eq__(self, /, other):
return isinstance(other, Data) and self.data == other.data
def filter_data(self, filter_str: str) -> "Data":
def filter_data(self, filter_str: str) -> Data:
"""Filters the data dictionary based on the filter string.
Args:
@ -248,6 +254,26 @@ class Data(BaseModel):
return apply_json_filter(self.data, filter_str)
def to_message(self) -> Message:
from langflow.schema.message import Message # Local import to avoid circular import
if self.text_key in self.data:
return Message(text=self.get_text())
return Message(text=str(self.data))
def to_dataframe(self) -> DataFrame:
from langflow.schema.dataframe import DataFrame # Local import to avoid circular import
data_dict = self.data
# If data contains only one key and the value is a list of dictionaries, convert to DataFrame
if (
len(data_dict) == 1
and isinstance(next(iter(data_dict.values())), list)
and all(isinstance(item, dict) for item in next(iter(data_dict.values())))
):
return DataFrame(data=next(iter(data_dict.values())))
return DataFrame(data=[self])
def custom_serializer(obj):
if isinstance(obj, datetime):

View file

@ -5,6 +5,7 @@ from langchain_core.documents import Document
from pandas import DataFrame as pandas_DataFrame
from langflow.schema.data import Data
from langflow.schema.message import Message
class DataFrame(pandas_DataFrame):
@ -178,3 +179,28 @@ class DataFrame(pandas_DataFrame):
if not isinstance(other, DataFrame | pd.DataFrame): # Non-DataFrame case
return False
return super().__eq__(other)
def to_data(self) -> Data:
"""Convert this DataFrame to a Data object.
Returns:
Data: A Data object containing the DataFrame records under 'results' key.
"""
dict_list = self.to_dict(orient="records")
return Data(data={"results": dict_list})
def to_message(self) -> Message:
from langflow.schema.message import Message # Local import to avoid circular import
# Process DataFrame similar to the _safe_convert method
# Remove empty rows
processed_df = self.dropna(how="all")
# Remove empty lines in each cell
processed_df = processed_df.replace(r"^\s*$", "", regex=True)
# Replace multiple newlines with a single newline
processed_df = processed_df.replace(r"\n+", "\n", regex=True)
# Replace pipe characters to avoid markdown table issues
processed_df = processed_df.replace(r"\|", r"\\|", regex=True)
processed_df = processed_df.map(lambda x: str(x).replace("\n", "<br/>") if isinstance(x, str) else x)
# Convert to markdown and wrap in a Message
return Message(text=processed_df.to_markdown(index=False))

View file

@ -6,7 +6,7 @@ import re
import traceback
from collections.abc import AsyncIterator, Iterator
from datetime import datetime, timezone
from typing import Annotated, Any, Literal
from typing import TYPE_CHECKING, Annotated, Any, Literal
from uuid import UUID
from fastapi.encoders import jsonable_encoder
@ -31,6 +31,9 @@ from langflow.utils.constants import (
)
from langflow.utils.image import create_data_url
if TYPE_CHECKING:
from langflow.schema.dataframe import DataFrame
class Message(Data):
model_config = ConfigDict(arbitrary_types_allowed=True)
@ -276,6 +279,14 @@ class Message(Data):
return await asyncio.to_thread(cls, **kwargs)
return cls(**kwargs)
def to_data(self) -> Data:
return Data(data=self.data)
def to_dataframe(self) -> DataFrame:
from langflow.schema.dataframe import DataFrame # Local import to avoid circular import
return DataFrame(data=[self])
class DefaultModel(BaseModel):
class Config:

View file

@ -0,0 +1,115 @@
import pandas as pd
import pytest
from langflow.components.processing.converter import TypeConverterComponent
from langflow.schema.data import Data
from langflow.schema.dataframe import DataFrame
from langflow.schema.message import Message
from tests.base import ComponentTestBaseWithoutClient
class TestTypeConverterComponent(ComponentTestBaseWithoutClient):
@pytest.fixture
def component_class(self):
"""Return the component class to test."""
return TypeConverterComponent
@pytest.fixture
def file_names_mapping(self):
"""Return an empty list since this component doesn't have version-specific files."""
return []
# Message to other types
def test_message_to_message(self, component_class):
"""Test converting Message to Message."""
component = component_class(input_data=Message(text="Hello World"), output_type="Message")
result = component.convert_to_message()
assert isinstance(result, Message)
assert result.text == "Hello World"
def test_message_to_data(self, component_class):
"""Test converting Message to Data."""
component = component_class(input_data=Message(text="Hello"), output_type="Data")
result = component.convert_to_data()
assert isinstance(result, Data)
assert "text" in result.data
assert result.data["text"] == "Hello"
def test_message_to_dataframe(self, component_class):
"""Test converting Message to DataFrame."""
component = component_class(input_data=Message(text="Hello"), output_type="DataFrame")
result = component.convert_to_dataframe()
assert isinstance(result, DataFrame)
assert "text" in result.columns
assert result.iloc[0]["text"] == "Hello"
# Data to other types
def test_data_to_message(self, component_class):
"""Test converting Data to Message."""
component = component_class(input_data=Data(data={"text": "Hello World"}), output_type="Message")
result = component.convert_to_message()
assert isinstance(result, Message)
assert result.text == "Hello World"
def test_data_to_data(self, component_class):
"""Test converting Data to Data."""
component = component_class(input_data=Data(data={"key": "value"}), output_type="Data")
result = component.convert_to_data()
assert isinstance(result, Data)
assert result.data == {"key": "value"}
def test_data_to_dataframe(self, component_class):
"""Test converting Data to DataFrame."""
component = component_class(input_data=Data(data={"text": "Hello World"}), output_type="DataFrame")
result = component.convert_to_dataframe()
assert isinstance(result, DataFrame)
assert "text" in result.columns
assert result.iloc[0]["text"] == "Hello World"
# DataFrame to other types
def test_dataframe_to_message(self, component_class):
"""Test converting DataFrame to Message."""
df_data = pd.DataFrame({"col1": ["Hello"], "col2": ["World"]})
component = component_class(input_data=DataFrame(data=df_data), output_type="Message")
result = component.convert_to_message()
assert isinstance(result, Message)
assert result.text == "| col1 | col2 |\n|:-------|:-------|\n| Hello | World |"
def test_dataframe_to_data(self, component_class):
"""Test converting DataFrame to Data."""
df_data = pd.DataFrame({"col1": ["Hello"]})
component = component_class(input_data=DataFrame(data=df_data), output_type="Data")
result = component.convert_to_data()
assert isinstance(result, Data)
assert isinstance(result.data, dict)
def test_dataframe_to_dataframe(self, component_class):
"""Test converting DataFrame to DataFrame."""
df_data = pd.DataFrame({"col1": ["Hello"], "col2": ["World"]})
component = component_class(input_data=DataFrame(data=df_data), output_type="DataFrame")
result = component.convert_to_dataframe()
assert isinstance(result, DataFrame)
assert "col1" in result.columns
assert "col2" in result.columns
assert result.iloc[0]["col1"] == "Hello"
assert result.iloc[0]["col2"] == "World"
def test_update_outputs(self, component_class):
"""Test the update_outputs method."""
component = component_class(input_data=Message(text="Hello"), output_type="Message")
frontend_node = {"outputs": []}
# Test with Message output
updated = component.update_outputs(frontend_node, "output_type", "Message")
assert len(updated["outputs"]) == 1
assert updated["outputs"][0]["name"] == "message_output"
# Test with Data output
updated = component.update_outputs(frontend_node, "output_type", "Data")
assert len(updated["outputs"]) == 1
assert updated["outputs"][0]["name"] == "data_output"
# Test with DataFrame output
updated = component.update_outputs(frontend_node, "output_type", "DataFrame")
assert len(updated["outputs"]) == 1
assert updated["outputs"][0]["name"] == "dataframe_output"