from typing import Any

from langflow.custom import Component
from langflow.io import HandleInput, Output, TabInput
from langflow.schema import Data, DataFrame, Message

# Output definition for each value of the "output_type" tab:
# (display name, output name, name of the component method that produces it).
_OUTPUT_SPECS: dict[str, tuple[str, str, str]] = {
    "Message": ("Message Output", "message_output", "convert_to_message"),
    "Data": ("Data Output", "data_output", "convert_to_data"),
    "DataFrame": ("DataFrame Output", "dataframe_output", "convert_to_dataframe"),
}


def convert_to_message(v: DataFrame | Data | Message | dict) -> Message:
    """Convert input to Message type.

    Args:
        v: Input to convert (Message, Data, DataFrame, or dict)

    Returns:
        Message: ``v`` itself when it already is a Message, otherwise the
            result of its ``to_message()`` conversion.
    """
    if isinstance(v, Message):
        return v
    if isinstance(v, dict):
        # A plain dict has no ``to_message``; wrap it in Data first so dict
        # input is supported here just as it is by the sibling converters.
        return Data(data=v).to_message()
    return v.to_message()


def convert_to_data(v: DataFrame | Data | Message | dict) -> Data:
    """Convert input to Data type.

    Args:
        v: Input to convert (Message, Data, DataFrame, or dict)

    Returns:
        Data: Converted Data object
    """
    if isinstance(v, dict):
        # Data is a pydantic model, so the payload must be passed by keyword.
        return Data(data=v)
    if isinstance(v, Message):
        # Message subclasses Data: test for it first, otherwise the Data
        # pass-through below would hand the Message back unconverted.
        return v.to_data()
    return v if isinstance(v, Data) else v.to_data()


def convert_to_dataframe(v: DataFrame | Data | Message | dict) -> DataFrame:
    """Convert input to DataFrame type.

    Args:
        v: Input to convert (Message, Data, DataFrame, or dict)

    Returns:
        DataFrame: Converted DataFrame object
    """
    if isinstance(v, dict):
        # A single dict becomes a one-row frame.
        return DataFrame([v])
    return v if isinstance(v, DataFrame) else v.to_dataframe()


class TypeConverterComponent(Component):
    """Component that converts its input between Message, Data and DataFrame."""

    display_name = "Type Convert"
    description = "Convert between different types (Message, Data, DataFrame)"
    icon = "repeat"

    inputs = [
        HandleInput(
            name="input_data",
            display_name="Input",
            input_types=["Message", "Data", "DataFrame"],
            info="Accept Message, Data or DataFrame as input",
            required=True,
        ),
        TabInput(
            name="output_type",
            display_name="Output Type",
            options=["Message", "Data", "DataFrame"],
            info="Select the desired output data type",
            real_time_refresh=True,
            value="Message",
        ),
    ]

    # Default output; update_outputs swaps it when the tab selection changes.
    outputs = [Output(display_name="Message Output", name="message_output", method="convert_to_message")]

    def update_outputs(self, frontend_node: dict, field_name: str, field_value: Any) -> dict:
        """Dynamically show only the relevant output based on the selected output type.

        Args:
            frontend_node: Node dictionary whose "outputs" entry is rewritten.
            field_name: Name of the field that changed; only "output_type" is handled.
            field_value: Newly selected output type.

        Returns:
            dict: The same ``frontend_node``. For an "output_type" change its
                "outputs" list holds exactly the selected output, or is empty
                when the value is not a known type.
        """
        if field_name != "output_type":
            return frontend_node

        spec = _OUTPUT_SPECS.get(field_value) if isinstance(field_value, str) else None
        if spec is None:
            frontend_node["outputs"] = []
        else:
            display_name, name, method = spec
            frontend_node["outputs"] = [Output(display_name=display_name, name=name, method=method).to_dict()]
        return frontend_node

    def _resolve_input(self):
        """Return the value to convert, unwrapping a list input to its first element.

        Raises:
            ValueError: If the input is an empty list.
        """
        value = self.input_data
        if not isinstance(value, list):
            return value
        if not value:
            msg = "Type Convert received an empty list; expected a Message, Data or DataFrame."
            raise ValueError(msg)
        # Only the first element of a list input is converted.
        return value[0]

    def convert_to_message(self) -> Message:
        """Convert input to Message type."""
        return convert_to_message(self._resolve_input())

    def convert_to_data(self) -> Data:
        """Convert input to Data type."""
        return convert_to_data(self._resolve_input())

    def convert_to_dataframe(self) -> DataFrame:
        """Convert input to DataFrame type."""
        return convert_to_dataframe(self._resolve_input())
