feat(dataframe): add support for DataFrame outputs across multiple components (#5589)

* add dataframe outputs to vector stores, directory, url, split text

* add dataframe import

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* add parse dataframe

* [autofix.ci] apply automated fixes

* Refactor: Update DataFrame handling in components

- Added import of DataFrame in directory and url components.
- Renamed variable 'df' to 'dataframe' in ParseDataFrameComponent for clarity.
- Updated the _clean_args and parse_data methods to use 'dataframe' instead of 'df' for consistency.

These changes enhance code readability and maintainability by standardizing the terminology used for DataFrame objects.

* [autofix.ci] apply automated fixes

* remove parse dataframe

* Add tests for URL component functionality and data handling

* Enhance DirectoryComponent tests with new functionality and parameters

- Added tests for loading files with specific types and handling hidden files.
- Implemented tests for directory loading with depth and multithreading support.
- Introduced a new test for converting directory contents to a DataFrame.
- Updated existing tests to include additional parameters like 'silent_errors' and 'types'.

These changes improve test coverage and ensure the DirectoryComponent behaves as expected under various conditions.

* update retrieve_file_paths for backwards compatibility

* Refactor DirectoryComponent to handle file types more robustly

- Removed the default assignment of TEXT_FILE_TYPES to 'types' and added logic to use all supported types if none are specified.
- Implemented validation to ensure only valid file types are processed, improving error handling.
- Updated the file retrieval process to utilize the filtered list of valid types.

These changes enhance the flexibility and reliability of the DirectoryComponent's file loading functionality.
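The resolution-and-validation flow described above can be sketched as follows. This is a hypothetical stand-alone sketch: `TEXT_FILE_TYPES` here is a placeholder list, not langflow's actual constant, and `resolve_types` is an illustrative helper, not a real function in the codebase.

```python
# Hypothetical sketch of the described logic; TEXT_FILE_TYPES is a stand-in
# for langflow's real list of supported extensions.
TEXT_FILE_TYPES = ["txt", "md", "json", "csv"]


def resolve_types(types):
    # No types specified: fall back to all supported types.
    if not types:
        return list(TEXT_FILE_TYPES)
    # Reject unsupported types instead of silently skipping them.
    invalid_types = [t for t in types if t not in TEXT_FILE_TYPES]
    if invalid_types:
        msg = f"Invalid file types specified: {invalid_types}. Valid types are: {TEXT_FILE_TYPES}"
        raise ValueError(msg)
    return types
```

The key behavioral change is that an unsupported type now fails loudly with a ValueError rather than being filtered out after the fact.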

* Refactor and simplify tests in test_data_components.py

- Removed multiple tests related to HTTP requests, including successful and failed GET requests, timeouts, and multiple URL handling, to streamline the test suite.
- Cleaned up imports and unnecessary mock setups to enhance readability and maintainability.
- Focused on retaining essential tests for DirectoryComponent and URLComponent functionality, ensuring core features are still validated.

These changes improve the clarity and efficiency of the test suite while maintaining coverage for critical components.

* Add unit tests for DirectoryComponent functionality

- Introduced a new test file for DirectoryComponent, enhancing test coverage.
- Implemented various tests to validate loading files with specific types, handling hidden files, and supporting multithreading.
- Added tests for directory loading with depth and converting directory contents to a DataFrame.
- Ensured tests cover different scenarios, including recursive loading and file type filtering.

These changes improve the robustness and reliability of the DirectoryComponent's functionality through comprehensive testing.

* Add unit tests for URLComponent functionality

- Introduced a new test file for URLComponent, enhancing test coverage for its methods.
- Implemented tests for fetching content from valid URLs, handling multiple URLs, and validating error handling for invalid URLs.
- Added tests for converting fetched content to a DataFrame and ensuring correct message formatting.
- Mocked web requests to simulate various scenarios, ensuring robust testing of URLComponent's functionality.

These changes improve the reliability and correctness of the URLComponent through comprehensive testing.

* Add unit tests for SplitTextComponent functionality

- Introduced a new test file for SplitTextComponent, enhancing test coverage for its methods.
- Implemented tests for basic text splitting, handling overlaps, custom separators, and preserving metadata.
- Added tests for converting split text results to a DataFrame and handling empty input.
- Ensured functionality for single and multiple input texts is validated.

These changes improve the reliability and correctness of the SplitTextComponent through comprehensive testing.
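For readers unfamiliar with chunk overlap, a minimal character-level illustration follows. This is not langflow's splitter (SplitTextComponent delegates to langchain's CharacterTextSplitter); it is only a sketch of what "overlap" between consecutive chunks means.

```python
# Minimal illustration of chunk overlap; langflow itself delegates to
# langchain_text_splitters.CharacterTextSplitter rather than this function.
def split_with_overlap(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    # Each chunk starts `step` characters after the previous one, so
    # consecutive chunks share `chunk_overlap` characters.
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]
```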

* Add comment to ignore FBT001 in retrieve_file_paths function

* Validate specified file types in DirectoryComponent and raise ValueError for invalid types

* Fix type hint in DataFrame constructor to support list of dicts or Data objects. This change enhances type safety and clarity in the DataFrame initialization process.
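A sketch of what the widened hint implies for callers: the constructor can now take a heterogeneous `list[dict | Data]`. The `Data` class and `to_rows` helper below are simplified stand-ins for illustration, not langflow's real implementations.

```python
from dataclasses import dataclass, asdict


@dataclass
class Data:
    # Stand-in for langflow.schema.Data; the real class carries arbitrary fields.
    text: str


def to_rows(items):
    # The widened hint means a list[dict | Data] is accepted; this helper
    # shows the normalization such a constructor implies.
    return [item if isinstance(item, dict) else asdict(item) for item in items]
```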

* Enhance DirectoryComponent tests to validate error handling for invalid file types

- Removed the test case for 'exe' file type from valid scenarios.
- Added a new test to ensure DirectoryComponent raises a ValueError for invalid file types, specifically when 'exe' is specified.
- Improved test coverage for DirectoryComponent by validating error messages for unsupported file types.

These changes strengthen the reliability of the DirectoryComponent by ensuring proper error handling for invalid inputs.

* [autofix.ci] apply automated fixes

* Update error handling in Component class to return None for missing flow_id or session_id

- Modified the send_error_message method to include a type hint that allows for returning None in addition to Message.
- Added a conditional check to return None if flow_id or session_id is not present, improving robustness in error handling.

These changes enhance the reliability of the Component class by ensuring it gracefully handles cases with missing identifiers.

* Refactor error handling in Component class to return None for missing session_id

- Updated the send_error_message method to remove the flow_id check, simplifying the logic.
- Enhanced robustness by ensuring that the method returns None if session_id is not present.

These changes improve the reliability of the Component class in handling error messages.

* Update required_inputs for DataFrame method in JSON configurations

- Modified the 'required_inputs' field for the 'DataFrame' method in both 'Graph Vector Store RAG.json' and 'Vector Store RAG.json' files to include necessary parameters: 'api_endpoint', 'collection_name', and 'token'.
- In 'Vector Store RAG.json', added 'collection_name_new' to the 'required_inputs' list.

These changes ensure that the DataFrame method has the appropriate inputs defined for proper functionality.

* [autofix.ci] apply automated fixes

* Enhance BaseComponent to use deep copy for attribute values in template configuration

- Updated the BaseComponent class to utilize `copy.deepcopy` when assigning values to `template_config`. This change ensures that modifications to the original component's attributes do not affect the template configuration, enhancing data integrity and preventing unintended side effects.

These changes improve the reliability of the BaseComponent by ensuring that the template configuration remains consistent and isolated from the original component's state.
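The side effect this guards against can be shown in a few lines. The dictionaries below are illustrative stand-ins for component attributes, not langflow's actual data structures.

```python
import copy

# Why the deep copy matters: a shallow assignment would share nested objects
# between the component and its template configuration.
component_attrs = {"outputs": [{"name": "data"}]}

template_config = {}
template_config["outputs"] = copy.deepcopy(component_attrs["outputs"])

# Mutating the template copy leaves the original component untouched.
template_config["outputs"].append({"name": "dataframe"})
assert component_attrs["outputs"] == [{"name": "data"}]
```

With a plain assignment (`template_config["outputs"] = component_attrs["outputs"]`), the `append` would also have modified the component's own list.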

* Added output for 'dataframe' in both ingestion and rag graphs

- Updated the ingestion vector store ID for better identification.
- Added output for 'dataframe' in both ingestion and rag graphs to enhance data handling.
- Simplified the output assignment for search results in rag graph by using a data list.

These changes improve the test structure and ensure that the vector store components are correctly configured for better testing outcomes.

* Refactor vector store RAG tests for improved validation and consistency

- Updated test assertions in `test_vector_store_rag_dump_components_and_edges` to verify the expected number of nodes and their types using a mapping for easier lookup.
- Changed the ingestion vector store ID from `vector-store-123` to `ingestion-vector-store-123` for better identification.
- Adjusted expected edges in the tests to reflect the new vector store ID, ensuring accurate edge validation.

These changes enhance the test structure and ensure that the vector store components are correctly configured for better testing outcomes.

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
Commit 45ed8e5e54 by Rodrigo Nader, 2025-01-10 11:50:30 -03:00; committed by GitHub.
No known key found for this signature in database. GPG key ID: B5690EEEBB952194.
18 changed files with 953 additions and 217 deletions.


@@ -56,11 +56,12 @@ def format_directory_path(path: str) -> str:
     return path.replace("\n", "\\n")
 
 
+# Ignoring FBT001 because the DirectoryComponent in 1.0.19
+# calls this function without keyword arguments
 def retrieve_file_paths(
     path: str,
-    *,
-    load_hidden: bool,
-    recursive: bool,
+    load_hidden: bool,  # noqa: FBT001
+    recursive: bool,  # noqa: FBT001
     depth: int,
     types: list[str] = TEXT_FILE_TYPES,
 ) -> list[str]:
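The hunk above restores positional calls by dropping the bare `*` (which makes every following parameter keyword-only). A self-contained demonstration, using stub signatures rather than langflow's real function:

```python
# Stub signatures illustrating the change; not langflow's implementation.
def retrieve_file_paths_keyword_only(path, *, load_hidden, recursive, depth):
    return []


def retrieve_file_paths_positional(path, load_hidden, recursive, depth):
    return []


# DirectoryComponent 1.0.19 called the function positionally; with the bare
# `*` in the signature, that call raises TypeError.
try:
    retrieve_file_paths_keyword_only("docs", False, True, 1)
    keyword_only_accepts_positional = True
except TypeError:
    keyword_only_accepts_positional = False

positional_works = retrieve_file_paths_positional("docs", False, True, 1) == []
```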


@@ -6,7 +6,7 @@ from langflow.custom import Component
 from langflow.field_typing import Text, VectorStore
 from langflow.helpers.data import docs_to_data
 from langflow.io import DataInput, MultilineInput, Output
-from langflow.schema import Data
+from langflow.schema import Data, DataFrame
 
 if TYPE_CHECKING:
     from langchain_core.documents import Document
@@ -70,6 +70,7 @@ class LCVectorStoreComponent(Component):
             name="search_results",
             method="search_documents",
         ),
+        Output(display_name="DataFrame", name="dataframe", method="as_dataframe"),
     ]
 
     def _validate_outputs(self) -> None:
@@ -143,6 +144,9 @@ class LCVectorStoreComponent(Component):
         self.status = search_results
         return search_results
 
+    def as_dataframe(self) -> DataFrame:
+        return DataFrame(self.search_documents())
+
     def get_retriever_kwargs(self):
         """Get the retriever kwargs. Implementations can override this method to provide custom retriever kwargs."""
         return {}
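Every component in this PR adds its DataFrame output the same way: a thin `as_dataframe` method that wraps the component's existing list output. A schematic stand-in (langflow's real `DataFrame` is a pandas.DataFrame subclass; the classes below are simplified for illustration):

```python
class DataFrame:
    # Stand-in for langflow.schema.DataFrame, which accepts a list of
    # dicts or Data objects.
    def __init__(self, data):
        self.rows = list(data)


class VectorStoreStub:
    def search_documents(self):
        # Pretend search results, shaped like a list of row dicts.
        return [{"text": "doc one"}, {"text": "doc two"}]

    def as_dataframe(self):
        # The shared pattern: reuse the existing output method and wrap it.
        return DataFrame(self.search_documents())
```

The same delegation appears below for DirectoryComponent (`load_directory`), URLComponent (`fetch_content`), and SplitTextComponent (`split_text`).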


@@ -2,6 +2,7 @@ from langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_
 from langflow.custom import Component
 from langflow.io import BoolInput, IntInput, MessageTextInput, MultiselectInput
 from langflow.schema import Data
+from langflow.schema.dataframe import DataFrame
 from langflow.template import Output
@@ -67,11 +68,12 @@ class DirectoryComponent(Component):
     outputs = [
         Output(display_name="Data", name="data", method="load_directory"),
+        Output(display_name="DataFrame", name="dataframe", method="as_dataframe"),
     ]
 
     def load_directory(self) -> list[Data]:
         path = self.path
-        types = self.types or TEXT_FILE_TYPES
+        types = self.types
         depth = self.depth
         max_concurrency = self.max_concurrency
         load_hidden = self.load_hidden
@@ -81,12 +83,21 @@
         resolved_path = self.resolve_path(path)
 
-        file_paths = retrieve_file_paths(
-            resolved_path, load_hidden=load_hidden, recursive=recursive, depth=depth, types=types
-        )
+        # If no types are specified, use all supported types
+        if not types:
+            types = TEXT_FILE_TYPES
 
-        if types:
-            file_paths = [fp for fp in file_paths if any(fp.endswith(ext) for ext in types)]
+        # Check if all specified types are valid
+        invalid_types = [t for t in types if t not in TEXT_FILE_TYPES]
+        if invalid_types:
+            msg = f"Invalid file types specified: {invalid_types}. Valid types are: {TEXT_FILE_TYPES}"
+            raise ValueError(msg)
+
+        valid_types = types
+
+        file_paths = retrieve_file_paths(
+            resolved_path, load_hidden=load_hidden, recursive=recursive, depth=depth, types=valid_types
+        )
 
         loaded_data = []
         if use_multithreading:
@@ -97,3 +108,6 @@
         valid_data = [x for x in loaded_data if x is not None and isinstance(x, Data)]
         self.status = valid_data
         return valid_data
+
+    def as_dataframe(self) -> DataFrame:
+        return DataFrame(self.load_directory())


@@ -6,6 +6,7 @@ from langflow.custom import Component
 from langflow.helpers.data import data_to_text
 from langflow.io import DropdownInput, MessageTextInput, Output
 from langflow.schema import Data
+from langflow.schema.dataframe import DataFrame
 from langflow.schema.message import Message
@@ -35,6 +36,7 @@ class URLComponent(Component):
     outputs = [
         Output(display_name="Data", name="data", method="fetch_content"),
         Output(display_name="Text", name="text", method="fetch_content_text"),
+        Output(display_name="DataFrame", name="dataframe", method="as_dataframe"),
     ]
 
     def ensure_url(self, string: str) -> str:
@@ -88,3 +90,6 @@
         result_string = data_to_text("{text}", data)
         self.status = result_string
         return Message(text=result_string)
+
+    def as_dataframe(self) -> DataFrame:
+        return DataFrame(self.fetch_content())


@@ -2,7 +2,7 @@ from langchain_text_splitters import CharacterTextSplitter
 
 from langflow.custom import Component
 from langflow.io import HandleInput, IntInput, MessageTextInput, Output
-from langflow.schema import Data
+from langflow.schema import Data, DataFrame
 from langflow.utils.util import unescape_string
@@ -19,6 +19,7 @@
             info="The data to split.",
             input_types=["Data"],
             is_list=True,
+            required=True,
         ),
         IntInput(
             name="chunk_overlap",
@@ -42,6 +43,7 @@
     outputs = [
         Output(display_name="Chunks", name="chunks", method="split_text"),
+        Output(display_name="DataFrame", name="dataframe", method="as_dataframe"),
     ]
 
     def _docs_to_data(self, docs):
@@ -61,3 +63,6 @@
         data = self._docs_to_data(docs)
         self.status = data
         return data
+
+    def as_dataframe(self) -> DataFrame:
+        return DataFrame(self.split_text())


@@ -1,3 +1,4 @@
+import copy
 import operator
 import re
 from typing import TYPE_CHECKING, Any, ClassVar
@@ -83,7 +84,8 @@ class BaseComponent:
             if hasattr(component, attribute):
                 value = getattr(component, attribute)
                 if value is not None:
-                    template_config[attribute] = func(value=value)
+                    value_copy = copy.deepcopy(value)
+                    template_config[attribute] = func(value=value_copy)
 
         for key in template_config.copy():
             if key not in ATTR_FUNC_MAPPING:


@@ -1173,9 +1173,11 @@ class Component(CustomComponent):
         session_id: str,
         trace_name: str,
         source: Source,
-    ) -> Message:
+    ) -> Message | None:
         """Send an error message to the frontend."""
         flow_id = self.graph.flow_id if hasattr(self, "graph") else None
+        if not session_id:
+            return None
         error_message = ErrorMessage(
             flow_id=flow_id,
             exception=exception,


@@ -187,6 +187,17 @@
             "Message"
           ],
           "value": "__UNDEFINED__"
+        },
+        {
+          "cache": true,
+          "display_name": "DataFrame",
+          "method": "as_dataframe",
+          "name": "dataframe",
+          "selected": "DataFrame",
+          "types": [
+            "DataFrame"
+          ],
+          "value": "__UNDEFINED__"
         }
       ],
       "pinned": false,
@@ -208,7 +219,7 @@
"show": true,
"title_case": false,
"type": "code",
"value": "import re\n\nfrom langchain_community.document_loaders import AsyncHtmlLoader, WebBaseLoader\n\nfrom langflow.custom import Component\nfrom langflow.helpers.data import data_to_text\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.schema import Data\nfrom langflow.schema.message import Message\n\n\nclass URLComponent(Component):\n display_name = \"URL\"\n description = \"Fetch content from one or more URLs.\"\n icon = \"layout-template\"\n name = \"URL\"\n\n inputs = [\n MessageTextInput(\n name=\"urls\",\n display_name=\"URLs\",\n info=\"Enter one or more URLs, by clicking the '+' button.\",\n is_list=True,\n tool_mode=True,\n ),\n DropdownInput(\n name=\"format\",\n display_name=\"Output Format\",\n info=\"Output Format. Use 'Text' to extract the text from the HTML or 'Raw HTML' for the raw HTML content.\",\n options=[\"Text\", \"Raw HTML\"],\n value=\"Text\",\n ),\n ]\n\n outputs = [\n Output(display_name=\"Data\", name=\"data\", method=\"fetch_content\"),\n Output(display_name=\"Text\", name=\"text\", method=\"fetch_content_text\"),\n ]\n\n def ensure_url(self, string: str) -> str:\n \"\"\"Ensures the given string is a URL by adding 'http://' if it doesn't start with 'http://' or 'https://'.\n\n Raises an error if the string is not a valid URL.\n\n Parameters:\n string (str): The string to be checked and possibly modified.\n\n Returns:\n str: The modified string that is ensured to be a URL.\n\n Raises:\n ValueError: If the string is not a valid URL.\n \"\"\"\n if not string.startswith((\"http://\", \"https://\")):\n string = \"http://\" + string\n\n # Basic URL validation regex\n url_regex = re.compile(\n r\"^(https?:\\/\\/)?\" # optional protocol\n r\"(www\\.)?\" # optional www\n r\"([a-zA-Z0-9.-]+)\" # domain\n r\"(\\.[a-zA-Z]{2,})?\" # top-level domain\n r\"(:\\d+)?\" # optional port\n r\"(\\/[^\\s]*)?$\", # optional path\n re.IGNORECASE,\n )\n\n if not url_regex.match(string):\n msg = f\"Invalid URL: {string}\"\n 
raise ValueError(msg)\n\n return string\n\n def fetch_content(self) -> list[Data]:\n urls = [self.ensure_url(url.strip()) for url in self.urls if url.strip()]\n if self.format == \"Raw HTML\":\n loader = AsyncHtmlLoader(web_path=urls, encoding=\"utf-8\")\n else:\n loader = WebBaseLoader(web_paths=urls, encoding=\"utf-8\")\n docs = loader.load()\n data = [Data(text=doc.page_content, **doc.metadata) for doc in docs]\n self.status = data\n return data\n\n def fetch_content_text(self) -> Message:\n data = self.fetch_content()\n\n result_string = data_to_text(\"{text}\", data)\n self.status = result_string\n return Message(text=result_string)\n"
"value": "import re\n\nfrom langchain_community.document_loaders import AsyncHtmlLoader, WebBaseLoader\n\nfrom langflow.custom import Component\nfrom langflow.helpers.data import data_to_text\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.schema import Data\nfrom langflow.schema.dataframe import DataFrame\nfrom langflow.schema.message import Message\n\n\nclass URLComponent(Component):\n display_name = \"URL\"\n description = \"Fetch content from one or more URLs.\"\n icon = \"layout-template\"\n name = \"URL\"\n\n inputs = [\n MessageTextInput(\n name=\"urls\",\n display_name=\"URLs\",\n info=\"Enter one or more URLs, by clicking the '+' button.\",\n is_list=True,\n tool_mode=True,\n ),\n DropdownInput(\n name=\"format\",\n display_name=\"Output Format\",\n info=\"Output Format. Use 'Text' to extract the text from the HTML or 'Raw HTML' for the raw HTML content.\",\n options=[\"Text\", \"Raw HTML\"],\n value=\"Text\",\n ),\n ]\n\n outputs = [\n Output(display_name=\"Data\", name=\"data\", method=\"fetch_content\"),\n Output(display_name=\"Text\", name=\"text\", method=\"fetch_content_text\"),\n Output(display_name=\"DataFrame\", name=\"dataframe\", method=\"as_dataframe\"),\n ]\n\n def ensure_url(self, string: str) -> str:\n \"\"\"Ensures the given string is a URL by adding 'http://' if it doesn't start with 'http://' or 'https://'.\n\n Raises an error if the string is not a valid URL.\n\n Parameters:\n string (str): The string to be checked and possibly modified.\n\n Returns:\n str: The modified string that is ensured to be a URL.\n\n Raises:\n ValueError: If the string is not a valid URL.\n \"\"\"\n if not string.startswith((\"http://\", \"https://\")):\n string = \"http://\" + string\n\n # Basic URL validation regex\n url_regex = re.compile(\n r\"^(https?:\\/\\/)?\" # optional protocol\n r\"(www\\.)?\" # optional www\n r\"([a-zA-Z0-9.-]+)\" # domain\n r\"(\\.[a-zA-Z]{2,})?\" # top-level domain\n r\"(:\\d+)?\" # optional port\n 
r\"(\\/[^\\s]*)?$\", # optional path\n re.IGNORECASE,\n )\n\n if not url_regex.match(string):\n msg = f\"Invalid URL: {string}\"\n raise ValueError(msg)\n\n return string\n\n def fetch_content(self) -> list[Data]:\n urls = [self.ensure_url(url.strip()) for url in self.urls if url.strip()]\n if self.format == \"Raw HTML\":\n loader = AsyncHtmlLoader(web_path=urls, encoding=\"utf-8\")\n else:\n loader = WebBaseLoader(web_paths=urls, encoding=\"utf-8\")\n docs = loader.load()\n data = [Data(text=doc.page_content, **doc.metadata) for doc in docs]\n self.status = data\n return data\n\n def fetch_content_text(self) -> Message:\n data = self.fetch_content()\n\n result_string = data_to_text(\"{text}\", data)\n self.status = result_string\n return Message(text=result_string)\n\n def as_dataframe(self) -> DataFrame:\n return DataFrame(self.fetch_content())\n"
},
"format": {
"_input_type": "DropdownInput",


@@ -1636,6 +1636,17 @@
             "Message"
           ],
           "value": "__UNDEFINED__"
+        },
+        {
+          "cache": true,
+          "display_name": "DataFrame",
+          "method": "as_dataframe",
+          "name": "dataframe",
+          "selected": "DataFrame",
+          "types": [
+            "DataFrame"
+          ],
+          "value": "__UNDEFINED__"
         }
       ],
       "pinned": false,
@@ -1657,7 +1668,7 @@
"show": true,
"title_case": false,
"type": "code",
"value": "import re\n\nfrom langchain_community.document_loaders import AsyncHtmlLoader, WebBaseLoader\n\nfrom langflow.custom import Component\nfrom langflow.helpers.data import data_to_text\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.schema import Data\nfrom langflow.schema.message import Message\n\n\nclass URLComponent(Component):\n display_name = \"URL\"\n description = \"Fetch content from one or more URLs.\"\n icon = \"layout-template\"\n name = \"URL\"\n\n inputs = [\n MessageTextInput(\n name=\"urls\",\n display_name=\"URLs\",\n info=\"Enter one or more URLs, by clicking the '+' button.\",\n is_list=True,\n tool_mode=True,\n ),\n DropdownInput(\n name=\"format\",\n display_name=\"Output Format\",\n info=\"Output Format. Use 'Text' to extract the text from the HTML or 'Raw HTML' for the raw HTML content.\",\n options=[\"Text\", \"Raw HTML\"],\n value=\"Text\",\n ),\n ]\n\n outputs = [\n Output(display_name=\"Data\", name=\"data\", method=\"fetch_content\"),\n Output(display_name=\"Text\", name=\"text\", method=\"fetch_content_text\"),\n ]\n\n def ensure_url(self, string: str) -> str:\n \"\"\"Ensures the given string is a URL by adding 'http://' if it doesn't start with 'http://' or 'https://'.\n\n Raises an error if the string is not a valid URL.\n\n Parameters:\n string (str): The string to be checked and possibly modified.\n\n Returns:\n str: The modified string that is ensured to be a URL.\n\n Raises:\n ValueError: If the string is not a valid URL.\n \"\"\"\n if not string.startswith((\"http://\", \"https://\")):\n string = \"http://\" + string\n\n # Basic URL validation regex\n url_regex = re.compile(\n r\"^(https?:\\/\\/)?\" # optional protocol\n r\"(www\\.)?\" # optional www\n r\"([a-zA-Z0-9.-]+)\" # domain\n r\"(\\.[a-zA-Z]{2,})?\" # top-level domain\n r\"(:\\d+)?\" # optional port\n r\"(\\/[^\\s]*)?$\", # optional path\n re.IGNORECASE,\n )\n\n if not url_regex.match(string):\n msg = f\"Invalid URL: {string}\"\n 
raise ValueError(msg)\n\n return string\n\n def fetch_content(self) -> list[Data]:\n urls = [self.ensure_url(url.strip()) for url in self.urls if url.strip()]\n if self.format == \"Raw HTML\":\n loader = AsyncHtmlLoader(web_path=urls, encoding=\"utf-8\")\n else:\n loader = WebBaseLoader(web_paths=urls, encoding=\"utf-8\")\n docs = loader.load()\n data = [Data(text=doc.page_content, **doc.metadata) for doc in docs]\n self.status = data\n return data\n\n def fetch_content_text(self) -> Message:\n data = self.fetch_content()\n\n result_string = data_to_text(\"{text}\", data)\n self.status = result_string\n return Message(text=result_string)\n"
"value": "import re\n\nfrom langchain_community.document_loaders import AsyncHtmlLoader, WebBaseLoader\n\nfrom langflow.custom import Component\nfrom langflow.helpers.data import data_to_text\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.schema import Data\nfrom langflow.schema.dataframe import DataFrame\nfrom langflow.schema.message import Message\n\n\nclass URLComponent(Component):\n display_name = \"URL\"\n description = \"Fetch content from one or more URLs.\"\n icon = \"layout-template\"\n name = \"URL\"\n\n inputs = [\n MessageTextInput(\n name=\"urls\",\n display_name=\"URLs\",\n info=\"Enter one or more URLs, by clicking the '+' button.\",\n is_list=True,\n tool_mode=True,\n ),\n DropdownInput(\n name=\"format\",\n display_name=\"Output Format\",\n info=\"Output Format. Use 'Text' to extract the text from the HTML or 'Raw HTML' for the raw HTML content.\",\n options=[\"Text\", \"Raw HTML\"],\n value=\"Text\",\n ),\n ]\n\n outputs = [\n Output(display_name=\"Data\", name=\"data\", method=\"fetch_content\"),\n Output(display_name=\"Text\", name=\"text\", method=\"fetch_content_text\"),\n Output(display_name=\"DataFrame\", name=\"dataframe\", method=\"as_dataframe\"),\n ]\n\n def ensure_url(self, string: str) -> str:\n \"\"\"Ensures the given string is a URL by adding 'http://' if it doesn't start with 'http://' or 'https://'.\n\n Raises an error if the string is not a valid URL.\n\n Parameters:\n string (str): The string to be checked and possibly modified.\n\n Returns:\n str: The modified string that is ensured to be a URL.\n\n Raises:\n ValueError: If the string is not a valid URL.\n \"\"\"\n if not string.startswith((\"http://\", \"https://\")):\n string = \"http://\" + string\n\n # Basic URL validation regex\n url_regex = re.compile(\n r\"^(https?:\\/\\/)?\" # optional protocol\n r\"(www\\.)?\" # optional www\n r\"([a-zA-Z0-9.-]+)\" # domain\n r\"(\\.[a-zA-Z]{2,})?\" # top-level domain\n r\"(:\\d+)?\" # optional port\n 
r\"(\\/[^\\s]*)?$\", # optional path\n re.IGNORECASE,\n )\n\n if not url_regex.match(string):\n msg = f\"Invalid URL: {string}\"\n raise ValueError(msg)\n\n return string\n\n def fetch_content(self) -> list[Data]:\n urls = [self.ensure_url(url.strip()) for url in self.urls if url.strip()]\n if self.format == \"Raw HTML\":\n loader = AsyncHtmlLoader(web_path=urls, encoding=\"utf-8\")\n else:\n loader = WebBaseLoader(web_paths=urls, encoding=\"utf-8\")\n docs = loader.load()\n data = [Data(text=doc.page_content, **doc.metadata) for doc in docs]\n self.status = data\n return data\n\n def fetch_content_text(self) -> Message:\n data = self.fetch_content()\n\n result_string = data_to_text(\"{text}\", data)\n self.status = result_string\n return Message(text=result_string)\n\n def as_dataframe(self) -> DataFrame:\n return DataFrame(self.fetch_content())\n"
},
"format": {
"_input_type": "DropdownInput",
@@ -1771,6 +1782,17 @@
             "Message"
           ],
           "value": "__UNDEFINED__"
+        },
+        {
+          "cache": true,
+          "display_name": "DataFrame",
+          "method": "as_dataframe",
+          "name": "dataframe",
+          "selected": "DataFrame",
+          "types": [
+            "DataFrame"
+          ],
+          "value": "__UNDEFINED__"
         }
       ],
       "pinned": false,
@@ -1792,7 +1814,7 @@
"show": true,
"title_case": false,
"type": "code",
"value": "import re\n\nfrom langchain_community.document_loaders import AsyncHtmlLoader, WebBaseLoader\n\nfrom langflow.custom import Component\nfrom langflow.helpers.data import data_to_text\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.schema import Data\nfrom langflow.schema.message import Message\n\n\nclass URLComponent(Component):\n display_name = \"URL\"\n description = \"Fetch content from one or more URLs.\"\n icon = \"layout-template\"\n name = \"URL\"\n\n inputs = [\n MessageTextInput(\n name=\"urls\",\n display_name=\"URLs\",\n info=\"Enter one or more URLs, by clicking the '+' button.\",\n is_list=True,\n tool_mode=True,\n ),\n DropdownInput(\n name=\"format\",\n display_name=\"Output Format\",\n info=\"Output Format. Use 'Text' to extract the text from the HTML or 'Raw HTML' for the raw HTML content.\",\n options=[\"Text\", \"Raw HTML\"],\n value=\"Text\",\n ),\n ]\n\n outputs = [\n Output(display_name=\"Data\", name=\"data\", method=\"fetch_content\"),\n Output(display_name=\"Text\", name=\"text\", method=\"fetch_content_text\"),\n ]\n\n def ensure_url(self, string: str) -> str:\n \"\"\"Ensures the given string is a URL by adding 'http://' if it doesn't start with 'http://' or 'https://'.\n\n Raises an error if the string is not a valid URL.\n\n Parameters:\n string (str): The string to be checked and possibly modified.\n\n Returns:\n str: The modified string that is ensured to be a URL.\n\n Raises:\n ValueError: If the string is not a valid URL.\n \"\"\"\n if not string.startswith((\"http://\", \"https://\")):\n string = \"http://\" + string\n\n # Basic URL validation regex\n url_regex = re.compile(\n r\"^(https?:\\/\\/)?\" # optional protocol\n r\"(www\\.)?\" # optional www\n r\"([a-zA-Z0-9.-]+)\" # domain\n r\"(\\.[a-zA-Z]{2,})?\" # top-level domain\n r\"(:\\d+)?\" # optional port\n r\"(\\/[^\\s]*)?$\", # optional path\n re.IGNORECASE,\n )\n\n if not url_regex.match(string):\n msg = f\"Invalid URL: {string}\"\n 
raise ValueError(msg)\n\n return string\n\n def fetch_content(self) -> list[Data]:\n urls = [self.ensure_url(url.strip()) for url in self.urls if url.strip()]\n if self.format == \"Raw HTML\":\n loader = AsyncHtmlLoader(web_path=urls, encoding=\"utf-8\")\n else:\n loader = WebBaseLoader(web_paths=urls, encoding=\"utf-8\")\n docs = loader.load()\n data = [Data(text=doc.page_content, **doc.metadata) for doc in docs]\n self.status = data\n return data\n\n def fetch_content_text(self) -> Message:\n data = self.fetch_content()\n\n result_string = data_to_text(\"{text}\", data)\n self.status = result_string\n return Message(text=result_string)\n"
"value": "import re\n\nfrom langchain_community.document_loaders import AsyncHtmlLoader, WebBaseLoader\n\nfrom langflow.custom import Component\nfrom langflow.helpers.data import data_to_text\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.schema import Data\nfrom langflow.schema.dataframe import DataFrame\nfrom langflow.schema.message import Message\n\n\nclass URLComponent(Component):\n display_name = \"URL\"\n description = \"Fetch content from one or more URLs.\"\n icon = \"layout-template\"\n name = \"URL\"\n\n inputs = [\n MessageTextInput(\n name=\"urls\",\n display_name=\"URLs\",\n info=\"Enter one or more URLs, by clicking the '+' button.\",\n is_list=True,\n tool_mode=True,\n ),\n DropdownInput(\n name=\"format\",\n display_name=\"Output Format\",\n info=\"Output Format. Use 'Text' to extract the text from the HTML or 'Raw HTML' for the raw HTML content.\",\n options=[\"Text\", \"Raw HTML\"],\n value=\"Text\",\n ),\n ]\n\n outputs = [\n Output(display_name=\"Data\", name=\"data\", method=\"fetch_content\"),\n Output(display_name=\"Text\", name=\"text\", method=\"fetch_content_text\"),\n Output(display_name=\"DataFrame\", name=\"dataframe\", method=\"as_dataframe\"),\n ]\n\n def ensure_url(self, string: str) -> str:\n \"\"\"Ensures the given string is a URL by adding 'http://' if it doesn't start with 'http://' or 'https://'.\n\n Raises an error if the string is not a valid URL.\n\n Parameters:\n string (str): The string to be checked and possibly modified.\n\n Returns:\n str: The modified string that is ensured to be a URL.\n\n Raises:\n ValueError: If the string is not a valid URL.\n \"\"\"\n if not string.startswith((\"http://\", \"https://\")):\n string = \"http://\" + string\n\n # Basic URL validation regex\n url_regex = re.compile(\n r\"^(https?:\\/\\/)?\" # optional protocol\n r\"(www\\.)?\" # optional www\n r\"([a-zA-Z0-9.-]+)\" # domain\n r\"(\\.[a-zA-Z]{2,})?\" # top-level domain\n r\"(:\\d+)?\" # optional port\n 
r\"(\\/[^\\s]*)?$\", # optional path\n re.IGNORECASE,\n )\n\n if not url_regex.match(string):\n msg = f\"Invalid URL: {string}\"\n raise ValueError(msg)\n\n return string\n\n def fetch_content(self) -> list[Data]:\n urls = [self.ensure_url(url.strip()) for url in self.urls if url.strip()]\n if self.format == \"Raw HTML\":\n loader = AsyncHtmlLoader(web_path=urls, encoding=\"utf-8\")\n else:\n loader = WebBaseLoader(web_paths=urls, encoding=\"utf-8\")\n docs = loader.load()\n data = [Data(text=doc.page_content, **doc.metadata) for doc in docs]\n self.status = data\n return data\n\n def fetch_content_text(self) -> Message:\n data = self.fetch_content()\n\n result_string = data_to_text(\"{text}\", data)\n self.status = result_string\n return Message(text=result_string)\n\n def as_dataframe(self) -> DataFrame:\n return DataFrame(self.fetch_content())\n"
},
"format": {
"_input_type": "DropdownInput",
@@ -1912,6 +1934,17 @@
             "Message"
           ],
           "value": "__UNDEFINED__"
+        },
+        {
+          "cache": true,
+          "display_name": "DataFrame",
+          "method": "as_dataframe",
+          "name": "dataframe",
+          "selected": "DataFrame",
+          "types": [
+            "DataFrame"
+          ],
+          "value": "__UNDEFINED__"
         }
       ],
       "pinned": false,
@@ -1933,7 +1966,7 @@
"show": true,
"title_case": false,
"type": "code",
"value": "import re\n\nfrom langchain_community.document_loaders import AsyncHtmlLoader, WebBaseLoader\n\nfrom langflow.custom import Component\nfrom langflow.helpers.data import data_to_text\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.schema import Data\nfrom langflow.schema.message import Message\n\n\nclass URLComponent(Component):\n display_name = \"URL\"\n description = \"Fetch content from one or more URLs.\"\n icon = \"layout-template\"\n name = \"URL\"\n\n inputs = [\n MessageTextInput(\n name=\"urls\",\n display_name=\"URLs\",\n info=\"Enter one or more URLs, by clicking the '+' button.\",\n is_list=True,\n tool_mode=True,\n ),\n DropdownInput(\n name=\"format\",\n display_name=\"Output Format\",\n info=\"Output Format. Use 'Text' to extract the text from the HTML or 'Raw HTML' for the raw HTML content.\",\n options=[\"Text\", \"Raw HTML\"],\n value=\"Text\",\n ),\n ]\n\n outputs = [\n Output(display_name=\"Data\", name=\"data\", method=\"fetch_content\"),\n Output(display_name=\"Text\", name=\"text\", method=\"fetch_content_text\"),\n ]\n\n def ensure_url(self, string: str) -> str:\n \"\"\"Ensures the given string is a URL by adding 'http://' if it doesn't start with 'http://' or 'https://'.\n\n Raises an error if the string is not a valid URL.\n\n Parameters:\n string (str): The string to be checked and possibly modified.\n\n Returns:\n str: The modified string that is ensured to be a URL.\n\n Raises:\n ValueError: If the string is not a valid URL.\n \"\"\"\n if not string.startswith((\"http://\", \"https://\")):\n string = \"http://\" + string\n\n # Basic URL validation regex\n url_regex = re.compile(\n r\"^(https?:\\/\\/)?\" # optional protocol\n r\"(www\\.)?\" # optional www\n r\"([a-zA-Z0-9.-]+)\" # domain\n r\"(\\.[a-zA-Z]{2,})?\" # top-level domain\n r\"(:\\d+)?\" # optional port\n r\"(\\/[^\\s]*)?$\", # optional path\n re.IGNORECASE,\n )\n\n if not url_regex.match(string):\n msg = f\"Invalid URL: {string}\"\n 
raise ValueError(msg)\n\n return string\n\n def fetch_content(self) -> list[Data]:\n urls = [self.ensure_url(url.strip()) for url in self.urls if url.strip()]\n if self.format == \"Raw HTML\":\n loader = AsyncHtmlLoader(web_path=urls, encoding=\"utf-8\")\n else:\n loader = WebBaseLoader(web_paths=urls, encoding=\"utf-8\")\n docs = loader.load()\n data = [Data(text=doc.page_content, **doc.metadata) for doc in docs]\n self.status = data\n return data\n\n def fetch_content_text(self) -> Message:\n data = self.fetch_content()\n\n result_string = data_to_text(\"{text}\", data)\n self.status = result_string\n return Message(text=result_string)\n"
"value": "import re\n\nfrom langchain_community.document_loaders import AsyncHtmlLoader, WebBaseLoader\n\nfrom langflow.custom import Component\nfrom langflow.helpers.data import data_to_text\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.schema import Data\nfrom langflow.schema.dataframe import DataFrame\nfrom langflow.schema.message import Message\n\n\nclass URLComponent(Component):\n display_name = \"URL\"\n description = \"Fetch content from one or more URLs.\"\n icon = \"layout-template\"\n name = \"URL\"\n\n inputs = [\n MessageTextInput(\n name=\"urls\",\n display_name=\"URLs\",\n info=\"Enter one or more URLs, by clicking the '+' button.\",\n is_list=True,\n tool_mode=True,\n ),\n DropdownInput(\n name=\"format\",\n display_name=\"Output Format\",\n info=\"Output Format. Use 'Text' to extract the text from the HTML or 'Raw HTML' for the raw HTML content.\",\n options=[\"Text\", \"Raw HTML\"],\n value=\"Text\",\n ),\n ]\n\n outputs = [\n Output(display_name=\"Data\", name=\"data\", method=\"fetch_content\"),\n Output(display_name=\"Text\", name=\"text\", method=\"fetch_content_text\"),\n Output(display_name=\"DataFrame\", name=\"dataframe\", method=\"as_dataframe\"),\n ]\n\n def ensure_url(self, string: str) -> str:\n \"\"\"Ensures the given string is a URL by adding 'http://' if it doesn't start with 'http://' or 'https://'.\n\n Raises an error if the string is not a valid URL.\n\n Parameters:\n string (str): The string to be checked and possibly modified.\n\n Returns:\n str: The modified string that is ensured to be a URL.\n\n Raises:\n ValueError: If the string is not a valid URL.\n \"\"\"\n if not string.startswith((\"http://\", \"https://\")):\n string = \"http://\" + string\n\n # Basic URL validation regex\n url_regex = re.compile(\n r\"^(https?:\\/\\/)?\" # optional protocol\n r\"(www\\.)?\" # optional www\n r\"([a-zA-Z0-9.-]+)\" # domain\n r\"(\\.[a-zA-Z]{2,})?\" # top-level domain\n r\"(:\\d+)?\" # optional port\n 
r\"(\\/[^\\s]*)?$\", # optional path\n re.IGNORECASE,\n )\n\n if not url_regex.match(string):\n msg = f\"Invalid URL: {string}\"\n raise ValueError(msg)\n\n return string\n\n def fetch_content(self) -> list[Data]:\n urls = [self.ensure_url(url.strip()) for url in self.urls if url.strip()]\n if self.format == \"Raw HTML\":\n loader = AsyncHtmlLoader(web_path=urls, encoding=\"utf-8\")\n else:\n loader = WebBaseLoader(web_paths=urls, encoding=\"utf-8\")\n docs = loader.load()\n data = [Data(text=doc.page_content, **doc.metadata) for doc in docs]\n self.status = data\n return data\n\n def fetch_content_text(self) -> Message:\n data = self.fetch_content()\n\n result_string = data_to_text(\"{text}\", data)\n self.status = result_string\n return Message(text=result_string)\n\n def as_dataframe(self) -> DataFrame:\n return DataFrame(self.fetch_content())\n"
},
"format": {
"_input_type": "DropdownInput",


@@ -1175,6 +1175,18 @@
"Data"
],
"value": "__UNDEFINED__"
},
{
"cache": true,
"display_name": "DataFrame",
"method": "as_dataframe",
"name": "dataframe",
"required_inputs": [],
"selected": "DataFrame",
"types": [
"DataFrame"
],
"value": "__UNDEFINED__"
}
],
"pinned": false,
@@ -2593,6 +2605,17 @@
"Message"
],
"value": "__UNDEFINED__"
},
{
"cache": true,
"display_name": "DataFrame",
"method": "as_dataframe",
"name": "dataframe",
"selected": "DataFrame",
"types": [
"DataFrame"
],
"value": "__UNDEFINED__"
}
],
"pinned": false,
@@ -2615,7 +2638,7 @@
"show": true,
"title_case": false,
"type": "code",
"value": "import re\n\nfrom langchain_community.document_loaders import AsyncHtmlLoader, WebBaseLoader\n\nfrom langflow.custom import Component\nfrom langflow.helpers.data import data_to_text\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.schema import Data\nfrom langflow.schema.message import Message\n\n\nclass URLComponent(Component):\n display_name = \"URL\"\n description = \"Fetch content from one or more URLs.\"\n icon = \"layout-template\"\n name = \"URL\"\n\n inputs = [\n MessageTextInput(\n name=\"urls\",\n display_name=\"URLs\",\n info=\"Enter one or more URLs, by clicking the '+' button.\",\n is_list=True,\n tool_mode=True,\n ),\n DropdownInput(\n name=\"format\",\n display_name=\"Output Format\",\n info=\"Output Format. Use 'Text' to extract the text from the HTML or 'Raw HTML' for the raw HTML content.\",\n options=[\"Text\", \"Raw HTML\"],\n value=\"Text\",\n ),\n ]\n\n outputs = [\n Output(display_name=\"Data\", name=\"data\", method=\"fetch_content\"),\n Output(display_name=\"Text\", name=\"text\", method=\"fetch_content_text\"),\n ]\n\n def ensure_url(self, string: str) -> str:\n \"\"\"Ensures the given string is a URL by adding 'http://' if it doesn't start with 'http://' or 'https://'.\n\n Raises an error if the string is not a valid URL.\n\n Parameters:\n string (str): The string to be checked and possibly modified.\n\n Returns:\n str: The modified string that is ensured to be a URL.\n\n Raises:\n ValueError: If the string is not a valid URL.\n \"\"\"\n if not string.startswith((\"http://\", \"https://\")):\n string = \"http://\" + string\n\n # Basic URL validation regex\n url_regex = re.compile(\n r\"^(https?:\\/\\/)?\" # optional protocol\n r\"(www\\.)?\" # optional www\n r\"([a-zA-Z0-9.-]+)\" # domain\n r\"(\\.[a-zA-Z]{2,})?\" # top-level domain\n r\"(:\\d+)?\" # optional port\n r\"(\\/[^\\s]*)?$\", # optional path\n re.IGNORECASE,\n )\n\n if not url_regex.match(string):\n msg = f\"Invalid URL: {string}\"\n 
raise ValueError(msg)\n\n return string\n\n def fetch_content(self) -> list[Data]:\n urls = [self.ensure_url(url.strip()) for url in self.urls if url.strip()]\n if self.format == \"Raw HTML\":\n loader = AsyncHtmlLoader(web_path=urls, encoding=\"utf-8\")\n else:\n loader = WebBaseLoader(web_paths=urls, encoding=\"utf-8\")\n docs = loader.load()\n data = [Data(text=doc.page_content, **doc.metadata) for doc in docs]\n self.status = data\n return data\n\n def fetch_content_text(self) -> Message:\n data = self.fetch_content()\n\n result_string = data_to_text(\"{text}\", data)\n self.status = result_string\n return Message(text=result_string)\n"
"value": "import re\n\nfrom langchain_community.document_loaders import AsyncHtmlLoader, WebBaseLoader\n\nfrom langflow.custom import Component\nfrom langflow.helpers.data import data_to_text\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.schema import Data\nfrom langflow.schema.dataframe import DataFrame\nfrom langflow.schema.message import Message\n\n\nclass URLComponent(Component):\n display_name = \"URL\"\n description = \"Fetch content from one or more URLs.\"\n icon = \"layout-template\"\n name = \"URL\"\n\n inputs = [\n MessageTextInput(\n name=\"urls\",\n display_name=\"URLs\",\n info=\"Enter one or more URLs, by clicking the '+' button.\",\n is_list=True,\n tool_mode=True,\n ),\n DropdownInput(\n name=\"format\",\n display_name=\"Output Format\",\n info=\"Output Format. Use 'Text' to extract the text from the HTML or 'Raw HTML' for the raw HTML content.\",\n options=[\"Text\", \"Raw HTML\"],\n value=\"Text\",\n ),\n ]\n\n outputs = [\n Output(display_name=\"Data\", name=\"data\", method=\"fetch_content\"),\n Output(display_name=\"Text\", name=\"text\", method=\"fetch_content_text\"),\n Output(display_name=\"DataFrame\", name=\"dataframe\", method=\"as_dataframe\"),\n ]\n\n def ensure_url(self, string: str) -> str:\n \"\"\"Ensures the given string is a URL by adding 'http://' if it doesn't start with 'http://' or 'https://'.\n\n Raises an error if the string is not a valid URL.\n\n Parameters:\n string (str): The string to be checked and possibly modified.\n\n Returns:\n str: The modified string that is ensured to be a URL.\n\n Raises:\n ValueError: If the string is not a valid URL.\n \"\"\"\n if not string.startswith((\"http://\", \"https://\")):\n string = \"http://\" + string\n\n # Basic URL validation regex\n url_regex = re.compile(\n r\"^(https?:\\/\\/)?\" # optional protocol\n r\"(www\\.)?\" # optional www\n r\"([a-zA-Z0-9.-]+)\" # domain\n r\"(\\.[a-zA-Z]{2,})?\" # top-level domain\n r\"(:\\d+)?\" # optional port\n 
r\"(\\/[^\\s]*)?$\", # optional path\n re.IGNORECASE,\n )\n\n if not url_regex.match(string):\n msg = f\"Invalid URL: {string}\"\n raise ValueError(msg)\n\n return string\n\n def fetch_content(self) -> list[Data]:\n urls = [self.ensure_url(url.strip()) for url in self.urls if url.strip()]\n if self.format == \"Raw HTML\":\n loader = AsyncHtmlLoader(web_path=urls, encoding=\"utf-8\")\n else:\n loader = WebBaseLoader(web_paths=urls, encoding=\"utf-8\")\n docs = loader.load()\n data = [Data(text=doc.page_content, **doc.metadata) for doc in docs]\n self.status = data\n return data\n\n def fetch_content_text(self) -> Message:\n data = self.fetch_content()\n\n result_string = data_to_text(\"{text}\", data)\n self.status = result_string\n return Message(text=result_string)\n\n def as_dataframe(self) -> DataFrame:\n return DataFrame(self.fetch_content())\n"
},
"format": {
"_input_type": "DropdownInput",
@@ -2757,6 +2780,18 @@
"Data"
],
"value": "__UNDEFINED__"
},
{
"cache": true,
"display_name": "DataFrame",
"method": "as_dataframe",
"name": "dataframe",
"required_inputs": [],
"selected": "DataFrame",
"types": [
"DataFrame"
],
"value": "__UNDEFINED__"
}
],
"pinned": false,


@@ -904,6 +904,17 @@
"Data"
],
"value": "__UNDEFINED__"
},
{
"cache": true,
"display_name": "DataFrame",
"method": "as_dataframe",
"name": "dataframe",
"selected": "DataFrame",
"types": [
"DataFrame"
],
"value": "__UNDEFINED__"
}
],
"pinned": false,
@@ -955,7 +966,7 @@
"show": true,
"title_case": false,
"type": "code",
"value": "from langchain_text_splitters import CharacterTextSplitter\n\nfrom langflow.custom import Component\nfrom langflow.io import HandleInput, IntInput, MessageTextInput, Output\nfrom langflow.schema import Data\nfrom langflow.utils.util import unescape_string\n\n\nclass SplitTextComponent(Component):\n display_name: str = \"Split Text\"\n description: str = \"Split text into chunks based on specified criteria.\"\n icon = \"scissors-line-dashed\"\n name = \"SplitText\"\n\n inputs = [\n HandleInput(\n name=\"data_inputs\",\n display_name=\"Data Inputs\",\n info=\"The data to split.\",\n input_types=[\"Data\"],\n is_list=True,\n ),\n IntInput(\n name=\"chunk_overlap\",\n display_name=\"Chunk Overlap\",\n info=\"Number of characters to overlap between chunks.\",\n value=200,\n ),\n IntInput(\n name=\"chunk_size\",\n display_name=\"Chunk Size\",\n info=\"The maximum number of characters in each chunk.\",\n value=1000,\n ),\n MessageTextInput(\n name=\"separator\",\n display_name=\"Separator\",\n info=\"The character to split on. Defaults to newline.\",\n value=\"\\n\",\n ),\n ]\n\n outputs = [\n Output(display_name=\"Chunks\", name=\"chunks\", method=\"split_text\"),\n ]\n\n def _docs_to_data(self, docs):\n return [Data(text=doc.page_content, data=doc.metadata) for doc in docs]\n\n def split_text(self) -> list[Data]:\n separator = unescape_string(self.separator)\n\n documents = [_input.to_lc_document() for _input in self.data_inputs if isinstance(_input, Data)]\n\n splitter = CharacterTextSplitter(\n chunk_overlap=self.chunk_overlap,\n chunk_size=self.chunk_size,\n separator=separator,\n )\n docs = splitter.split_documents(documents)\n data = self._docs_to_data(docs)\n self.status = data\n return data\n"
"value": "from langchain_text_splitters import CharacterTextSplitter\n\nfrom langflow.custom import Component\nfrom langflow.io import HandleInput, IntInput, MessageTextInput, Output\nfrom langflow.schema import Data, DataFrame\nfrom langflow.utils.util import unescape_string\n\n\nclass SplitTextComponent(Component):\n display_name: str = \"Split Text\"\n description: str = \"Split text into chunks based on specified criteria.\"\n icon = \"scissors-line-dashed\"\n name = \"SplitText\"\n\n inputs = [\n HandleInput(\n name=\"data_inputs\",\n display_name=\"Data Inputs\",\n info=\"The data to split.\",\n input_types=[\"Data\"],\n is_list=True,\n required=True,\n ),\n IntInput(\n name=\"chunk_overlap\",\n display_name=\"Chunk Overlap\",\n info=\"Number of characters to overlap between chunks.\",\n value=200,\n ),\n IntInput(\n name=\"chunk_size\",\n display_name=\"Chunk Size\",\n info=\"The maximum number of characters in each chunk.\",\n value=1000,\n ),\n MessageTextInput(\n name=\"separator\",\n display_name=\"Separator\",\n info=\"The character to split on. Defaults to newline.\",\n value=\"\\n\",\n ),\n ]\n\n outputs = [\n Output(display_name=\"Chunks\", name=\"chunks\", method=\"split_text\"),\n Output(display_name=\"DataFrame\", name=\"dataframe\", method=\"as_dataframe\"),\n ]\n\n def _docs_to_data(self, docs):\n return [Data(text=doc.page_content, data=doc.metadata) for doc in docs]\n\n def split_text(self) -> list[Data]:\n separator = unescape_string(self.separator)\n\n documents = [_input.to_lc_document() for _input in self.data_inputs if isinstance(_input, Data)]\n\n splitter = CharacterTextSplitter(\n chunk_overlap=self.chunk_overlap,\n chunk_size=self.chunk_size,\n separator=separator,\n )\n docs = splitter.split_documents(documents)\n data = self._docs_to_data(docs)\n self.status = data\n return data\n\n def as_dataframe(self) -> DataFrame:\n return DataFrame(self.split_text())\n"
},
"data_inputs": {
"advanced": false,
@@ -968,7 +979,7 @@
"list": true,
"name": "data_inputs",
"placeholder": "",
"required": false,
"required": true,
"show": true,
"title_case": false,
"trace_as_metadata": true,
@@ -3058,6 +3069,18 @@
"Data"
],
"value": "__UNDEFINED__"
},
{
"cache": true,
"display_name": "DataFrame",
"method": "as_dataframe",
"name": "dataframe",
"required_inputs": [],
"selected": "DataFrame",
"types": [
"DataFrame"
],
"value": "__UNDEFINED__"
}
],
"pinned": false,
@@ -3477,6 +3500,18 @@
"Data"
],
"value": "__UNDEFINED__"
},
{
"cache": true,
"display_name": "DataFrame",
"method": "as_dataframe",
"name": "dataframe",
"required_inputs": [],
"selected": "DataFrame",
"types": [
"DataFrame"
],
"value": "__UNDEFINED__"
}
],
"pinned": false,
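The new SplitText output above wires `split_text()` straight into a `DataFrame` via `as_dataframe()`. The shape of that flow can be sketched without langchain — the greedy splitter below is a naive, illustrative stand-in for `CharacterTextSplitter`, and the plain-dict rows stand in for `Data` objects:

```python
import pandas as pd


def split_text(text, chunk_size=10, separator=" "):
    # Naive stand-in for CharacterTextSplitter: split on the separator,
    # then greedily pack pieces into chunks of at most chunk_size characters.
    chunks, current = [], ""
    for part in text.split(separator):
        candidate = (current + separator + part) if current else part
        if len(candidate) > chunk_size and current:
            chunks.append(current)
            current = part
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks


chunks = split_text("one two three four five", chunk_size=9)
rows = [{"text": c} for c in chunks]  # analogue of the list[Data] the component returns
df = pd.DataFrame(rows)               # analogue of as_dataframe(): DataFrame(self.split_text())
```

The component itself simply delegates: `as_dataframe()` returns `DataFrame(self.split_text())`, so every chunk becomes one row.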


@@ -32,7 +32,7 @@ class DataFrame(pandas_DataFrame):
>>> dataset = DataFrame({"name": ["John", "Jane"], "age": [30, 25]})
"""
def __init__(self, data: list[dict | Data] | dict | pd.DataFrame | None = None, **kwargs):
def __init__(self, data: list[dict] | list[Data] | pd.DataFrame | None = None, **kwargs):
if data is None:
super().__init__(**kwargs)
return
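The narrowed constructor signature (`list[dict] | list[Data] | pd.DataFrame | None`) can be exercised with a minimal sketch — the `Data` and `DataFrame` classes below are illustrative stand-ins, not langflow's actual implementations:

```python
import pandas as pd


class Data:
    """Minimal stand-in for langflow.schema.Data (illustrative only)."""

    def __init__(self, **kwargs):
        self.data = kwargs


class DataFrame(pd.DataFrame):
    """Sketch of the narrowed signature: list[dict] | list[Data] | pd.DataFrame | None."""

    def __init__(self, data=None, **kwargs):
        if data is None:
            super().__init__(**kwargs)
            return
        if isinstance(data, list):
            # Unwrap Data objects into plain dict rows before handing off to pandas
            rows = [item.data if isinstance(item, Data) else item for item in data]
            super().__init__(rows, **kwargs)
            return
        super().__init__(data, **kwargs)


# A list of Data objects becomes one DataFrame row per object
df = DataFrame([Data(text="hello"), Data(text="world")])
```

This is what lets each component's `as_dataframe()` be a one-liner over its existing `list[Data]` output.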


@@ -0,0 +1,378 @@
import tempfile
from pathlib import Path
from unittest.mock import Mock, patch
import pytest
from langflow.components.data import DirectoryComponent
from langflow.schema import Data, DataFrame
from tests.base import ComponentTestBaseWithoutClient
class TestDirectoryComponent(ComponentTestBaseWithoutClient):
@pytest.fixture
def component_class(self):
"""Return the component class to test."""
return DirectoryComponent
@pytest.fixture
def default_kwargs(self, tmp_path):
"""Return the default kwargs for the component."""
return {
"path": str(tmp_path),
"recursive": True,
"use_multithreading": False,
"silent_errors": False,
"types": ["txt"],
}
@pytest.fixture
def file_names_mapping(self):
"""Return the version-specific file names mapping for this component."""
return [
{"version": "1.0.19", "module": "data", "file_name": "Directory"},
{"version": "1.1.0", "module": "data", "file_name": "directory"},
{"version": "1.1.1", "module": "data", "file_name": "directory"},
]
@patch("langflow.components.data.directory.parallel_load_data")
@patch("langflow.components.data.directory.retrieve_file_paths")
@patch("langflow.components.data.DirectoryComponent.resolve_path")
def test_directory_component_build_with_multithreading(
self, mock_resolve_path, mock_retrieve_file_paths, mock_parallel_load_data
):
# Arrange
directory_component = DirectoryComponent()
path = Path(__file__).resolve().parent
depth = 1
max_concurrency = 2
load_hidden = False
recursive = True
silent_errors = False
use_multithreading = True
mock_resolve_path.return_value = str(path)
mock_retrieve_file_paths.return_value = [str(p) for p in path.iterdir() if p.suffix == ".py"]
mock_parallel_load_data.return_value = [Mock()]
# Act
directory_component.set_attributes(
{
"path": str(path),
"depth": depth,
"max_concurrency": max_concurrency,
"load_hidden": load_hidden,
"recursive": recursive,
"silent_errors": silent_errors,
"use_multithreading": use_multithreading,
"types": ["py"], # Add file types without dots
}
)
directory_component.load_directory()
# Assert
mock_resolve_path.assert_called_once_with(str(path))
mock_retrieve_file_paths.assert_called_once_with(
mock_resolve_path.return_value,
depth=depth,
recursive=recursive,
types=["py"],
load_hidden=load_hidden,
)
mock_parallel_load_data.assert_called_once_with(
mock_retrieve_file_paths.return_value,
max_concurrency=max_concurrency,
silent_errors=silent_errors,
)
def test_directory_without_mocks(self):
directory_component = DirectoryComponent()
with tempfile.TemporaryDirectory() as temp_dir:
(Path(temp_dir) / "test.txt").write_text("test", encoding="utf-8")
# also add a json file
(Path(temp_dir) / "test.json").write_text('{"test": "test"}', encoding="utf-8")
directory_component.set_attributes(
{
"path": str(temp_dir),
"use_multithreading": False,
"silent_errors": False,
"types": ["txt", "json"],
}
)
results = directory_component.load_directory()
assert len(results) == 2
values = ["test", '{"test":"test"}']
assert all(result.text in values for result in results), [
(len(result.text), len(val)) for result, val in zip(results, values, strict=True)
]
# ../docs/docs/Components contains many markdown (.md) files; check that the
# directory component can load them by comparing the result count
# to the number of matching files on disk
directory_component = DirectoryComponent()
docs_path = Path(__file__).parent.parent.parent.parent.parent.parent.parent / "docs" / "docs" / "Components"
directory_component.set_attributes(
{
"path": str(docs_path),
"use_multithreading": False,
"silent_errors": False,
"types": ["md", "json"],
}
)
results = directory_component.load_directory()
docs_files = list(docs_path.glob("*.md")) + list(docs_path.glob("*.json"))
assert len(results) == len(docs_files)
def test_directory_as_dataframe(self):
"""Test DirectoryComponent's as_dataframe method."""
directory_component = DirectoryComponent()
with tempfile.TemporaryDirectory() as temp_dir:
# Create test files with different content
files_content = {
"file1.txt": "content1",
"file2.json": '{"key": "content2"}',
"file3.md": "# content3",
}
for filename, content in files_content.items():
(Path(temp_dir) / filename).write_text(content, encoding="utf-8")
directory_component.set_attributes(
{
"path": str(temp_dir),
"use_multithreading": False,
"types": ["txt", "json", "md"],
"silent_errors": False,
}
)
# Test as_dataframe
data_frame = directory_component.as_dataframe()
# Verify DataFrame structure
assert isinstance(data_frame, DataFrame), "Expected DataFrame instance"
assert len(data_frame) == 3, f"Expected DataFrame with 3 rows, got {len(data_frame)}"
# Check column names
expected_columns = ["text", "file_path"]
actual_columns = list(data_frame.columns)
assert set(expected_columns).issubset(
set(actual_columns)
), f"Missing required columns. Expected at least {expected_columns}, got {actual_columns}"
# Verify content matches input files
texts = data_frame["text"].tolist()
# For JSON files, the content is parsed and re-serialized
expected_content = {
"file1.txt": "content1",
"file2.json": '{"key":"content2"}', # JSON is re-serialized without spaces
"file3.md": "# content3",
}
missing_content = [content for content in expected_content.values() if content not in texts]
assert not missing_content, f"Missing expected content in DataFrame: {missing_content}"
# Verify file paths are correct
file_paths = data_frame["file_path"].tolist()
expected_paths = [str(Path(temp_dir) / filename) for filename in files_content]
missing_paths = [path for path in expected_paths if not any(path in fp for fp in file_paths)]
assert not missing_paths, f"Missing expected file paths in DataFrame: {missing_paths}"
def test_directory_with_depth(self):
"""Test DirectoryComponent with different depth settings."""
directory_component = DirectoryComponent()
with tempfile.TemporaryDirectory() as temp_dir:
# Create a nested directory structure
base_dir = Path(temp_dir)
(base_dir / "level1").mkdir()
(base_dir / "level1" / "level2").mkdir()
# Create files at different levels
(base_dir / "root.txt").write_text("root", encoding="utf-8")
(base_dir / "level1" / "level1.txt").write_text("level1", encoding="utf-8")
(base_dir / "level1" / "level2" / "level2.txt").write_text("level2", encoding="utf-8")
# Test non-recursive (only root)
directory_component.set_attributes(
{
"path": str(temp_dir),
"recursive": False, # Set recursive to False to get only root files
"use_multithreading": False,
"silent_errors": False,
"types": ["txt"],
}
)
results_root = directory_component.load_directory()
assert len(results_root) == 1, (
"With recursive=False, expected 1 file (root.txt), "
f"got {len(results_root)} files: {[d.data['file_path'] for d in results_root]}"
)
assert results_root[0].text == "root", f"Expected root file content 'root', got '{results_root[0].text}'"
# Test recursive with all files
directory_component.set_attributes(
{
"path": str(temp_dir),
"recursive": True,
"use_multithreading": False,
"silent_errors": False,
"types": ["txt"],
}
)
results_all = directory_component.load_directory()
assert len(results_all) == 3, (
"With recursive=True, expected 3 files (all files), "
f"got {len(results_all)} files: {[d.data['file_path'] for d in results_all]}"
)
texts = sorted([r.text for r in results_all])
expected_texts = sorted(["root", "level1", "level2"])
assert texts == expected_texts, f"Expected texts {expected_texts}, got {texts}"
@pytest.mark.parametrize(
("file_types", "expected_count"),
[
(["txt"], 1),
(["json"], 1),
(["txt", "json"], 2),
],
)
def test_directory_with_types(self, file_types, expected_count):
"""Test DirectoryComponent with different file type filters (parameterized)."""
directory_component = DirectoryComponent()
with tempfile.TemporaryDirectory() as temp_dir:
# Create files with different extensions
(Path(temp_dir) / "test.txt").write_text("text content", encoding="utf-8")
(Path(temp_dir) / "test.json").write_text('{"key": "value"}', encoding="utf-8")
(Path(temp_dir) / "test.exe").write_text("test", encoding="utf-8")
directory_component.set_attributes(
{
"path": str(temp_dir),
"types": file_types,
"use_multithreading": False,
"silent_errors": False,
}
)
results = directory_component.load_directory()
# Verify number of loaded files
assert (
len(results) == expected_count
), f"Expected {expected_count} results for file types {file_types}, got {len(results)}"
# Optionally, check the file extension in each result
for r in results:
# e.g., verify that the extension is indeed in file_types
file_ext = Path(r.data["file_path"]).suffix.lstrip(".")
assert file_ext in file_types, f"Unexpected file extension: {file_ext}"
def test_directory_invalid_type(self):
"""Test DirectoryComponent raises error with invalid file type."""
directory_component = DirectoryComponent()
with tempfile.TemporaryDirectory() as temp_dir:
# Create test file
(Path(temp_dir) / "test.exe").write_text("test", encoding="utf-8")
directory_component.set_attributes(
{
"path": str(temp_dir),
"types": ["exe"],
"use_multithreading": False,
"silent_errors": False,
}
)
with pytest.raises(
ValueError, match="Invalid file types specified: \\['exe'\\]. Valid types are:"
) as exc_info:
directory_component.load_directory()
assert "Invalid file types specified: ['exe']" in str(exc_info.value)
assert "Valid types are:" in str(exc_info.value)
def test_directory_with_hidden_files(self):
"""Test DirectoryComponent with hidden files."""
directory_component = DirectoryComponent()
with tempfile.TemporaryDirectory() as temp_dir:
# Create regular and hidden files
(Path(temp_dir) / "regular.txt").write_text("regular", encoding="utf-8")
(Path(temp_dir) / ".hidden.txt").write_text("hidden", encoding="utf-8")
# Test without loading hidden files
directory_component.set_attributes(
{
"path": str(temp_dir),
"load_hidden": False,
"use_multithreading": False,
"silent_errors": False,
"types": ["txt"],
}
)
results = directory_component.load_directory()
assert len(results) == 1
assert results[0].text == "regular"
# Test with loading hidden files
directory_component.set_attributes({"load_hidden": True})
results = directory_component.load_directory()
assert len(results) == 2
texts = [r.text for r in results]
assert "regular" in texts
assert "hidden" in texts
@patch("langflow.components.data.directory.parallel_load_data")
def test_directory_with_multithreading(self, mock_parallel_load):
"""Test DirectoryComponent with multithreading enabled."""
directory_component = DirectoryComponent()
with tempfile.TemporaryDirectory() as temp_dir:
# Create test files
(Path(temp_dir) / "test1.txt").write_text("content1", encoding="utf-8")
(Path(temp_dir) / "test2.txt").write_text("content2", encoding="utf-8")
# Mock parallel_load_data to return some test data
mock_data = [
Data(text="content1", data={"file_path": str(Path(temp_dir) / "test1.txt")}),
Data(text="content2", data={"file_path": str(Path(temp_dir) / "test2.txt")}),
]
mock_parallel_load.return_value = mock_data
# Test with multithreading enabled
directory_component.set_attributes(
{
"path": str(temp_dir),
"use_multithreading": True,
"max_concurrency": 2,
"types": ["txt"], # Specify file types to ensure files are found
"recursive": True, # Enable recursive search
"silent_errors": False,
}
)
results = directory_component.load_directory()
# Verify parallel_load_data was called with correct parameters
mock_parallel_load.assert_called_once()
call_args = mock_parallel_load.call_args[1]
assert (
call_args["max_concurrency"] == 2
), f"Expected max_concurrency=2, got {call_args.get('max_concurrency')}"
assert (
call_args["silent_errors"] is False
), f"Expected silent_errors=False, got {call_args.get('silent_errors')}"
# Verify results
assert (
len(results) == 2
), f"Expected 2 results, got {len(results)}: {[r.data['file_path'] for r in results]}"
assert all(
isinstance(r, Data) for r in results
), f"All results should be Data objects, got types: {[type(r) for r in results]}"
actual_texts = [r.text for r in results]
expected_texts = ["content1", "content2"]
assert actual_texts == expected_texts, f"Expected texts {expected_texts}, got {actual_texts}"


@@ -0,0 +1,149 @@
from unittest.mock import Mock, patch
import pytest
import respx
from httpx import Response
from langflow.components.data import URLComponent
from langflow.schema import DataFrame, Message
from tests.base import ComponentTestBaseWithoutClient
class TestURLComponent(ComponentTestBaseWithoutClient):
@pytest.fixture
def component_class(self):
"""Return the component class to test."""
return URLComponent
@pytest.fixture
def default_kwargs(self):
"""Return the default kwargs for the component."""
return {
"urls": ["https://example.com"],
"format": "Text",
}
@pytest.fixture
def file_names_mapping(self):
"""Return the version-specific file names mapping for this component."""
return [
{"version": "1.0.19", "module": "data", "file_name": "URL"},
{"version": "1.1.0", "module": "data", "file_name": "url"},
{"version": "1.1.1", "module": "data", "file_name": "url"},
]
@pytest.fixture
def mock_web_load(self):
"""Mock the WebBaseLoader.load method."""
with patch("langchain_community.document_loaders.WebBaseLoader.load") as mock:
yield mock
def test_url_component(self, mock_web_load):
"""Test basic URL component functionality."""
component = URLComponent()
component.set_attributes({"urls": ["https://example.com"]})
mock_web_load.return_value = [Mock(page_content="test content", metadata={"source": "https://example.com"})]
data_ = component.fetch_content()
assert all(value.data for value in data_)
assert all(value.text for value in data_)
assert all(value.source for value in data_)
# @pytest.mark.parametrize(
# ("format_type", "expected_content"),
# [
# ("Text", "test content"),
# ("Raw HTML", "<html>test content</html>"),
# ],
# )
# def test_url_component_formats(self, mock_web_load, format_type, expected_content):
# """Test URL component with different format types."""
# component = URLComponent()
# component.set_attributes({"urls": ["https://example.com"], "format": format_type})
# # Mock the loader response
# mock_web_load.return_value = [Mock(page_content=expected_content, metadata={"source": "https://example.com"})]
# # Test fetch_content - use sync version
# content = component.fetch_content()
# assert len(content) == 1
# assert content[0].text == expected_content
# assert content[0].source == "https://example.com"
def test_url_component_as_dataframe(self, mock_web_load):
"""Test URL component's as_dataframe method."""
component = URLComponent()
urls = ["https://example1.com", "https://example2.com"]
component.set_attributes({"urls": urls})
# Mock the loader response
mock_web_load.return_value = [
Mock(page_content="content1", metadata={"source": urls[0]}),
Mock(page_content="content2", metadata={"source": urls[1]}),
]
# Test as_dataframe
data_frame = component.as_dataframe()
assert isinstance(data_frame, DataFrame), "Expected DataFrame instance"
assert len(data_frame) == 2
assert list(data_frame.columns) == ["text", "source"]
assert data_frame.iloc[0]["text"] == "content1"
assert data_frame.iloc[0]["source"] == urls[0]
assert data_frame.iloc[1]["text"] == "content2"
assert data_frame.iloc[1]["source"] == urls[1]
def test_url_component_fetch_content_text(self, mock_web_load):
"""Test URL component's fetch_content_text method."""
component = URLComponent()
component.set_attributes({"urls": ["https://example.com"]})
mock_web_load.return_value = [Mock(page_content="test content", metadata={"source": "https://example.com"})]
# Test fetch_content_text
message = component.fetch_content_text()
assert isinstance(message, Message), "Expected Message instance"
assert message.text == "test content"
def test_url_component_invalid_urls(self):
"""Test URL component with invalid URLs."""
component = URLComponent()
component.set_attributes({"urls": ["not_a_valid_url"]})
# Test that invalid URLs raise a ValueError
with pytest.raises(ValueError, match="Invalid URL: http://not_a_valid_url"):
component.fetch_content()
def test_url_component_multiple_urls(self, mock_web_load):
"""Test URL component with multiple URLs."""
component = URLComponent()
urls = ["https://example1.com", "https://example2.com", "https://example3.com"]
component.set_attributes({"urls": urls})
mock_web_load.return_value = [
Mock(page_content=f"content{i+1}", metadata={"source": url}) for i, url in enumerate(urls)
]
# Test fetch_content
content = component.fetch_content()
assert len(content) == 3, f"Expected 3 content items, got {len(content)}"
for i, item in enumerate(content):
url = urls[i]
assert item.source == url, f"Expected '{url}', got '{item.source}'"
assert item.text == f"content{i+1}"
@respx.mock
async def test_url_request_success(self, mock_web_load):
"""Test successful URL request."""
url = "https://example.com/api/test"
respx.get(url).mock(return_value=Response(200, json={"success": True}))
component = URLComponent()
component.set_attributes({"urls": [url]})
mock_web_load.return_value = [Mock(page_content="test content", metadata={"source": url})]
result = component.fetch_content()
assert len(result) == 1
assert result[0].source == url


@@ -0,0 +1,222 @@
import pytest
from langflow.components.processing import SplitTextComponent
from langflow.schema import Data, DataFrame
from tests.base import ComponentTestBaseWithoutClient
class TestSplitTextComponent(ComponentTestBaseWithoutClient):
@pytest.fixture
def component_class(self):
"""Return the component class to test."""
return SplitTextComponent
@pytest.fixture
def default_kwargs(self):
"""Return the default kwargs for the component."""
return {
"data_inputs": [Data(text="Hello World")],
"chunk_overlap": 200,
"chunk_size": 1000,
"separator": "\n",
"session_id": "test_session",
"sender": "test_sender",
"sender_name": "test_sender_name",
}
@pytest.fixture
def file_names_mapping(self):
"""Return the file names mapping for different versions."""
return [
# It was in helpers in version 1.0.19
{"version": "1.0.19", "module": "helpers", "file_name": "SplitText"},
{"version": "1.1.0", "module": "processing", "file_name": "split_text"},
{"version": "1.1.1", "module": "processing", "file_name": "split_text"},
]
def test_split_text_basic(self):
"""Test basic text splitting functionality."""
component = SplitTextComponent()
test_text = "This is a test.\nIt has multiple lines.\nEach line should be a chunk."
component.set_attributes(
{
"data_inputs": [Data(text=test_text)],
"chunk_overlap": 0,
"chunk_size": 15,
"separator": "\n",
"session_id": "test_session",
"sender": "test_sender",
"sender_name": "test_sender_name",
}
)
results = component.split_text()
assert len(results) == 3, f"Expected 3 chunks, got {len(results)}"
assert "This is a test" in results[0].text, f"Expected 'This is a test', got '{results[0].text}'"
assert "It has multiple lines" in results[1].text, f"Expected 'It has multiple lines', got '{results[1].text}'"
assert (
"Each line should be a chunk" in results[2].text
), f"Expected 'Each line should be a chunk', got '{results[2].text}'"
def test_split_text_with_overlap(self):
"""Test text splitting with overlap."""
component = SplitTextComponent()
test_text = "First chunk.\nSecond chunk.\nThird chunk."
component.set_attributes(
{
"data_inputs": [Data(text=test_text)],
"chunk_overlap": 5, # Small overlap to test functionality
"chunk_size": 20,
"separator": "\n",
"session_id": "test_session",
"sender": "test_sender",
"sender_name": "test_sender_name",
}
)
results = component.split_text()
assert len(results) > 1, f"Expected more than 1 chunk, got {len(results)}"
# Check that chunks contain the expected text
assert "First chunk" in results[0].text, f"Expected 'First chunk' in '{results[0].text}'"
assert "Second chunk" in results[1].text, f"Expected 'Second chunk' in '{results[1].text}'"
assert "Third chunk" in results[2].text, f"Expected 'Third chunk' in '{results[2].text}'"
def test_split_text_custom_separator(self):
"""Test text splitting with a custom separator."""
component = SplitTextComponent()
test_text = "First part|Second part|Third part"
component.set_attributes(
{
"data_inputs": [Data(text=test_text)],
"chunk_overlap": 0,
"chunk_size": 10,
"separator": "|",
"session_id": "test_session",
"sender": "test_sender",
"sender_name": "test_sender_name",
}
)
results = component.split_text()
assert len(results) == 3, f"Expected 3 chunks, got {len(results)}"
assert "First part" in results[0].text, f"Expected 'First part', got '{results[0].text}'"
assert "Second part" in results[1].text, f"Expected 'Second part', got '{results[1].text}'"
assert "Third part" in results[2].text, f"Expected 'Third part', got '{results[2].text}'"
def test_split_text_with_metadata(self):
"""Test text splitting while preserving metadata."""
component = SplitTextComponent()
test_metadata = {"source": "test.txt", "author": "test"}
test_text = "Chunk 1\nChunk 2"
component.set_attributes(
{
"data_inputs": [Data(text=test_text, data=test_metadata)],
"chunk_overlap": 0,
"chunk_size": 7,
"separator": "\n",
"session_id": "test_session",
"sender": "test_sender",
"sender_name": "test_sender_name",
}
)
results = component.split_text()
assert len(results) == 2, f"Expected 2 chunks, got {len(results)}"
for result in results:
assert (
result.data["source"] == test_metadata["source"]
), f"Expected source '{test_metadata['source']}', got '{result.data.get('source')}'"
assert (
result.data["author"] == test_metadata["author"]
), f"Expected author '{test_metadata['author']}', got '{result.data.get('author')}'"
def test_split_text_as_dataframe(self):
"""Test converting split text results to DataFrame."""
component = SplitTextComponent()
test_text = "First chunk\nSecond chunk\nThird chunk"
component.set_attributes(
{
"data_inputs": [Data(text=test_text)],
"chunk_overlap": 0,
"chunk_size": 11,
"separator": "\n",
"session_id": "test_session",
"sender": "test_sender",
"sender_name": "test_sender_name",
}
)
data_frame = component.as_dataframe()
assert isinstance(data_frame, DataFrame), "Expected DataFrame instance"
assert len(data_frame) == 3, f"Expected DataFrame with 3 rows, got {len(data_frame)}"
assert list(data_frame.columns) == ["text"], f"Expected columns ['text'], got {list(data_frame.columns)}"
assert (
"First chunk" in data_frame.iloc[0]["text"]
), f"Expected 'First chunk', got '{data_frame.iloc[0]['text']}'"
assert (
"Second chunk" in data_frame.iloc[1]["text"]
), f"Expected 'Second chunk', got '{data_frame.iloc[1]['text']}'"
assert (
"Third chunk" in data_frame.iloc[2]["text"]
), f"Expected 'Third chunk', got '{data_frame.iloc[2]['text']}'"
def test_split_text_empty_input(self):
"""Test handling of empty input text."""
component = SplitTextComponent()
component.set_attributes(
{
"data_inputs": [Data(text="")],
"chunk_overlap": 0,
"chunk_size": 10,
"separator": "\n",
"session_id": "test_session",
"sender": "test_sender",
"sender_name": "test_sender_name",
}
)
results = component.split_text()
assert len(results) == 0, f"Expected 0 chunks for empty input, got {len(results)}"
def test_split_text_single_chunk(self):
"""Test text that fits in a single chunk."""
component = SplitTextComponent()
test_text = "Small text"
component.set_attributes(
{
"data_inputs": [Data(text=test_text)],
"chunk_overlap": 0,
"chunk_size": 100,
"separator": "\n",
"session_id": "test_session",
"sender": "test_sender",
"sender_name": "test_sender_name",
}
)
results = component.split_text()
assert len(results) == 1, f"Expected 1 chunk, got {len(results)}"
assert results[0].text == test_text, f"Expected '{test_text}', got '{results[0].text}'"
def test_split_text_multiple_inputs(self):
"""Test splitting multiple input texts."""
component = SplitTextComponent()
test_texts = ["First text\nSecond line", "Another text\nAnother line"]
component.set_attributes(
{
"data_inputs": [Data(text=text) for text in test_texts],
"chunk_overlap": 0,
"chunk_size": 10,
"separator": "\n",
"session_id": "test_session",
"sender": "test_sender",
"sender_name": "test_sender_name",
}
)
results = component.split_text()
assert len(results) == 4, f"Expected 4 chunks (2 from each text), got {len(results)}"
assert "First text" in results[0].text, f"Expected 'First text', got '{results[0].text}'"
assert "Second line" in results[1].text, f"Expected 'Second line', got '{results[1].text}'"
assert "Another text" in results[2].text, f"Expected 'Another text', got '{results[2].text}'"
assert "Another line" in results[3].text, f"Expected 'Another line', got '{results[3].text}'"


@@ -15,6 +15,7 @@ from langflow.components.vectorstores import AstraDBVectorStoreComponent
from langflow.graph import Graph
from langflow.graph.graph.constants import Finish
from langflow.schema import Data
from langflow.schema.dataframe import DataFrame
@pytest.fixture
@@ -29,15 +30,15 @@ def ingestion_graph():
openai_embeddings.set(
openai_api_key="sk-123", openai_api_base="https://api.openai.com/v1", openai_api_type="openai"
)
vector_store = AstraDBVectorStoreComponent(_id="ingestion-vector-store-123")
vector_store.set_on_output(name="search_results", value=[Data(text="This is a test file.")], cache=True)
vector_store.set_on_output(name="dataframe", value=DataFrame(data=[Data(text="This is a test file.")]), cache=True)
vector_store.set(
embedding_model=openai_embeddings.build_embeddings,
ingest_data=text_splitter.split_text,
api_endpoint="https://astra.example.com",
token="token", # noqa: S106
)
return Graph(file_component, vector_store)
@@ -55,14 +56,16 @@ def rag_graph():
embedding_model=openai_embeddings.build_embeddings,
)
# Mock search_documents
data_list = [
Data(data={"text": "Hello, world!"}),
Data(data={"text": "Goodbye, world!"}),
]
rag_vector_store.set_on_output(
name="search_results",
value=data_list,
cache=True,
)
rag_vector_store.set_on_output(name="dataframe", value=DataFrame(data=data_list), cache=True)
parse_data = ParseDataComponent(_id="parse-data-123")
parse_data.set(data=rag_vector_store.search_documents)
prompt_component = PromptComponent(_id="prompt-123")
@@ -93,7 +96,7 @@ def test_vector_store_rag(ingestion_graph, rag_graph):
"file-123",
"text-splitter-123",
"openai-embeddings-123",
"vector-store-123",
"ingestion-vector-store-123",
]
assert rag_graph is not None
rag_ids = [
@@ -122,30 +125,38 @@ def test_vector_store_rag_dump_components_and_edges(ingestion_graph, rag_graph):
ingestion_data = ingestion_graph_dump["data"]
ingestion_nodes = ingestion_data["nodes"]
ingestion_edges = ingestion_data["edges"]
# Define expected nodes with their types
expected_nodes = {
"file-123": "File",
"openai-embeddings-123": "OpenAIEmbeddings",
"text-splitter-123": "SplitText",
"ingestion-vector-store-123": "AstraDB",
}
# Verify number of nodes
assert len(ingestion_nodes) == len(expected_nodes), "Unexpected number of nodes"
# Create a mapping of node IDs to their data for easier lookup
node_map = {node["id"]: node["data"] for node in ingestion_nodes}
# Verify each expected node exists with correct type
for node_id, expected_type in expected_nodes.items():
assert node_id in node_map, f"Missing node {node_id}"
assert node_map[node_id]["type"] == expected_type, (
f"Node {node_id} has incorrect type. Expected {expected_type}, got {node_map[node_id]['type']}"
)
# Verify all nodes in graph are expected
unexpected_nodes = set(node_map.keys()) - set(expected_nodes.keys())
assert not unexpected_nodes, f"Found unexpected nodes: {unexpected_nodes}"
# Check edges in the ingestion graph
expected_ingestion_edges = [
("file-123", "text-splitter-123"),
("text-splitter-123", "ingestion-vector-store-123"),
("openai-embeddings-123", "ingestion-vector-store-123"),
]
assert len(ingestion_edges) == len(expected_ingestion_edges)
@@ -236,7 +247,7 @@ def test_vector_store_rag_add(ingestion_graph: Graph, rag_graph: Graph):
{"id": "file-123", "type": "File"},
{"id": "openai-embeddings-123", "type": "OpenAIEmbeddings"},
{"id": "text-splitter-123", "type": "SplitText"},
{"id": "vector-store-123", "type": "AstraDB"},
{"id": "ingestion-vector-store-123", "type": "AstraDB"},
{"id": "chatinput-123", "type": "ChatInput"},
{"id": "chatoutput-123", "type": "ChatOutput"},
{"id": "openai-123", "type": "OpenAIModel"},
@@ -255,8 +266,8 @@ def test_vector_store_rag_add(ingestion_graph: Graph, rag_graph: Graph):
# Expected edges in the combined graph (both ingestion and RAG edges)
expected_combined_edges = [
("file-123", "text-splitter-123"),
("text-splitter-123", "ingestion-vector-store-123"),
("openai-embeddings-123", "ingestion-vector-store-123"),
("chatinput-123", "rag-vector-store-123"),
("openai-embeddings-124", "rag-vector-store-123"),
("chatinput-123", "prompt-123"),


@@ -1,11 +1,4 @@
import tempfile
from pathlib import Path
from unittest.mock import ANY, Mock, patch
import httpx
import pytest
import respx
from httpx import Response
from langflow.components import data
@@ -15,27 +8,6 @@ def api_request():
return data.APIRequestComponent()
@respx.mock
async def test_successful_get_request(api_request):
# Mocking a successful GET request
url = "https://example.com/api/test"
method = "GET"
mock_response = {"success": True}
respx.get(url).mock(return_value=Response(200, json=mock_response))
# Making the request
result = await api_request.make_request(
client=httpx.AsyncClient(),
method=method,
url=url,
include_httpx_metadata=True,
)
# Assertions
assert result.data["status_code"] == 200
assert result.data["result"] == mock_response
def test_parse_curl(api_request):
# Arrange
field_value = (
@@ -55,146 +27,3 @@ def test_parse_curl(api_request):
assert new_build_config["urls"]["value"] == ["https://example.com/api/test"]
assert new_build_config["headers"]["value"] == {"Content-Type": "application/json"}
assert new_build_config["body"]["value"] == {"key": "value"}
@respx.mock
async def test_failed_request(api_request):
# Mocking a failed GET request
url = "https://example.com/api/test"
method = "GET"
respx.get(url).mock(return_value=Response(404))
# Making the request
result = await api_request.make_request(
client=httpx.AsyncClient(), method=method, url=url, include_httpx_metadata=True
)
# Assertions
assert result.data["status_code"] == 404
@respx.mock
async def test_timeout(api_request):
# Mocking a timeout
url = "https://example.com/api/timeout"
method = "GET"
respx.get(url).mock(side_effect=httpx.TimeoutException(message="Timeout", request=None))
# Making the request
result = await api_request.make_request(
client=httpx.AsyncClient(), method=method, url=url, timeout=1, include_httpx_metadata=True
)
# Assertions
assert result.data["status_code"] == 408
assert result.data["error"] == "Request timed out"
@respx.mock
async def test_build_with_multiple_urls(api_request):
# This test depends on having a working internet connection and accessible URLs
# It's better to mock these requests using respx or a similar library
# Setup for multiple URLs
method = "GET"
urls = ["https://example.com/api/one", "https://example.com/api/two"]
# You would mock these requests similarly to the single request tests
for url in urls:
respx.get(url).mock(return_value=Response(200, json={"success": True}))
# Do I have to mock the async client?
#
# Execute the build method
api_request.set_attributes(
{
"method": method,
"urls": urls,
}
)
results = await api_request.make_requests()
# Assertions
assert len(results) == len(urls)
@patch("langflow.components.data.directory.parallel_load_data")
@patch("langflow.components.data.directory.retrieve_file_paths")
@patch("langflow.components.data.DirectoryComponent.resolve_path")
def test_directory_component_build_with_multithreading(
mock_resolve_path, mock_retrieve_file_paths, mock_parallel_load_data
):
# Arrange
directory_component = data.DirectoryComponent()
path = Path(__file__).resolve().parent
depth = 1
max_concurrency = 2
load_hidden = False
recursive = True
silent_errors = False
use_multithreading = True
mock_resolve_path.return_value = str(path)
mock_retrieve_file_paths.return_value = [str(p) for p in path.iterdir() if p.suffix == ".py"]
mock_parallel_load_data.return_value = [Mock()]
# Act
directory_component.set_attributes(
{
"path": str(path),
"depth": depth,
"max_concurrency": max_concurrency,
"load_hidden": load_hidden,
"recursive": recursive,
"silent_errors": silent_errors,
"use_multithreading": use_multithreading,
}
)
directory_component.load_directory()
# Assert
mock_resolve_path.assert_called_once_with(str(path))
mock_retrieve_file_paths.assert_called_once_with(
str(path), load_hidden=load_hidden, recursive=recursive, depth=depth, types=ANY
)
mock_parallel_load_data.assert_called_once_with(
mock_retrieve_file_paths.return_value, silent_errors=silent_errors, max_concurrency=max_concurrency
)
def test_directory_without_mocks():
directory_component = data.DirectoryComponent()
with tempfile.TemporaryDirectory() as temp_dir:
(Path(temp_dir) / "test.txt").write_text("test", encoding="utf-8")
# also add a json file
(Path(temp_dir) / "test.json").write_text('{"test": "test"}', encoding="utf-8")
directory_component.set_attributes({"path": str(temp_dir), "use_multithreading": False})
results = directory_component.load_directory()
assert len(results) == 2
values = ["test", '{"test":"test"}']
assert all(result.text in values for result in results), [
(len(result.text), len(val)) for result, val in zip(results, values, strict=True)
]
# in ../docs/docs/components there are many mdx files
# check if the directory component can load them
# just check if the number of results is the same as the number of files
directory_component = data.DirectoryComponent()
docs_path = Path(__file__).parent.parent.parent.parent.parent / "docs" / "docs" / "Components"
directory_component.set_attributes({"path": str(docs_path), "use_multithreading": False})
results = directory_component.load_directory()
docs_files = list(docs_path.glob("*.md")) + list(docs_path.glob("*.json"))
assert len(results) == len(docs_files)
def test_url_component():
url_component = data.URLComponent()
url_component.set_attributes({"urls": ["https://langflow.org"]})
# the url component can be used to load the contents of a website
data_ = url_component.fetch_content()
assert all(value.data for value in data_)
assert all(value.text for value in data_)
assert all(value.source for value in data_)
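For context on the `as_dataframe` behavior the new tests exercise, a minimal sketch of turning fetched records into a frame with `text` and `source` columns. This uses plain pandas in place of `langflow.schema.DataFrame` (which wraps lists of `Data` objects), and `records_to_dataframe` is a hypothetical helper for illustration, not the component's actual implementation:

```python
import pandas as pd


def records_to_dataframe(records: list[dict]) -> pd.DataFrame:
    """Build a frame with only the two columns the tests above assert on."""
    return pd.DataFrame([{"text": r["text"], "source": r["source"]} for r in records])


# Mirrors the mocked loader output in test_url_component_as_dataframe.
frame = records_to_dataframe(
    [
        {"text": "content1", "source": "https://example1.com"},
        {"text": "content2", "source": "https://example2.com"},
    ]
)
# Column order follows key insertion order: ["text", "source"].
```

The tests' column-order assertion (`list(data_frame.columns) == ["text", "source"]`) holds here because pandas preserves dict key order when constructing from a list of dicts.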