feat: add dataframe support for vector stores (#6990)

* feat: add dataframe support for vector stores

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* [autofix.ci] apply automated fixes

* fix: remove import

* fix: ruff error

* fix: mypy errors

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Edwin Jose <edwin.jose@datastax.com>
Co-authored-by: italojohnny <italojohnnydosanjos@gmail.com>
Co-authored-by: cristhianzl <cristhian.lousa@gmail.com>
This commit is contained in:
Rodrigo Nader 2025-03-17 16:05:12 -03:00 committed by GitHub
commit 86502232a1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
27 changed files with 129 additions and 32 deletions

View file

@ -1,12 +1,12 @@
from abc import abstractmethod
from functools import wraps
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
from langflow.custom import Component
from langflow.field_typing import Text, VectorStore
from langflow.helpers.data import docs_to_data
from langflow.inputs.inputs import BoolInput
from langflow.io import DataInput, MultilineInput, Output
from langflow.io import HandleInput, MultilineInput, Output
from langflow.schema import Data, DataFrame
if TYPE_CHECKING:
@ -56,9 +56,11 @@ class LCVectorStoreComponent(Component):
trace_type = "retriever"
inputs = [
DataInput(
HandleInput(
name="ingest_data",
display_name="Ingest Data",
input_types=["Data", "DataFrame"],
is_list=True,
),
MultilineInput(
name="search_query",
@ -99,6 +101,24 @@ class LCVectorStoreComponent(Component):
msg = f"Method '{method_name}' must be defined."
raise ValueError(msg)
def _prepare_ingest_data(self) -> list[Any]:
"""Prepares ingest_data by converting DataFrame to Data if needed."""
ingest_data: list | Data | DataFrame = self.ingest_data
if not ingest_data:
return [ingest_data]
if not isinstance(ingest_data, list):
ingest_data = [ingest_data]
result = []
for _input in ingest_data:
if isinstance(_input, DataFrame):
result.extend(_input.to_data_list())
else:
result.append(_input)
return result
def search_with_vector_store(
self,
input_value: Text,

View file

@ -995,6 +995,8 @@ class AstraDBVectorStoreComponent(LCVectorStoreComponent):
return vector_store
def _add_documents_to_vector_store(self, vector_store) -> None:
self.ingest_data = self._prepare_ingest_data()
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):

View file

@ -226,6 +226,8 @@ class AstraDBGraphVectorStoreComponent(LCVectorStoreComponent):
return vector_store
def _add_documents_to_vector_store(self, vector_store) -> None:
self.ingest_data = self._prepare_ingest_data()
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):

View file

@ -158,8 +158,11 @@ class CassandraVectorStoreComponent(LCVectorStoreComponent):
password=self.token,
cluster_kwargs=self.cluster_kwargs,
)
documents = []
# Convert DataFrame to Data if needed using parent's method
self.ingest_data = self._prepare_ingest_data()
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):
documents.append(_input.to_lc_document())

View file

@ -144,6 +144,9 @@ class CassandraGraphVectorStoreComponent(LCVectorStoreComponent):
password=self.token,
cluster_kwargs=self.cluster_kwargs,
)
self.ingest_data = self._prepare_ingest_data()
documents = []
for _input in self.ingest_data or []:

View file

@ -7,7 +7,7 @@ from typing_extensions import override
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.base.vectorstores.utils import chroma_collection_to_data
from langflow.io import BoolInput, DropdownInput, HandleInput, IntInput, StrInput
from langflow.schema import Data
from langflow.schema import Data, DataFrame
class ChromaVectorStoreComponent(LCVectorStoreComponent):
@ -122,10 +122,14 @@ class ChromaVectorStoreComponent(LCVectorStoreComponent):
def _add_documents_to_vector_store(self, vector_store: "Chroma") -> None:
"""Adds documents to the Vector Store."""
if not self.ingest_data:
ingest_data: list | Data | DataFrame = self.ingest_data
if not ingest_data:
self.status = ""
return
# Convert DataFrame to Data if needed using parent's method
ingest_data = self._prepare_ingest_data()
stored_documents_without_id = []
if self.allow_duplicates:
stored_data = []
@ -136,7 +140,7 @@ class ChromaVectorStoreComponent(LCVectorStoreComponent):
stored_documents_without_id.append(value)
documents = []
for _input in self.ingest_data or []:
for _input in ingest_data or []:
if isinstance(_input, Data):
if _input not in stored_documents_without_id:
documents.append(_input.to_lc_document())

View file