def to_message(self) -> Message:
    """Convert this Data into a Message.

    Returns:
        Message: Carries ``get_text()`` when ``text_key`` is present in the
            data dictionary, otherwise the string form of the whole dictionary.
    """
    from langflow.schema.message import Message  # Local import to avoid circular import

    has_text = self.text_key in self.data
    body = self.get_text() if has_text else str(self.data)
    return Message(text=body)


def to_dataframe(self) -> DataFrame:
    """Convert this Data into a DataFrame.

    Returns:
        DataFrame: Built from the sole value when the data dictionary has
            exactly one key whose value is a list of dictionaries; otherwise a
            frame built from this object alone.
    """
    from langflow.schema.dataframe import DataFrame  # Local import to avoid circular import

    payload = self.data
    if len(payload) == 1:
        (sole_value,) = payload.values()
        # A lone list of dicts is treated as the table rows themselves.
        if isinstance(sole_value, list) and all(isinstance(row, dict) for row in sole_value):
            return DataFrame(data=sole_value)
    return DataFrame(data=[self])
def to_data(self) -> Data:
    """Convert this DataFrame to a Data object.

    Returns:
        Data: A Data object containing the DataFrame records under 'results' key.
    """
    records = self.to_dict(orient="records")
    return Data(data={"results": records})


def to_message(self) -> Message:
    """Render this DataFrame as a markdown table wrapped in a Message.

    Cells are cleaned first so their content cannot break the table layout.

    Returns:
        Message: Message whose text is the markdown table, without the index.
    """
    # Drop rows in which every cell is missing.
    cleaned = self.dropna(how="all")
    # Blank out cells that contain only whitespace.
    cleaned = cleaned.replace(r"^\s*$", "", regex=True)
    # Replace multiple newlines with a single newline.
    cleaned = cleaned.replace(r"\n+", "\n", regex=True)
    # Escape pipe characters to avoid markdown table issues.
    cleaned = cleaned.replace(r"\|", r"\\|", regex=True)
    # A markdown row must stay on one line, so remaining newlines in string
    # cells become HTML line breaks.
    # NOTE(review): the replacement literal is presumed to be "<br/>" - confirm.
    cleaned = cleaned.map(lambda x: str(x).replace("\n", "<br/>") if isinstance(x, str) else x)
    return Message(text=cleaned.to_markdown(index=False))
