Merge branch 'logspace-ai:dev' into fix/vectorstores/pgvector

This commit is contained in:
coolgo0811 2024-01-07 12:14:02 +08:00 committed by GitHub
commit 483cf07d7f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 651 additions and 341 deletions

738
poetry.lock generated

File diff suppressed because it is too large Load diff

View file

@ -104,7 +104,9 @@ qianfan = "0.2.0"
pgvector = "^0.2.3"
pyautogen = "^0.2.0"
langchain-google-genai = "^0.0.2"
elasticsearch = "^8.11.1"
pytube = "^15.0.0"
llama-index = "^0.9.24"
[tool.poetry.group.dev.dependencies]
pytest-asyncio = "^0.23.1"

View file

@ -2,7 +2,6 @@ from typing import Optional
from langchain.llms.base import BaseLLM
from langchain.llms.bedrock import Bedrock
from langflow import CustomComponent
@ -44,7 +43,7 @@ class AmazonBedrockComponent(CustomComponent):
model_kwargs: Optional[dict] = None,
endpoint_url: Optional[str] = None,
streaming: bool = False,
cache: bool | None = None,
cache: Optional[bool] = None,
) -> BaseLLM:
try:
output = Bedrock(

View file

@ -274,6 +274,8 @@ vectorstores:
documentation: "https://python.langchain.com/docs/modules/data_connection/vectorstores/integrations/faiss"
Pinecone:
documentation: "https://python.langchain.com/docs/modules/data_connection/vectorstores/integrations/pinecone"
ElasticsearchStore:
documentation: "https://python.langchain.com/docs/modules/data_connection/vectorstores/integrations/elasticsearch"
SupabaseVectorStore:
documentation: "https://python.langchain.com/docs/modules/data_connection/vectorstores/integrations/supabase"
MongoDBAtlasVectorSearch:

View file

@ -1,6 +1,7 @@
from typing import Any, Callable, Dict, Type
from langchain.vectorstores import (
Pinecone,
ElasticsearchStore,
Qdrant,
Chroma,
FAISS,
@ -226,11 +227,34 @@ def initialize_qdrant(class_object: Type[Qdrant], params: dict):
return class_object.from_documents(**params)
def initialize_elasticsearch(class_object: Type["ElasticsearchStore"], params: dict):
    """Initialize an ElasticsearchStore and return the instance.

    Both ``index_name`` and ``es_url`` are mandatory in *params*. When no
    documents are supplied, attaches to an existing index; otherwise builds a
    new index from the provided documents.

    Args:
        class_object: The ElasticsearchStore class to instantiate.
        params: Builder parameters; consumed (popped) by this function.

    Raises:
        ValueError: if ``index_name`` or ``es_url`` is missing from params.
    """
    if "index_name" not in params:
        raise ValueError("Elasticsearch Index must be provided in the params")
    if "es_url" not in params:
        raise ValueError("Elasticsearch URL must be provided in the params")
    if not docs_in_params(params):
        # No documents: connect to the existing index. The guards above
        # guarantee both keys are present, so the original per-key
        # `if key in params` re-checks were dead code and are dropped.
        existing_index_params = {
            "embedding": params.pop("embedding"),
            "index_name": params.pop("index_name"),
            "es_url": params.pop("es_url"),
        }
        return class_object.from_existing_index(**existing_index_params)
    # If there are docs in the params, create a new index
    if "texts" in params:
        params["documents"] = params.pop("texts")
    return class_object.from_documents(**params)
vecstore_initializer: Dict[str, Callable[[Type[Any], dict], Any]] = {
"Pinecone": initialize_pinecone,
"Chroma": initialize_chroma,
"Qdrant": initialize_qdrant,
"Weaviate": initialize_weaviate,
"ElasticsearchStore": initialize_elasticsearch,
"FAISS": initialize_faiss,
"SupabaseVectorStore": initialize_supabase,
"MongoDBAtlasVectorSearch": initialize_mongodb,

View file

@ -5,14 +5,15 @@ from typing import Any, Dict, List
import orjson
from fastapi import WebSocket, status
from loguru import logger
from starlette.websockets import WebSocketState
from langflow.api.v1.schemas import ChatMessage, ChatResponse, FileResponse
from langflow.interface.utils import pil_to_base64
from langflow.services import ServiceType, service_manager
from langflow.services.base import Service
from langflow.services.chat.cache import Subject
from langflow.services.chat.utils import process_graph
from loguru import logger
from starlette.websockets import WebSocketState
from .cache import cache_service
@ -117,7 +118,7 @@ class ChatService(Service):
if "after sending" in str(exc):
logger.error(f"Error closing connection: {exc}")
async def process_message(self, client_id: str, payload: Dict, langchain_object: Any):
async def process_message(self, client_id: str, payload: Dict, build_result: Any):
# Process the graph data and chat message
chat_inputs = payload.pop("inputs", {})
chatkey = payload.pop("chatKey", None)
@ -134,12 +135,12 @@ class ChatService(Service):
logger.debug("Generating result and thought")
result, intermediate_steps, raw_output = await process_graph(
langchain_object=langchain_object,
build_result=build_result,
chat_inputs=chat_inputs,
client_id=client_id,
session_id=self.connection_ids[client_id],
)
self.set_cache(client_id, langchain_object)
self.set_cache(client_id, build_result)
except Exception as e:
# Log stack trace
logger.exception(e)
@ -205,8 +206,8 @@ class ChatService(Service):
continue
with self.chat_cache.set_client_id(client_id):
if langchain_object := self.cache_service.get(client_id).get("result"):
await self.process_message(client_id, payload, langchain_object)
if build_result := self.cache_service.get(client_id).get("result"):
await self.process_message(client_id, payload, build_result)
else:
raise RuntimeError(f"Could not find a build result for client_id {client_id}")

View file

@ -1,20 +1,28 @@
from typing import Any
from langchain.agents import AgentExecutor
from langchain.chains.base import Chain
from langchain_core.runnables import Runnable
from loguru import logger
from langflow.api.v1.schemas import ChatMessage
from langflow.interface.utils import try_setting_streaming_options
from langflow.processing.base import get_result_and_steps
from langflow.utils.chat import ChatDefinition
LANGCHAIN_RUNNABLES = (Chain, Runnable, AgentExecutor)
async def process_graph(
langchain_object,
build_result,
chat_inputs: ChatMessage,
client_id: str,
session_id: str,
):
langchain_object = try_setting_streaming_options(langchain_object)
build_result = try_setting_streaming_options(build_result)
logger.debug("Loaded langchain object")
if langchain_object is None:
if build_result is None:
# Raise user facing error
raise ValueError("There was an error loading the langchain_object. Please, check all the nodes and try again.")
@ -25,15 +33,36 @@ async def process_graph(
chat_inputs.message = {}
logger.debug("Generating result and thought")
result, intermediate_steps, raw_output = await get_result_and_steps(
langchain_object,
chat_inputs.message,
client_id=client_id,
session_id=session_id,
)
if isinstance(build_result, LANGCHAIN_RUNNABLES):
result, intermediate_steps, raw_output = await get_result_and_steps(
build_result,
chat_inputs.message,
client_id=client_id,
session_id=session_id,
)
elif isinstance(build_result, ChatDefinition):
raw_output = await run_build_result(
build_result,
chat_inputs,
client_id=client_id,
session_id=session_id,
)
if isinstance(raw_output, dict):
if not build_result.output_key:
raise ValueError("No output key provided to ChatDefinition when returning a dict.")
result = raw_output[build_result.output_key]
else:
result = raw_output
intermediate_steps = []
else:
raise TypeError(f"Unknown type {type(build_result)}")
logger.debug("Generated result and intermediate_steps")
return result, intermediate_steps, raw_output
except Exception as e:
# Log stack trace
logger.exception(e)
raise e
async def run_build_result(build_result: "Any", chat_inputs: "ChatMessage", client_id: str, session_id: str):
    """Invoke a callable build result with the chat message payload.

    Note: ``client_id`` and ``session_id`` are accepted for signature parity
    with the runnable path but are not used by the call itself.
    """
    message = chat_inputs.message
    return build_result(inputs=message)

View file

@ -11,6 +11,7 @@ BASIC_FIELDS = [
"persist_directory",
"persist",
"weaviate_url",
"es_url",
"index_name",
"namespace",
"folder_path",
@ -170,6 +171,33 @@ class VectorStoreFrontendNode(FrontendNode):
value="",
)
extra_fields.extend((extra_field, extra_field2))
elif self.template.type_name == "ElasticsearchStore":
# add elastic and elastic credentials
extra_field = TemplateField(
name="es_url",
field_type="str",
required=True,
placeholder="http://localhost:9200",
show=True,
advanced=False,
multiline=False,
value="http://localhost:9200",
display_name="Elasticsearch URL",
)
extra_field2 = TemplateField(
name="index_name",
field_type="str",
required=True,
placeholder="test-index",
show=True,
advanced=False,
multiline=False,
value="test-index",
display_name="Index Name",
)
extra_fields.extend((extra_field, extra_field2))
elif self.template.type_name == "FAISS":
extra_field = TemplateField(
name="folder_path",

View file

@ -0,0 +1,34 @@
from typing import Any, Callable, Optional, Union
from langchain_core.prompts import PromptTemplate as LCPromptTemplate
from langflow.utils.prompt import GenericPromptTemplate
from llama_index.prompts import PromptTemplate as LIPromptTemplate
PromptTemplate = Union[LCPromptTemplate, LIPromptTemplate]
class ChatDefinition:
    """A chat-callable build result: a function bundled with its I/O contract.

    Wraps *func* together with the input keys it consumes, the key under
    which its result can be found (``output_key``) and, optionally, the
    prompt template it was derived from.
    """

    def __init__(
        self,
        func: Callable,
        inputs: list[str],
        output_key: Optional[str] = None,
        prompt_template: "Optional[PromptTemplate]" = None,
    ):
        self.func = func
        self.input_keys = inputs
        self.output_key = output_key
        self.prompt_template = prompt_template

    @classmethod
    def from_prompt_template(cls, prompt_template: "PromptTemplate", func: Callable, output_key: Optional[str] = None):
        """Build a definition whose input keys are read off *prompt_template*."""
        generic = GenericPromptTemplate(prompt_template)
        return cls(
            func=func,
            inputs=generic.input_keys,
            output_key=output_key,
            prompt_template=prompt_template,
        )

    def __call__(self, inputs: dict, callbacks: Optional[Any] = None) -> dict:
        # Delegate straight to the wrapped function; callbacks are passed
        # positionally, matching the (inputs, callbacks) call convention.
        return self.func(inputs, callbacks)

View file

@ -0,0 +1,58 @@
from typing import Any, Union
from langchain_core.prompts import PromptTemplate as LCPromptTemplate
from llama_index.prompts import PromptTemplate as LIPromptTemplate
PromptTemplateTypes = Union[LCPromptTemplate, LIPromptTemplate]
class GenericPromptTemplate:
    """Uniform wrapper over LangChain and LlamaIndex prompt templates.

    Proxies attribute access to the wrapped template via ``__getattribute__``
    so the wrapper is a drop-in stand-in for either template type, while a
    small whitelist of adapter members stays on the wrapper itself.
    """

    def __init__(self, prompt_template: PromptTemplateTypes):
        # object.__setattr__ bypasses the attribute-proxying machinery below.
        object.__setattr__(self, "prompt_template", prompt_template)

    @property
    def input_keys(self):
        """Return the template's variable names, whichever library it is from."""
        prompt_template = object.__getattribute__(self, "prompt_template")
        if isinstance(prompt_template, LCPromptTemplate):
            return prompt_template.input_variables
        elif isinstance(prompt_template, LIPromptTemplate):
            return prompt_template.template_vars
        else:
            raise TypeError(f"Unknown prompt template type {type(prompt_template)}")

    def to_lc_prompt(self):
        """Return a LangChain PromptTemplate view of the wrapped template."""
        prompt_template = object.__getattribute__(self, "prompt_template")
        if isinstance(prompt_template, LCPromptTemplate):
            return prompt_template
        elif isinstance(prompt_template, LIPromptTemplate):
            return LCPromptTemplate.from_template(prompt_template.get_template())
        else:
            raise TypeError(f"Unknown prompt template type {type(prompt_template)}")

    def to_li_prompt(self):
        """Return a LlamaIndex PromptTemplate view of the wrapped template."""
        prompt_template = object.__getattribute__(self, "prompt_template")
        if isinstance(prompt_template, LIPromptTemplate):
            return prompt_template
        elif isinstance(prompt_template, LCPromptTemplate):
            return LIPromptTemplate(template=prompt_template.template)
        else:
            raise TypeError(f"Unknown prompt template type {type(prompt_template)}")

    def __or__(self, other):
        """Compose with *other* in LangChain (LCEL) space.

        BUG FIX: the original raised TypeError for LangChain templates —
        which support ``|`` natively — and its error message reported
        ``type(other)`` instead of the wrapped template's type.
        """
        prompt_template = object.__getattribute__(self, "prompt_template")
        if isinstance(prompt_template, (LCPromptTemplate, LIPromptTemplate)):
            return self.to_lc_prompt() | other
        raise TypeError(f"Unknown prompt template type {type(prompt_template)}")

    def __getattribute__(self, name: str) -> Any:
        # Whitelisted names resolve on the wrapper; every other attribute is
        # delegated to the wrapped template so callers can treat this object
        # as the template itself.
        if name in {
            "input_keys",
            "to_lc_prompt",
            "to_li_prompt",
            "__or__",
            "prompt_template",
        }:
            return object.__getattribute__(self, name)
        prompt_template = object.__getattribute__(self, "prompt_template")
        return getattr(prompt_template, name)

View file

@ -0,0 +1,19 @@
const SvgElasticsearchLogo = (props) => (
<svg
xmlns="http://www.w3.org/2000/svg"
width="24"
height="24"
fill="none"
stroke="currentColor"
strokeLinecap="round"
strokeLinejoin="round"
strokeWidth="2"
className="icon icon-tabler icon-tabler-brand-elastic"
viewBox="0 0 24 24"
>
<path stroke="none" d="M0 0h24v24H0z"></path>
<path d="M14 2a5 5 0 015 5c0 .712-.232 1.387-.5 2 1.894.042 3.5 1.595 3.5 3.5 0 1.869-1.656 3.4-3.5 3.5.333.625.5 1.125.5 1.5a2.5 2.5 0 01-2.5 2.5c-.787 0-1.542-.432-2-1-.786 1.73-2.476 3-4.5 3a5 5 0 01-4.583-7 3.5 3.5 0 01-.11-6.992h.195a2.5 2.5 0 012-4c.787 0 1.542.432 2 1 .786-1.73 2.476-3 4.5-3zM8.5 9l-3-1"></path>
<path d="M9.5 5l-1 4 1 2 5 2 4-4M18.499 16l-3-.5-1-2.5M14.5 19l1-3.5M5.417 15L9.5 11"></path>
</svg>
);
export default SvgElasticsearchLogo;

View file

@ -0,0 +1,9 @@
<svg xmlns="http://www.w3.org/2000/svg" class="icon icon-tabler icon-tabler-brand-elastic" width="24" height="24" viewBox="0 0 24 24" stroke-width="2" stroke="currentColor" fill="none" stroke-linecap="round" stroke-linejoin="round">
<path stroke="none" d="M0 0h24v24H0z" fill="none" />
<path d="M14 2a5 5 0 0 1 5 5c0 .712 -.232 1.387 -.5 2c1.894 .042 3.5 1.595 3.5 3.5c0 1.869 -1.656 3.4 -3.5 3.5c.333 .625 .5 1.125 .5 1.5a2.5 2.5 0 0 1 -2.5 2.5c-.787 0 -1.542 -.432 -2 -1c-.786 1.73 -2.476 3 -4.5 3a5 5 0 0 1 -4.583 -7a3.5 3.5 0 0 1 -.11 -6.992l.195 0a2.5 2.5 0 0 1 2 -4c.787 0 1.542 .432 2 1c.786 -1.73 2.476 -3 4.5 -3z" />
<path d="M8.5 9l-3 -1" />
<path d="M9.5 5l-1 4l1 2l5 2l4 -4" />
<path d="M18.499 16l-3 -.5l-1 -2.5" />
<path d="M14.5 19l1 -3.5" />
<path d="M5.417 15l4.083 -4" />
</svg>

After

Width:  |  Height:  |  Size: 810 B

View file

@ -0,0 +1,9 @@
import React, { forwardRef } from "react";
import SvgElasticsearchLogo from "./ElasticsearchLogo";
export const ElasticsearchIcon = forwardRef<
SVGSVGElement,
React.PropsWithChildren<{}>
>((props, ref) => {
return <SvgElasticsearchLogo ref={ref} {...props} />;
});

View file

@ -111,6 +111,7 @@ import { AnthropicIcon } from "../icons/Anthropic";
import { BingIcon } from "../icons/Bing";
import { ChromaIcon } from "../icons/ChromaIcon";
import { CohereIcon } from "../icons/Cohere";
import { ElasticsearchIcon } from "../icons/ElasticsearchStore";
import { EvernoteIcon } from "../icons/Evernote";
import { FBIcon } from "../icons/FacebookMessenger";
import { GitBookIcon } from "../icons/GitBook";
@ -256,6 +257,7 @@ export const nodeIconsLucide: iconsType = {
OpenAIEmbeddings: OpenAiIcon,
Pinecone: PineconeIcon,
Qdrant: QDrantIcon,
ElasticsearchStore: ElasticsearchIcon,
Weaviate: WeaviateIcon,
Searx: SearxIcon,
SlackDirectoryLoader: SvgSlackIcon,

View file

@ -578,4 +578,4 @@ def test_async_task_processing_vector_store(client, added_vector_store, created_
# Validate that the task completed successfully and the result is as expected
assert "result" in task_status_json, task_status_json
assert "output" in task_status_json["result"], task_status_json["result"]
assert "Langflow" in task_status_json["result"]["output"], task_status_json["result"]
assert "Langflow" in task_status_json["result"]["output"], task_status_json["result"]