@ -83,6 +83,9 @@ class ClickhouseVectorStoreComponent(LCVectorStoreComponent):
msg = f"Failed to connect to Clickhouse: {e}"
raise ValueError(msg) from e
# Convert DataFrame to Data if needed using parent's method
self.ingest_data = self._prepare_ingest_data()
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):

View file

@ -55,6 +55,8 @@ class CouchbaseVectorStoreComponent(LCVectorStoreComponent):
msg = f"Failed to connect to Couchbase: {e}"
raise ValueError(msg) from e
self.ingest_data = self._prepare_ingest_data()
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):

View file

@ -136,6 +136,8 @@ class ElasticsearchVectorStoreComponent(LCVectorStoreComponent):
def _prepare_documents(self) -> list[Document]:
"""Prepares documents from the input data to add to the vector store."""
self.ingest_data = self._prepare_ingest_data()
documents = []
for data in self.ingest_data:
if isinstance(data, Data):

View file

@ -69,6 +69,9 @@ class FaissVectorStoreComponent(LCVectorStoreComponent):
path = self.get_persist_directory()
path.mkdir(parents=True, exist_ok=True)
# Convert DataFrame to Data if needed using parent's method
self.ingest_data = self._prepare_ingest_data()
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):

View file

@ -242,6 +242,9 @@ class HCDVectorStoreComponent(LCVectorStoreComponent):
return vector_store
def _add_documents_to_vector_store(self, vector_store) -> None:
# Convert DataFrame to Data if needed using parent's method
self.ingest_data = self._prepare_ingest_data()
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):

View file

@ -85,6 +85,9 @@ class MilvusVectorStoreComponent(LCVectorStoreComponent):
timeout=self.timeout,
)
# Convert DataFrame to Data if needed using parent's method
self.ingest_data = self._prepare_ingest_data()
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):

View file

@ -85,6 +85,9 @@ class MongoVectorStoreComponent(LCVectorStoreComponent):
msg = f"Failed to connect to MongoDB Atlas: {e}"
raise ValueError(msg) from e
# Convert DataFrame to Data if needed using parent's method
self.ingest_data = self._prepare_ingest_data()
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):

View file

@ -132,6 +132,9 @@ class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
def _add_documents_to_vector_store(self, vector_store: "OpenSearchVectorSearch") -> None:
"""Adds documents to the Vector Store."""
# Convert DataFrame to Data if needed using parent's method
self.ingest_data = self._prepare_ingest_data()
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):

View file

@ -29,6 +29,9 @@ class PGVectorStoreComponent(LCVectorStoreComponent):
@check_cached_vector_store
def build_vector_store(self) -> PGVector:
# Convert DataFrame to Data if needed using parent's method
self.ingest_data = self._prepare_ingest_data()
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):

View file

@ -73,9 +73,13 @@ class PineconeVectorStoreComponent(LCVectorStoreComponent):
error_msg = "Error building Pinecone vector store"
raise ValueError(error_msg) from e
else:
self.ingest_data = self._prepare_ingest_data()
# Process documents if any
documents = []
if self.ingest_data:
# Convert DataFrame to Data if needed using parent's method
for doc in self.ingest_data:
if isinstance(doc, Data):
documents.append(doc.to_lc_document())

View file

@ -69,8 +69,11 @@ class QdrantVectorStoreComponent(LCVectorStoreComponent):
}
server_kwargs = {k: v for k, v in server_kwargs.items() if v is not None}
documents = []
# Convert DataFrame to Data if needed using parent's method
self.ingest_data = self._prepare_ingest_data()
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):
documents.append(_input.to_lc_document())

View file

@ -41,8 +41,10 @@ class RedisVectorStoreComponent(LCVectorStoreComponent):
@check_cached_vector_store
def build_vector_store(self) -> Redis:
documents = []
# Convert DataFrame to Data if needed using parent's method
self.ingest_data = self._prepare_ingest_data()
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):
documents.append(_input.to_lc_document())

View file

@ -33,6 +33,9 @@ class SupabaseVectorStoreComponent(LCVectorStoreComponent):
def build_vector_store(self) -> SupabaseVectorStore:
supabase: Client = create_client(self.supabase_url, supabase_key=self.supabase_service_key)
# Convert DataFrame to Data if needed using parent's method
self.ingest_data = self._prepare_ingest_data()
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):

View file

@ -68,6 +68,9 @@ class UpstashVectorStoreComponent(LCVectorStoreComponent):
def build_vector_store(self) -> UpstashVectorStore:
use_upstash_embedding = self.embedding is None
# Convert DataFrame to Data if needed using parent's method
self.ingest_data = self._prepare_ingest_data()
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):