class TestTypeConverterComponent(ComponentTestBaseWithoutClient):
    """Unit tests for TypeConverterComponent across Message, Data and DataFrame inputs."""

    @pytest.fixture
    def component_class(self):
        """Return the component class to test."""
        return TypeConverterComponent

    @pytest.fixture
    def file_names_mapping(self):
        """Return an empty list since this component doesn't have version-specific files."""
        return []

    # Message to other types
    def test_message_to_message(self, component_class):
        """Test converting Message to Message."""
        component = component_class(input_data=Message(text="Hello World"), output_type="Message")
        result = component.convert_to_message()
        assert isinstance(result, Message)
        assert result.text == "Hello World"

    def test_message_to_data(self, component_class):
        """Test converting Message to Data."""
        component = component_class(input_data=Message(text="Hello"), output_type="Data")
        result = component.convert_to_data()
        assert isinstance(result, Data)
        assert "text" in result.data
        assert result.data["text"] == "Hello"

    def test_message_to_dataframe(self, component_class):
        """Test converting Message to DataFrame."""
        component = component_class(input_data=Message(text="Hello"), output_type="DataFrame")
        result = component.convert_to_dataframe()
        assert isinstance(result, DataFrame)
        assert "text" in result.columns
        assert result.iloc[0]["text"] == "Hello"

    # Data to other types
    def test_data_to_message(self, component_class):
        """Test converting Data to Message."""
        component = component_class(input_data=Data(data={"text": "Hello World"}), output_type="Message")
        result = component.convert_to_message()
        assert isinstance(result, Message)
        assert result.text == "Hello World"

    def test_data_to_data(self, component_class):
        """Test converting Data to Data."""
        component = component_class(input_data=Data(data={"key": "value"}), output_type="Data")
        result = component.convert_to_data()
        assert isinstance(result, Data)
        assert result.data == {"key": "value"}

    def test_data_to_dataframe(self, component_class):
        """Test converting Data to DataFrame."""
        component = component_class(input_data=Data(data={"text": "Hello World"}), output_type="DataFrame")
        result = component.convert_to_dataframe()
        assert isinstance(result, DataFrame)
        assert "text" in result.columns
        assert result.iloc[0]["text"] == "Hello World"

    def test_data_with_record_list_to_dataframe(self, component_class):
        """Test that a Data holding a single list of dicts expands into one row per dict."""
        records = [{"a": 1}, {"a": 2}]
        component = component_class(input_data=Data(data={"results": records}), output_type="DataFrame")
        result = component.convert_to_dataframe()
        assert isinstance(result, DataFrame)
        assert len(result) == 2
        assert result["a"].tolist() == [1, 2]

    # DataFrame to other types
    def test_dataframe_to_message(self, component_class):
        """Test converting DataFrame to Message."""
        df_data = pd.DataFrame({"col1": ["Hello"], "col2": ["World"]})
        component = component_class(input_data=DataFrame(data=df_data), output_type="Message")
        result = component.convert_to_message()
        assert isinstance(result, Message)

        # Compare cell by cell so the check does not depend on how the
        # markdown renderer pads its columns.
        table = [[cell.strip() for cell in line.strip().strip("|").split("|")] for line in result.text.splitlines()]
        header, separator, row = table
        assert header == ["col1", "col2"]
        assert all(cell and set(cell) <= {":", "-"} for cell in separator)
        assert row == ["Hello", "World"]

    def test_dataframe_to_data(self, component_class):
        """Test converting DataFrame to Data."""
        df_data = pd.DataFrame({"col1": ["Hello"]})
        component = component_class(input_data=DataFrame(data=df_data), output_type="Data")
        result = component.convert_to_data()
        assert isinstance(result, Data)
        assert isinstance(result.data, dict)
        # The frame's rows are stored as records under the "results" key.
        assert result.data["results"] == [{"col1": "Hello"}]

    def test_dataframe_to_dataframe(self, component_class):
        """Test converting DataFrame to DataFrame."""
        df_data = pd.DataFrame({"col1": ["Hello"], "col2": ["World"]})
        component = component_class(input_data=DataFrame(data=df_data), output_type="DataFrame")
        result = component.convert_to_dataframe()
        assert isinstance(result, DataFrame)
        assert "col1" in result.columns
        assert "col2" in result.columns
        assert result.iloc[0]["col1"] == "Hello"
        assert result.iloc[0]["col2"] == "World"

    @pytest.mark.parametrize(
        ("output_type", "expected_name"),
        [
            ("Message", "message_output"),
            ("Data", "data_output"),
            ("DataFrame", "dataframe_output"),
        ],
    )
    def test_update_outputs(self, component_class, output_type, expected_name):
        """Test that update_outputs replaces the outputs with the selected one."""
        component = component_class(input_data=Message(text="Hello"), output_type="Message")
        # Start from a stale entry to prove the list is replaced, not extended.
        frontend_node = {"outputs": [{"name": "stale_output"}]}

        updated = component.update_outputs(frontend_node, "output_type", output_type)

        assert len(updated["outputs"]) == 1
        assert updated["outputs"][0]["name"] == expected_name

    def test_update_outputs_ignores_other_fields(self, component_class):
        """Test that a change to any field other than output_type leaves outputs untouched."""
        component = component_class(input_data=Message(text="Hello"), output_type="Message")
        frontend_node = {"outputs": [{"name": "message_output"}]}

        updated = component.update_outputs(frontend_node, "input_data", "Data")

        assert updated["outputs"] == [{"name": "message_output"}]