feat: add cassandra components (#2056)

* Add cassandra store component

* Add cassandra search component

* revert poetry changes

* fix type

* Add cassandra icon

* Add Cassandra Message Writer

* Add cassandra message reader

* poetry

* Fix init of cass reader

* move cassio import to base project and inline imports in backend

* running make format

* remove file

* remove cassio import

* update lockfile

* Actually update lockfile:

* merge fixes
This commit is contained in:
Jordan Frazier 2024-06-10 15:04:24 -07:00 committed by GitHub
commit 11ef216c0a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 620 additions and 63 deletions

View file

@ -1,9 +1,7 @@
from typing import Optional, cast
from langchain_astradb.chat_message_histories import AstraDBChatMessageHistory
from langflow.base.memory.memory import BaseMemoryComponent
from langflow.field_typing import Text
from langflow.schema.schema import Record
@ -51,6 +49,14 @@ class AstraDBMessageReaderComponent(BaseMemoryComponent):
Returns:
list[Record]: A list of Record objects representing the search results.
"""
try:
from langchain_astradb.chat_message_histories import AstraDBChatMessageHistory
except ImportError:
raise ImportError(
"Could not import langchain Astra DB integration package. "
"Please install it with `pip install langchain-astradb`."
)
memory: AstraDBChatMessageHistory = cast(AstraDBChatMessageHistory, kwargs.get("memory"))
if not memory:
raise ValueError("AstraDBChatMessageHistory instance is required.")
@ -63,14 +69,14 @@ class AstraDBMessageReaderComponent(BaseMemoryComponent):
def build(
self,
session_id: Text,
session_id: str,
collection_name: str,
token: str,
api_endpoint: str,
namespace: Optional[str] = None,
) -> list[Record]:
try:
pass
from langchain_astradb.chat_message_histories import AstraDBChatMessageHistory
except ImportError:
raise ImportError(
"Could not import langchain Astra DB integration package. "

View file

@ -1,11 +1,9 @@
from typing import Optional
from langflow.base.memory.memory import BaseMemoryComponent
from langflow.field_typing import Text
from langflow.schema.schema import Record
from langchain_core.messages import BaseMessage
from langchain_astradb import AstraDBChatMessageHistory
class AstraDBMessageWriterComponent(BaseMemoryComponent):
@ -50,7 +48,7 @@ class AstraDBMessageWriterComponent(BaseMemoryComponent):
self,
sender: str,
sender_name: str,
text: Text,
text: str,
session_id: str,
metadata: Optional[dict] = None,
**kwargs,
@ -59,17 +57,27 @@ class AstraDBMessageWriterComponent(BaseMemoryComponent):
Adds a message to the AstraDBChatMessageHistory memory.
Args:
sender (Text): The type of the message sender. Valid values are "Machine" or "User".
sender_name (Text): The name of the message sender.
text (Text): The content of the message.
session_id (Text): The session ID associated with the message.
sender (str): The type of the message sender. Typically "ai" or "human".
sender_name (str): The name of the message sender.
text (str): The content of the message.
session_id (str): The session ID associated with the message.
metadata (dict | None, optional): Additional metadata for the message. Defaults to None.
**kwargs: Additional keyword arguments.
**kwargs: Additional keyword arguments, including:
memory (AstraDBChatMessageHistory | None): The memory instance to add the message to.
Raises:
ValueError: If the AstraDBChatMessageHistory instance is not provided.
"""
try:
from langchain_astradb.chat_message_histories import AstraDBChatMessageHistory
except ImportError:
raise ImportError(
"Could not import langchain Astra DB integration package. "
"Please install it with `pip install langchain-astradb`."
)
memory: AstraDBChatMessageHistory | None = kwargs.pop("memory", None)
if memory is None:
raise ValueError("AstraDBChatMessageHistory instance is required.")
@ -89,14 +97,14 @@ class AstraDBMessageWriterComponent(BaseMemoryComponent):
def build(
self,
input_value: Record,
session_id: Text,
session_id: str,
collection_name: str,
token: str,
api_endpoint: str,
namespace: Optional[str] = None,
) -> Record:
try:
pass
from langchain_astradb.chat_message_histories import AstraDBChatMessageHistory
except ImportError:
raise ImportError(
"Could not import langchain Astra DB integration package. "

View file

@ -0,0 +1,86 @@
from typing import Optional, cast
from langchain_community.chat_message_histories import CassandraChatMessageHistory
from langflow.base.memory.memory import BaseMemoryComponent
from langflow.schema.schema import Record
class CassandraMessageReaderComponent(BaseMemoryComponent):
display_name = "Cassandra Message Reader"
description = "Retrieves stored chat messages from a Cassandra table on Astra DB."
def build_config(self):
return {
"session_id": {
"display_name": "Session ID",
"info": "Session ID of the chat history.",
"input_types": ["Text"],
},
"database_id": {
"display_name": "Database ID",
"info": "The Astra database ID.",
},
"table_name": {
"display_name": "Table Name",
"info": "The name of the table where messages are stored.",
},
"token": {
"display_name": "Token",
"info": "Authentication token for accessing Cassandra on Astra DB.",
"password": True,
},
"keyspace": {
"display_name": "Keyspace",
"info": "Optional key space within Astra DB. The keyspace should already be created.",
"input_types": ["Text"],
"advanced": True,
},
}
def get_messages(self, **kwargs) -> list[Record]:
"""
Retrieves messages from the CassandraChatMessageHistory memory.
Args:
memory (CassandraChatMessageHistory): The CassandraChatMessageHistory instance to retrieve messages from.
Returns:
list[Record]: A list of Record objects representing the search results.
"""
memory: CassandraChatMessageHistory = cast(CassandraChatMessageHistory, kwargs.get("memory"))
if not memory:
raise ValueError("CassandraChatMessageHistory instance is required.")
# Get messages from the memory
messages = memory.messages
results = [Record.from_lc_message(message) for message in messages]
return list(results)
def build(
self,
session_id: str,
table_name: str,
token: str,
database_id: str,
keyspace: Optional[str] = None,
) -> list[Record]:
try:
import cassio
except ImportError:
raise ImportError(
"Could not import cassio integration package. " "Please install it with `pip install cassio`."
)
cassio.init(token=token, database_id=database_id)
memory = CassandraChatMessageHistory(
session_id=session_id,
table_name=table_name,
keyspace=keyspace,
)
records = self.get_messages(memory=memory)
self.status = records
return records

View file

@ -0,0 +1,122 @@
from typing import Optional
from langflow.base.memory.memory import BaseMemoryComponent
from langflow.schema.schema import Record
from langchain_core.messages import BaseMessage
from langchain_community.chat_message_histories import CassandraChatMessageHistory
class CassandraMessageWriterComponent(BaseMemoryComponent):
display_name = "Cassandra Message Writer"
description = "Writes a message to a Cassandra table on Astra DB."
def build_config(self):
return {
"input_value": {
"display_name": "Input Record",
"info": "Record to write to Cassandra.",
},
"session_id": {
"display_name": "Session ID",
"info": "Session ID of the chat history.",
"input_types": ["Text"],
},
"database_id": {
"display_name": "Database ID",
"info": "The Astra database ID.",
},
"table_name": {
"display_name": "Table Name",
"info": "The name of the table where messages will be stored.",
},
"token": {
"display_name": "Token",
"info": "Authentication token for accessing Cassandra on Astra DB.",
"password": True,
},
"keyspace": {
"display_name": "Keyspace",
"info": "Optional key space within Astra DB. The keyspace should already be created.",
"input_types": ["Text"],
"advanced": True,
},
"ttl_seconds": {
"display_name": "TTL Seconds",
"info": "Optional time-to-live for the messages.",
"input_types": ["Number"],
"advanced": True,
},
}
def add_message(
self,
sender: str,
sender_name: str,
text: str,
session_id: str,
metadata: Optional[dict] = None,
**kwargs,
):
"""
Adds a message to the CassandraChatMessageHistory memory.
Args:
sender (str): The type of the message sender. Typically "ai" or "human".
sender_name (str): The name of the message sender.
text (str): The content of the message.
session_id (str): The session ID associated with the message.
metadata (dict | None, optional): Additional metadata for the message. Defaults to None.
**kwargs: Additional keyword arguments, including:
memory (CassandraChatMessageHistory | None): The memory instance to add the message to.
Raises:
ValueError: If the CassandraChatMessageHistory instance is not provided.
"""
memory: CassandraChatMessageHistory | None = kwargs.pop("memory", None)
if memory is None:
raise ValueError("CassandraChatMessageHistory instance is required.")
text_list = [
BaseMessage(
content=text,
sender=sender,
sender_name=sender_name,
metadata=metadata,
session_id=session_id,
)
]
memory.add_messages(text_list)
def build(
self,
input_value: Record,
session_id: str,
table_name: str,
token: str,
database_id: str,
keyspace: Optional[str] = None,
ttl_seconds: Optional[int] = None,
) -> Record:
try:
import cassio
except ImportError:
raise ImportError(
"Could not import cassio integration package. " "Please install it with `pip install cassio`."
)
cassio.init(token=token, database_id=database_id)
memory = CassandraChatMessageHistory(
session_id=session_id,
table_name=table_name,
keyspace=keyspace,
ttl_seconds=ttl_seconds,
)
self.add_message(**input_value.data, memory=memory)
self.status = f"Added message to Cassandra memory for session {session_id}"
return input_value

View file

@ -0,0 +1,94 @@
from typing import Any, List, Optional, Tuple
from langflow.components.vectorstores.Cassandra import CassandraVectorStoreComponent
from langflow.components.vectorstores.base.model import LCVectorStoreComponent
from langflow.field_typing import Embeddings, Text
from langflow.schema import Record
from langchain_community.utilities.cassandra import SetupMode
class CassandraSearchComponent(LCVectorStoreComponent):
display_name = "Cassandra Search"
description = "Searches an existing Cassandra Vector Store."
icon = "Cassandra"
field_order = ["token", "database_id", "table_name", "input_value", "embedding"]
def build_config(self):
return {
"search_type": {
"display_name": "Search Type",
"options": ["Similarity", "MMR"],
},
"input_value": {
"display_name": "Input Value",
"info": "Input value to search",
},
"embedding": {"display_name": "Embedding", "info": "Embedding to use"},
"token": {
"display_name": "Token",
"info": "Authentication token for accessing Cassandra on Astra DB.",
"password": True,
},
"database_id": {
"display_name": "Database ID",
"info": "The Astra database ID.",
},
"table_name": {
"display_name": "Table Name",
"info": "The name of the table where vectors will be stored.",
},
"keyspace": {
"display_name": "Keyspace",
"info": "Optional key space within Astra DB. The keyspace should already be created.",
"advanced": True,
},
"body_index_options": {
"display_name": "Body Index Options",
"info": "Optional options used to create the body index.",
"advanced": True,
},
"setup_mode": {
"display_name": "Setup Mode",
"info": "Configuration mode for setting up the Cassandra table, with options like 'Sync', 'Async', or 'Off'.",
"options": ["Sync", "Async", "Off"],
"advanced": True,
},
"number_of_results": {
"display_name": "Number of Results",
"info": "Number of results to return.",
"advanced": True,
},
}
def build(
self,
embedding: Embeddings,
table_name: str,
input_value: Text,
token: str,
database_id: str,
search_type: str = "similarity",
number_of_results: int = 4,
keyspace: Optional[str] = None,
body_index_options: Optional[List[Tuple[str, Any]]] = None,
setup_mode: SetupMode = SetupMode.SYNC,
) -> List[Record]:
vector_store = CassandraVectorStoreComponent().build(
embedding=embedding,
table_name=table_name,
token=token,
database_id=database_id,
keyspace=keyspace,
body_index_options=body_index_options,
setup_mode=setup_mode,
)
try:
return self.search_with_vector_store(input_value, search_type, vector_store, k=number_of_results)
except KeyError as e:
if "content" in str(e):
raise ValueError(
"You should ingest data through Langflow (or LangChain) to query it in Langflow. Your collection does not contain a field name 'content'."
)
else:
raise e

View file

@ -1,6 +1,4 @@
from typing import List, Optional, Union
from langchain_astradb import AstraDBVectorStore
from langchain_astradb.utils.astradb import SetupMode
from langflow.custom import CustomComponent
from langflow.field_typing import Embeddings, VectorStore
@ -111,6 +109,15 @@ class AstraDBVectorStoreComponent(CustomComponent):
metadata_indexing_exclude: Optional[List[str]] = None,
collection_indexing_policy: Optional[dict] = None,
) -> Union[VectorStore, BaseRetriever]:
try:
from langchain_astradb import AstraDBVectorStore
from langchain_astradb.utils.astradb import SetupMode
except ImportError:
raise ImportError(
"Could not import langchain Astra DB integration package. "
"Please install it with `pip install langchain-astradb`."
)
try:
setup_mode_value = SetupMode[setup_mode.upper()]
except KeyError:

View file

@ -0,0 +1,110 @@
from typing import Any, List, Optional, Tuple
from langchain_community.vectorstores import Cassandra
from langchain_community.utilities.cassandra import SetupMode
from langflow.custom import CustomComponent
from langflow.field_typing import Embeddings, VectorStore
from langflow.schema import Record
class CassandraVectorStoreComponent(CustomComponent):
display_name = "Cassandra"
description = "Builds or loads a Cassandra Vector Store."
icon = "Cassandra"
field_order = ["token", "database_id", "table_name", "inputs", "embedding"]
def build_config(self):
return {
"inputs": {
"display_name": "Inputs",
"info": "Optional list of records to be processed and stored in the vector store.",
},
"embedding": {"display_name": "Embedding", "info": "Embedding to use"},
"token": {
"display_name": "Token",
"info": "Authentication token for accessing Cassandra on Astra DB.",
"password": True,
},
"database_id": {
"display_name": "Database ID",
"info": "The Astra database ID.",
},
"table_name": {
"display_name": "Table Name",
"info": "The name of the table where vectors will be stored.",
},
"keyspace": {
"display_name": "Keyspace",
"info": "Optional key space within Astra DB. The keyspace should already be created.",
"advanced": True,
},
"ttl_seconds": {
"display_name": "TTL Seconds",
"info": "Optional time-to-live for the added texts.",
"advanced": True,
},
"batch_size": {
"display_name": "Batch Size",
"info": "Optional number of records to process in a single batch.",
"advanced": True,
},
"body_index_options": {
"display_name": "Body Index Options",
"info": "Optional options used to create the body index.",
"advanced": True,
},
"setup_mode": {
"display_name": "Setup Mode",
"info": "Configuration mode for setting up the Cassandra table, with options like 'Sync', 'Async', or 'Off'.",
"options": ["Sync", "Async", "Off"],
"advanced": True,
},
}
def build(
self,
embedding: Embeddings,
token: str,
database_id: str,
inputs: Optional[List[Record]] = None,
keyspace: Optional[str] = None,
table_name: str = "",
ttl_seconds: Optional[int] = None,
batch_size: int = 16,
body_index_options: Optional[List[Tuple[str, Any]]] = None,
setup_mode: SetupMode = SetupMode.SYNC,
) -> VectorStore:
try:
import cassio
except ImportError:
raise ImportError(
"Could not import cassio integration package. " "Please install it with `pip install cassio`."
)
cassio.init(
database_id=database_id,
token=token,
)
if inputs:
documents = [_input.to_lc_document() for _input in inputs]
table = Cassandra.from_documents(
documents=documents,
embedding=embedding,
table_name=table_name,
keyspace=keyspace,
ttl_seconds=ttl_seconds,
batch_size=batch_size,
body_index_options=body_index_options,
)
else:
table = Cassandra(
embedding=embedding,
table_name=table_name,
keyspace=keyspace,
ttl_seconds=ttl_seconds,
body_index_options=body_index_options,
setup_mode=setup_mode,
)
return table