View file

@ -5,7 +5,7 @@ from langchain_community.vectorstores import Vectara
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import HandleInput, IntInput, SecretStrInput, StrInput
from langflow.schema import Data
from langflow.schema import Data, DataFrame
if TYPE_CHECKING:
from langchain_community.vectorstores import Vectara
@ -58,12 +58,16 @@ class VectaraVectorStoreComponent(LCVectorStoreComponent):
def _add_documents_to_vector_store(self, vector_store: "Vectara") -> None:
"""Adds documents to the Vector Store."""
if not self.ingest_data:
ingest_data: list | Data | DataFrame = self.ingest_data
if not ingest_data:
self.status = "No documents to add to Vectara"
return
# Convert DataFrame to Data if needed using parent's method
ingest_data = self._prepare_ingest_data()
documents = []
for _input in self.ingest_data or []:
for _input in ingest_data or []:
if isinstance(_input, Data):
documents.append(_input.to_lc_document())
else:

View file

@ -47,6 +47,9 @@ class WeaviateVectorStoreComponent(LCVectorStoreComponent):
msg = f"Weaviate requires the index name to be capitalized. Use: {self.index_name.capitalize()}"
raise ValueError(msg)
# Convert DataFrame to Data if needed using parent's method
self.ingest_data = self._prepare_ingest_data()
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):

View file

