feat: Better multi-file consistency for File Component (#8625)

* feat: First pass at output changes for file component

* SQLite support and json parsing

* [autofix.ci] apply automated fixes

* Update base_file.py

* [autofix.ci] apply automated fixes

* Update file.py

* [autofix.ci] apply automated fixes

* Split text supporting messages

* [autofix.ci] apply automated fixes

* Support structured json data

* Routine for inclusion of sample data

* Couple more template updates

* [autofix.ci] apply automated fixes

* Update Text Sentiment Analysis.json

* Update Portfolio Website Code Generator.json

* Test coverage

* [autofix.ci] apply automated fixes

* Update constants.py

* Update base_file.py

* Update service.py

* Update src/backend/base/langflow/components/processing/split_text.py

Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>

* [autofix.ci] apply automated fixes

* Update service.py

* Add get file size that is unimplemented

* Update service.py

* Fix return

* Remove type annotation

* Update graph for new templates

* Update test_vector_store_rag.py

* Update stop-building.spec.ts

* [autofix.ci] apply automated fixes

* Update fileUploadComponent.spec.ts

* Update fileUploadComponent.spec.ts

* Update fileUploadComponent.spec.ts

* Update fileUploadComponent.spec.ts

* Update fileUploadComponent.spec.ts

* Update fileUploadComponent.spec.ts

* [autofix.ci] apply automated fixes

* Update fileUploadComponent.spec.ts

* test update

* Update fileUploadComponent.spec.ts

* [autofix.ci] apply automated fixes

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Edwin Jose <edwin.jose@datastax.com>
Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
Co-authored-by: Mike Fortman <michael.fortman@datastax.com>
This commit is contained in:
Eric Hare 2025-06-26 14:12:34 -07:00 committed by GitHub
commit e1624b8c6e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 3680 additions and 3965 deletions

View file

@ -23,6 +23,7 @@ router = APIRouter(tags=["Files"], prefix="/files")
# Set the static name of the MCP servers file
MCP_SERVERS_FILE = "_mcp_servers"
SAMPLE_DATA_DIR = Path(__file__).parent / "sample_data"
async def byte_stream_generator(file_input, chunk_size: int = 8192) -> AsyncGenerator[bytes, None]:
@ -61,6 +62,21 @@ async def fetch_file_object(file_id: uuid.UUID, current_user: CurrentActiveUser,
return file
async def save_file_routine(file, storage_service, current_user: CurrentActiveUser, file_content=None, file_name=None):
    """Save a file's bytes to the storage service under the current user's folder.

    Args:
        file: Source object. It is only consulted for what was not supplied
            explicitly: ``await file.read()`` when ``file_content`` is None and
            ``file.filename`` when ``file_name`` is empty.
        storage_service: Service exposing an awaitable
            ``save_file(flow_id=..., file_name=..., data=...)``.
        current_user: Current user; ``str(current_user.id)`` is used as the folder.
        file_content: Optional pre-read content. Empty bytes are a valid value.
        file_name: Optional name to store the file under.

    Returns:
        tuple: ``(file_id, file_name)`` - a newly generated UUID and the name
        the content was saved under.
    """
    file_id = uuid.uuid4()

    # Compare against None instead of truthiness: explicitly supplied empty
    # content (b"") must be stored as-is and must not trigger ``file.read()``,
    # which the caller's ``file`` object may not even provide.
    if file_content is None:
        file_content = await file.read()
    if not file_name:
        file_name = file.filename

    # Save the file using the storage service.
    await storage_service.save_file(flow_id=str(current_user.id), file_name=file_name, data=file_content)

    return file_id, file_name
@router.post("", status_code=HTTPStatus.CREATED)
@router.post("/", status_code=HTTPStatus.CREATED)
async def upload_user_file(
@ -90,18 +106,7 @@ async def upload_user_file(
# Read file content and create a unique file name
try:
# Create a unique file name
file_id = uuid.uuid4()
file_content = await file.read()
# Get file extension of the file
file_extension = "." + file.filename.split(".")[-1] if file.filename and "." in file.filename else ""
anonymized_file_name = f"{file_id!s}{file_extension}"
# Here we use the current user's id as the folder name
folder = str(current_user.id)
# Save the file using the storage service.
await storage_service.save_file(flow_id=folder, file_name=anonymized_file_name, data=file_content)
file_id, file_name = await save_file_routine(file, storage_service, current_user)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error saving file: {e}") from e
@ -138,17 +143,17 @@ async def upload_user_file(
root_filename = f"{root_filename} ({count + 1})"
# Compute the file size based on the path
file_size = await storage_service.get_file_size(flow_id=folder, file_name=anonymized_file_name)
# Compute the file path
file_path = f"{folder}/{anonymized_file_name}"
file_size = await storage_service.get_file_size(
flow_id=str(current_user.id),
file_name=file_name,
)
# Create a new file record
new_file = UserFile(
id=file_id,
user_id=current_user.id,
name=root_filename,
path=file_path,
path=f"{current_user.id}/{file_name}",
size=file_size,
)
session.add(new_file)
@ -178,14 +183,61 @@ async def get_file_by_name(
raise HTTPException(status_code=500, detail=f"Error fetching file: {e}") from e
async def load_sample_files(current_user: CurrentActiveUser, session: DbSession, storage_service: StorageService):
    """Seed the current user's storage with the files found in ``SAMPLE_DATA_DIR``.

    Every regular file in the directory that ``get_file_by_name`` does not
    already report for the user is saved through ``save_file_routine`` and
    recorded as a ``UserFile`` row, committed one file at a time.

    Args:
        current_user: User who will own the sample files.
        session: Database session used to add, commit and refresh the rows.
        storage_service: Storage backend that receives the file bytes and is
            queried for the stored size.
    """
    sample_dir = Path(SAMPLE_DATA_DIR)
    # Nothing to seed when the sample directory does not exist.
    if not sample_dir.is_dir():
        return

    user_folder = str(current_user.id)
    for sample_file_path in sample_dir.iterdir():
        # Sub-directories cannot be read as a single file.
        if not sample_file_path.is_file():
            continue

        sample_file_name = sample_file_path.name
        # ``stem`` also handles names without an extension, where unpacking
        # ``rsplit(".", 1)`` into two names would raise ValueError.
        root_filename = sample_file_path.stem

        # Check if the sample file is already registered for this user.
        # NOTE(review): assumes get_file_by_name returns a falsy value for a
        # missing file rather than raising - confirm against its implementation.
        existing_sample_file = await get_file_by_name(
            file_name=root_filename, current_user=current_user, session=session
        )
        if existing_sample_file:
            continue

        # Read the binary data of the sample file
        binary_data = sample_file_path.read_bytes()

        # Write the sample file content to the storage service
        file_id, _ = await save_file_routine(
            sample_file_path,
            storage_service,
            current_user,
            file_content=binary_data,
            file_name=sample_file_name,
        )
        file_size = await storage_service.get_file_size(
            flow_id=user_folder,
            file_name=sample_file_name,
        )

        # Create a UserFile object for the sample file. The path carries the
        # user folder, matching the "<user id>/<file name>" layout that
        # upload_user_file records for files saved the same way.
        sample_file = UserFile(
            id=file_id,
            user_id=current_user.id,
            name=root_filename,
            path=f"{user_folder}/{sample_file_name}",
            size=file_size,
        )

        session.add(sample_file)
        await session.commit()
        await session.refresh(sample_file)
@router.get("")
@router.get("/", status_code=HTTPStatus.OK)
async def list_files(
current_user: CurrentActiveUser,
session: DbSession,
# storage_service: Annotated[StorageService, Depends(get_storage_service)],
) -> list[UserFile]:
"""List the files available to the current user."""
try:
# Load sample files if they don't exist
# TODO: Pending further testing
# await load_sample_files(current_user, session, get_storage_service())
# Fetch from the UserFile table
stmt = select(UserFile).where(UserFile.user_id == current_user.id)
results = await session.exec(stmt)

View file

@ -43,4 +43,4 @@ FIELD_FORMAT_ATTRIBUTES = [
]
SKIPPED_FIELD_ATTRIBUTES = ["advanced"]
ORJSON_OPTIONS = orjson.OPT_INDENT_2 | orjson.OPT_SORT_KEYS | orjson.OPT_OMIT_MICROSECONDS
SKIPPED_COMPONENTS = {"LanguageModelComponent", "Agent"}
SKIPPED_COMPONENTS = {"LanguageModelComponent", "Agent", "File", "FileComponent"}

View file

@ -1,8 +1,11 @@
import ast
import json
import shutil
import tarfile
from abc import ABC, abstractmethod
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING
from zipfile import ZipFile, is_zipfile
import pandas as pd
@ -13,6 +16,9 @@ from langflow.schema.data import Data
from langflow.schema.dataframe import DataFrame
from langflow.schema.message import Message
if TYPE_CHECKING:
from collections.abc import Callable
class BaseFileComponent(Component, ABC):
"""Base class for handling file processing components.
@ -174,8 +180,7 @@ class BaseFileComponent(Component, ABC):
]
_base_outputs = [
Output(display_name="Loaded Files", name="dataframe", method="load_files"),
Output(display_name="Raw Content", name="message", method="load_files_message"),
Output(display_name="Files", name="dataframe", method="load_files"),
]
@abstractmethod
@ -257,6 +262,104 @@ class BaseFileComponent(Component, ABC):
return Message(text=sep.join(parts))
def load_files_path(self) -> Message:
    """Build a Message listing the paths of the resolved files that exist.

    Returns:
        Message: The POSIX-style paths, one per line; the text is empty when
        none of the resolved files exists.
    """
    existing_paths = []
    for resolved in self._validate_and_resolve_paths():
        # Files whose path is missing on disk are left out of the listing.
        if resolved.path.exists():
            existing_paths.append(resolved.path.as_posix())

    # Joining an empty list already yields the empty string.
    return Message(text="\n".join(existing_paths))
def load_files_structured_helper(self, file_path: str) -> list[dict] | None:
if not file_path:
return None
# Map file extensions to pandas read functions with type annotation
file_readers: dict[str, Callable[[str], pd.DataFrame]] = {
".csv": pd.read_csv,
".xlsx": pd.read_excel,
".parquet": pd.read_parquet,
# TODO: sqlite and json support?
}
# Get file extension in lowercase
ext = Path(file_path).suffix.lower()
# Get the appropriate reader function or None
reader = file_readers.get(ext)
if reader:
result = reader(file_path) # MyPy now knows reader is callable
return result.to_dict("records")
return None
def load_files_structured(self) -> DataFrame:
    """Return the first loaded file as a DataFrame of structured content.

    Only the first entry returned by ``load_files_core`` is used. A file whose
    server path ends in ``.csv``, ``.xlsx`` or ``.parquet`` (case-insensitive)
    is read row by row through ``load_files_structured_helper``; any other
    file contributes a single row holding its data dictionary.

    Returns:
        DataFrame: The structured rows, or an empty DataFrame when nothing was
        loaded.
    """
    loaded = self.load_files_core()
    if not loaded:
        return DataFrame()

    first = loaded[0]
    source_path = first.data.get(self.SERVER_FILE_PATH_FIELDNAME, None)

    tabular_suffixes = (".csv", ".xlsx", ".parquet")
    if source_path and str(source_path).lower().endswith(tabular_suffixes):
        # Tabular formats are read directly from disk.
        rows = self.load_files_structured_helper(source_path)
    else:
        # Anything else becomes one row built from the Data dictionary.
        # TODO: Parse according to docling standards
        rows = [first.data]

    self.status = DataFrame(rows)
    return DataFrame(rows)
def parse_string_to_dict(self, s: str) -> dict:
    """Parse text into a dictionary, trying JSON first and Python literals second.

    Args:
        s: Text expected to hold a JSON object or a Python dict literal. A
            value that is already a dict is returned unchanged.

    Returns:
        dict: The parsed dictionary, or ``{"value": s}`` when the input cannot
        be parsed or does not evaluate to a dict (for example a JSON array).
    """
    # Already-parsed input needs no decoding.
    if isinstance(s, dict):
        return s

    # Try JSON first (handles true/false/null).
    try:
        parsed = json.loads(s)
    except (ValueError, TypeError, RecursionError):
        # ValueError covers json.JSONDecodeError; TypeError is raised for
        # input that is not str/bytes/bytearray.
        parsed = None
    if isinstance(parsed, dict):
        return parsed

    # Fall back to Python literal evaluation. literal_eval may also raise
    # TypeError (e.g. an unhashable dict key such as "{[1]: 2}"), MemoryError
    # or RecursionError on malformed input, so those are handled as well.
    try:
        parsed = ast.literal_eval(s)
    except (SyntaxError, ValueError, TypeError, MemoryError, RecursionError):
        parsed = None
    if isinstance(parsed, dict):
        return parsed

    # If all parsing fails, wrap the original input.
    return {"value": s}
def load_files_json(self) -> Data:
    """Return the first loaded file's text parsed into a Data object.

    Only the first entry returned by ``load_files_core`` is used; its text is
    looked up under that entry's ``text_key`` and parsed with
    ``parse_string_to_dict``.

    Returns:
        Data: Data wrapping the parsed dictionary, or an empty Data object
        when nothing was loaded.
    """
    loaded = self.load_files_core()
    if not loaded:
        return Data()

    # Grab the raw JSON text of the first file and parse it.
    first = loaded[0]
    raw_json = first.data[first.text_key]
    parsed = self.parse_string_to_dict(raw_json)

    self.status = Data(data=parsed)
    return Data(data=parsed)
def load_files(self) -> DataFrame:
"""Load files and return as DataFrame.
@ -267,30 +370,21 @@ class BaseFileComponent(Component, ABC):
if not data_list:
return DataFrame()
# First handle CSV files specially
csv_data = []
non_csv_rows = []
# Convert Data objects to a list of dictionaries
all_rows = []
for data in data_list:
file_path = data.data.get(self.SERVER_FILE_PATH_FIELDNAME)
if file_path and str(file_path).lower().endswith(".csv"):
try:
csv_data.extend(pd.read_csv(file_path).to_dict("records"))
except Exception as e:
self.log(f"Error processing CSV file {file_path}: {e}")
if not self.silent_errors:
raise
else:
# Handle non-CSV files as before
row = dict(data.data) if data.data else {}
if "text" in data.data:
row["text"] = data.data["text"]
if file_path:
row["file_path"] = file_path
non_csv_rows.append(row)
row = dict(data.data) if data.data else {}
# Add text if available, otherwise use the data's text property
if "text" in data.data:
row["text"] = data.data["text"]
if file_path:
row["file_path"] = file_path
all_rows.append(row)
self.status = DataFrame(all_rows)
# Combine CSV and non-CSV data
all_rows = csv_data + non_csv_rows
return DataFrame(all_rows)
@property

View file

@ -1,6 +1,9 @@
from copy import deepcopy
from typing import Any
from langflow.base.data.base_file import BaseFileComponent
from langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data
from langflow.io import BoolInput, IntInput
from langflow.io import BoolInput, FileInput, IntInput, Output
from langflow.schema.data import Data
@ -18,8 +21,15 @@ class FileComponent(BaseFileComponent):
VALID_EXTENSIONS = TEXT_FILE_TYPES
_base_inputs = deepcopy(BaseFileComponent._base_inputs)
for input_item in _base_inputs:
if isinstance(input_item, FileInput) and input_item.name == "path":
input_item.real_time_refresh = True
break
inputs = [
*BaseFileComponent._base_inputs,
*_base_inputs,
BoolInput(
name="use_multithreading",
display_name="[Deprecated] Use Multithreading",
@ -37,9 +47,45 @@ class FileComponent(BaseFileComponent):
]
outputs = [
*BaseFileComponent._base_outputs,
Output(display_name="Raw Content", name="message", method="load_files_message"),
]
def update_outputs(self, frontend_node: dict, field_name: str, field_value: Any) -> dict:
    """Dynamically show only the outputs relevant to the selected file(s).

    Args:
        frontend_node: Node definition whose ``"outputs"`` list is rebuilt. For
            a single file, its type is read from
            ``frontend_node["template"]["path"]["file_path"][0]``.
        field_name: Name of the field that changed; only ``"path"`` is handled.
        field_value: New value of the field; its length decides between the
            single-file and multi-file outputs.

    Returns:
        dict: The same ``frontend_node``, left unchanged for other fields or an
        empty value.
    """
    if field_name != "path":
        return frontend_node

    # Nothing selected (empty or None): keep whatever outputs are present.
    if not field_value:
        return frontend_node

    frontend_node["outputs"] = []

    if len(field_value) == 1:
        # Lower-case before matching so "DATA.CSV" is recognised too, in line
        # with the case-insensitive checks done when the file is loaded.
        file_path = str(frontend_node["template"]["path"]["file_path"][0]).lower()
        if file_path.endswith((".csv", ".xlsx", ".parquet")):
            frontend_node["outputs"].append(
                Output(display_name="Structured Content", name="dataframe", method="load_files_structured"),
            )
        elif file_path.endswith(".json"):
            frontend_node["outputs"].append(
                Output(display_name="Structured Content", name="json", method="load_files_json"),
            )

        # All files get the raw content and path outputs
        frontend_node["outputs"].append(
            Output(display_name="Raw Content", name="message", method="load_files_message"),
        )
        frontend_node["outputs"].append(
            Output(display_name="File Path", name="path", method="load_files_path"),
        )
    else:
        # For multiple files, we only show the files output
        frontend_node["outputs"].append(
            Output(display_name="Files", name="dataframe", method="load_files"),
        )

    return frontend_node
def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:
"""Processes files either sequentially or in parallel, depending on concurrency settings.

View file

@ -4,6 +4,7 @@ from langflow.custom.custom_component.component import Component
from langflow.io import DropdownInput, HandleInput, IntInput, MessageTextInput, Output
from langflow.schema.data import Data
from langflow.schema.dataframe import DataFrame
from langflow.schema.message import Message
from langflow.utils.util import unescape_string
@ -16,9 +17,9 @@ class SplitTextComponent(Component):
inputs = [
HandleInput(
name="data_inputs",
display_name="Data or DataFrame",
display_name="Input",
info="The data with texts to split in chunks.",
input_types=["Data", "DataFrame"],
input_types=["Data", "DataFrame", "Message"],
required=True,
),
IntInput(
@ -93,6 +94,9 @@ class SplitTextComponent(Component):
except Exception as e:
msg = f"Error converting DataFrame to documents: {e}"
raise TypeError(msg) from e
elif isinstance(self.data_inputs, Message):
self.data_inputs = [self.data_inputs.to_data()]
return self.split_text_base()
else:
if not self.data_inputs:
msg = "No data inputs provided"

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -1,7 +1,6 @@
from langflow.components.data import FileComponent
from langflow.components.input_output import ChatInput, ChatOutput
from langflow.components.languagemodels import OpenAIModelComponent
from langflow.components.processing import ParserComponent
from langflow.components.models import LanguageModelComponent
from langflow.components.prompts import PromptComponent
from langflow.graph import Graph
@ -22,19 +21,17 @@ Question:
Answer:
"""
file_component = FileComponent()
parse_data_component = ParserComponent()
parse_data_component.set(input_data=file_component.load_files)
chat_input = ChatInput()
prompt_component = PromptComponent()
prompt_component.set(
template=template,
context=parse_data_component.parse_combined_text,
context=file_component.load_files_message,
question=chat_input.message_response,
)
openai_component = OpenAIModelComponent()
openai_component.set(input_value=prompt_component.build_prompt)
openai_component = LanguageModelComponent()
openai_component.set(input_value=chat_input.message_response, system_message=prompt_component.build_prompt)
chat_output = ChatOutput()
chat_output.set(input_value=openai_component.text_response)

View file

@ -3,7 +3,7 @@ from textwrap import dedent
from langflow.components.data import FileComponent
from langflow.components.embeddings import OpenAIEmbeddingsComponent
from langflow.components.input_output import ChatInput, ChatOutput
from langflow.components.languagemodels import OpenAIModelComponent
from langflow.components.models import LanguageModelComponent
from langflow.components.processing import ParserComponent
from langflow.components.processing.split_text import SplitTextComponent
from langflow.components.prompts import PromptComponent
@ -15,7 +15,7 @@ def ingestion_graph():
# Ingestion Graph
file_component = FileComponent()
text_splitter = SplitTextComponent()
text_splitter.set(data_inputs=file_component.load_files)
text_splitter.set(data_inputs=file_component.load_files_message)
openai_embeddings = OpenAIEmbeddingsComponent()
vector_store = AstraDBVectorStoreComponent()
vector_store.set(
@ -49,7 +49,7 @@ def rag_graph():
question=chat_input.message_response,
)
openai_component = OpenAIModelComponent()
openai_component = LanguageModelComponent()
openai_component.set(input_value=prompt_component.build_prompt)
chat_output = ChatOutput()

View file

@ -113,7 +113,7 @@ class LocalStorageService(StorageService):
"""Perform any cleanup operations when the service is being torn down."""
# No specific teardown actions required for local
async def get_file_size(self, flow_id: str, file_name: str) -> None:
async def get_file_size(self, flow_id: str, file_name: str):
"""Get the size of a file in the local storage."""
# Get the file size from the file path
file_path = self.data_dir / flow_id / file_name

View file

@ -99,3 +99,6 @@ class S3StorageService(StorageService):
async def teardown(self) -> None:
"""Perform any cleanup operations when the service is being torn down."""
# No specific teardown actions required for S3 storage at the moment.
async def get_file_size(self, flow_id: str, file_name: str):
    """Size lookup for a stored file; this storage service does not provide it.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()

View file

@ -39,6 +39,10 @@ class StorageService(Service):
async def list_files(self, flow_id: str) -> list[str]:
raise NotImplementedError
@abstractmethod
async def get_file_size(self, flow_id: str, file_name: str):
    """Size lookup for the file ``file_name`` stored under ``flow_id``.

    Declared abstract; this base version only raises.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()
@abstractmethod
async def delete_file(self, flow_id: str, file_name: str) -> None:
    """Deletion hook for the file ``file_name`` stored under ``flow_id``.

    Declared abstract; this base version only raises.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()

View file

@ -0,0 +1,60 @@
from langflow.components.data import FileComponent
from langflow.io import Output
class TestFileComponentDynamicOutputs:
    """Checks which outputs ``FileComponent.update_outputs`` exposes per selection."""

    def test_update_outputs_single_csv_file(self):
        """A single CSV file exposes structured, raw and path outputs."""
        node = {"outputs": [], "template": {"path": {"file_path": ["test.csv"]}}}

        updated = FileComponent().update_outputs(node, "path", ["test.csv"])

        names = {output.name for output in updated["outputs"]}
        assert len(updated["outputs"]) == 3
        assert "dataframe" in names  # Structured content
        assert "message" in names  # Raw content
        assert "path" in names  # File path

    def test_update_outputs_single_json_file(self):
        """A single JSON file exposes JSON, raw and path outputs."""
        node = {"outputs": [], "template": {"path": {"file_path": ["data.json"]}}}

        updated = FileComponent().update_outputs(node, "path", ["data.json"])

        names = {output.name for output in updated["outputs"]}
        assert len(updated["outputs"]) == 3
        assert "json" in names  # JSON content
        assert "message" in names  # Raw content
        assert "path" in names  # File path

    def test_update_outputs_multiple_files(self):
        """Several files expose only the combined Files output."""
        selected = ["file1.txt", "file2.txt"]
        node = {"outputs": [], "template": {"path": {"file_path": ["file1.txt", "file2.txt"]}}}

        updated = FileComponent().update_outputs(node, "path", selected)

        assert len(updated["outputs"]) == 1
        only_output = updated["outputs"][0]
        assert only_output.name == "dataframe"
        assert only_output.display_name == "Files"

    def test_update_outputs_empty_path(self):
        """An empty path selection leaves the node without outputs."""
        node = {"outputs": [], "template": {"path": {"file_path": []}}}

        updated = FileComponent().update_outputs(node, "path", [])

        assert len(updated["outputs"]) == 0

    def test_update_outputs_non_path_field(self):
        """Changes to fields other than the path keep the outputs as they were."""
        existing = [Output(display_name="Test", name="test", method="test_method")]
        node = {"outputs": existing, "template": {"path": {"file_path": ["value"]}}}

        updated = FileComponent().update_outputs(node, "other_field", "value")

        assert updated["outputs"] == existing

View file

@ -15,6 +15,7 @@ from langflow.graph.graph.base import Graph
from langflow.graph.graph.constants import Finish
from langflow.schema import Data
from langflow.schema.dataframe import DataFrame
from langflow.schema.message import Message
@pytest.fixture
@ -22,9 +23,9 @@ def ingestion_graph():
# Ingestion Graph
file_component = FileComponent(_id="file-123")
file_component.set(path="test.txt")
file_component.set_on_output(name="dataframe", value=Data(text="This is a test file."), cache=True)
file_component.set_on_output(name="message", value=Message(text="This is a test file."), cache=True)
text_splitter = SplitTextComponent(_id="text-splitter-123")
text_splitter.set(data_inputs=file_component.load_files)
text_splitter.set(data_inputs=file_component.load_files_message)
openai_embeddings = OpenAIEmbeddingsComponent(_id="openai-embeddings-123")
openai_embeddings.set(
openai_api_key="sk-123", openai_api_base="https://api.openai.com/v1", openai_api_type="openai"

View file

@ -82,9 +82,7 @@ test(
await page
.getByTestId("handle-urlcomponent-shownode-extracted pages-right")
.click();
await page
.getByTestId("handle-splittext-shownode-data or dataframe-left")
.click();
await page.getByTestId("handle-splittext-shownode-input-left").click();
//connection 2
await page

View file

@ -250,33 +250,8 @@ test(
await adjustScreenView(page);
await page
.getByTestId("handle-file-shownode-loaded files-right")
.first()
.click();
await page.getByTestId("handle-file-shownode-files-right").first().click();
await page
.getByTestId("processingParser")
.hover()
.then(async () => {
await page.getByTestId("add-component-button-parser").click();
});
await adjustScreenView(page);
await page
.getByTestId("handle-file-shownode-loaded files-right")
.first()
.click();
await page
.getByTestId("handle-parsercomponent-shownode-data or dataframe-left")
.first()
.click();
await page
.getByTestId("handle-parsercomponent-shownode-parsed text-right")
.first()
.click();
await page
.getByTestId("handle-chatoutput-noshownode-inputs-target")
.first()
@ -348,6 +323,17 @@ test(
timeout: 1000,
});
await page.getByTestId(`remove-file-button-${renamedTxtFile}`).click();
await page
.getByTestId("handle-file-shownode-raw content-right")
.first()
.click();
await page
.getByTestId("handle-chatoutput-noshownode-inputs-target")
.first()
.click();
await page
.getByRole("button", { name: "Playground", exact: true })
.click();