@ -809,7 +809,7 @@
},
"stream": {
"_input_type": "BoolInput",
"advanced": false,
"advanced": true,
"display_name": "Stream",
"dynamic": false,
"info": "Stream the response from the model. Streaming works only in Chat.",

View file

@ -2325,7 +2325,7 @@
"show": true,
"title_case": false,
"type": "code",
"value": "from langchain_sambanova import ChatSambaNovaCloud\nfrom pydantic.v1 import SecretStr\n\nfrom langflow.base.models.model import LCModelComponent\nfrom langflow.base.models.sambanova_constants import SAMBANOVA_MODEL_NAMES\nfrom langflow.field_typing import LanguageModel\nfrom langflow.field_typing.range_spec import RangeSpec\nfrom langflow.io import DropdownInput, IntInput, SecretStrInput, SliderInput, StrInput\n\n\nclass SambaNovaComponent(LCModelComponent):\n display_name = \"SambaNova\"\n description = \"Generate text using Sambanova LLMs.\"\n documentation = \"https://cloud.sambanova.ai/\"\n icon = \"SambaNova\"\n name = \"SambaNovaModel\"\n\n inputs = [\n *LCModelComponent._base_inputs,\n StrInput(\n name=\"base_url\",\n display_name=\"SambaNova Cloud Base Url\",\n advanced=True,\n info=\"The base URL of the Sambanova Cloud API. \"\n \"Defaults to https://api.sambanova.ai/v1/chat/completions. \"\n \"You can change this to use other urls like Sambastudio\",\n ),\n DropdownInput(\n name=\"model_name\",\n display_name=\"Model Name\",\n advanced=False,\n options=SAMBANOVA_MODEL_NAMES,\n value=SAMBANOVA_MODEL_NAMES[0],\n ),\n SecretStrInput(\n name=\"api_key\",\n display_name=\"Sambanova API Key\",\n info=\"The Sambanova API Key to use for the Sambanova model.\",\n advanced=False,\n value=\"SAMBANOVA_API_KEY\",\n required=True,\n ),\n IntInput(\n name=\"max_tokens\",\n display_name=\"Max Tokens\",\n advanced=True,\n value=2048,\n info=\"The maximum number of tokens to generate.\",\n ),\n SliderInput(\n name=\"top_p\",\n display_name=\"top_p\",\n advanced=True,\n value=1.0,\n range_spec=RangeSpec(min=0, max=1, step=0.01),\n info=\"Model top_p\",\n ),\n SliderInput(\n name=\"temperature\", display_name=\"Temperature\", value=0.1, range_spec=RangeSpec(min=0, max=2, step=0.01)\n ),\n ]\n\n def build_model(self) -> LanguageModel: # type: ignore[type-var]\n sambanova_url = self.base_url\n sambanova_api_key = self.api_key\n model_name = self.model_name\n 
max_tokens = self.max_tokens\n top_p = self.top_p\n temperature = self.temperature\n\n api_key = SecretStr(sambanova_api_key).get_secret_value() if sambanova_api_key else None\n\n return ChatSambaNovaCloud(\n model=model_name,\n max_tokens=max_tokens or 1024,\n temperature=temperature or 0.07,\n top_p=top_p,\n sambanova_url=sambanova_url,\n sambanova_api_key=api_key,\n )\n"
"value": "from langchain_sambanova import ChatSambaNovaCloud\nfrom pydantic.v1 import SecretStr\n\nfrom langflow.base.models.model import LCModelComponent\nfrom langflow.base.models.sambanova_constants import SAMBANOVA_MODEL_NAMES\nfrom langflow.field_typing import LanguageModel\nfrom langflow.field_typing.range_spec import RangeSpec\nfrom langflow.io import DropdownInput, IntInput, SecretStrInput, SliderInput, StrInput\n\n\nclass SambaNovaComponent(LCModelComponent):\n display_name = \"SambaNova\"\n description = \"Generate text using Sambanova LLMs.\"\n documentation = \"https://cloud.sambanova.ai/\"\n icon = \"SambaNova\"\n name = \"SambaNovaModel\"\n\n inputs = [\n *LCModelComponent._base_inputs,\n StrInput(\n name=\"base_url\",\n display_name=\"SambaNova Cloud Base Url\",\n advanced=True,\n info=\"The base URL of the Sambanova Cloud API. \"\n \"Defaults to https://api.sambanova.ai/v1/chat/completions. \"\n \"You can change this to use other urls like Sambastudio\",\n ),\n DropdownInput(\n name=\"model_name\",\n display_name=\"Model Name\",\n advanced=False,\n options=SAMBANOVA_MODEL_NAMES,\n value=SAMBANOVA_MODEL_NAMES[0],\n ),\n SecretStrInput(\n name=\"api_key\",\n display_name=\"Sambanova API Key\",\n info=\"The Sambanova API Key to use for the Sambanova model.\",\n advanced=False,\n value=\"SAMBANOVA_API_KEY\",\n required=True,\n ),\n IntInput(\n name=\"max_tokens\",\n display_name=\"Max Tokens\",\n advanced=True,\n value=2048,\n info=\"The maximum number of tokens to generate.\",\n ),\n SliderInput(\n name=\"top_p\",\n display_name=\"top_p\",\n advanced=True,\n value=1.0,\n range_spec=RangeSpec(min=0, max=1, step=0.01),\n info=\"Model top_p\",\n ),\n SliderInput(\n name=\"temperature\",\n display_name=\"Temperature\",\n value=0.1,\n range_spec=RangeSpec(min=0, max=2, step=0.01),\n advanced=True,\n ),\n ]\n\n def build_model(self) -> LanguageModel: # type: ignore[type-var]\n sambanova_url = self.base_url\n sambanova_api_key = self.api_key\n model_name = 
self.model_name\n max_tokens = self.max_tokens\n top_p = self.top_p\n temperature = self.temperature\n\n api_key = SecretStr(sambanova_api_key).get_secret_value() if sambanova_api_key else None\n\n return ChatSambaNovaCloud(\n model=model_name,\n max_tokens=max_tokens or 1024,\n temperature=temperature or 0.07,\n top_p=top_p,\n sambanova_url=sambanova_url,\n sambanova_api_key=api_key,\n )\n"
},
"input_value": {
"_input_type": "MessageInput",
@ -2405,7 +2405,7 @@
},
"stream": {
"_input_type": "BoolInput",
"advanced": false,
"advanced": true,
"display_name": "Stream",
"dynamic": false,
"info": "Stream the response from the model. Streaming works only in Chat.",
@ -2447,7 +2447,7 @@
},
"temperature": {
"_input_type": "SliderInput",
"advanced": false,
"advanced": true,
"display_name": "Temperature",
"dynamic": false,
"info": "",

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -168,3 +168,13 @@ class DataFrame(pandas_DataFrame):
DataFrame: A new DataFrame with the converted Documents
"""
return DataFrame(docs)
def __eq__(self, other):
    """Override equality to handle comparison with empty DataFrames and non-DataFrame objects."""
    # An empty DataFrame never compares equal to anything — not even another
    # empty DataFrame. NOTE(review): this breaks reflexivity of ==;
    # presumably intended so membership tests (e.g. `item not in stored`)
    # don't spuriously match on emptiness — confirm against callers.
    if self.empty:
        return False
    if isinstance(other, list) and not other:  # Empty list case
        return False
    if not isinstance(other, DataFrame | pd.DataFrame):  # Non-DataFrame case
        return False
    # Delegate to pandas, whose DataFrame.__eq__ is element-wise and returns
    # a DataFrame of booleans rather than a single bool.
    return super().__eq__(other)