Update function signature, refactor code, add new langflow helper functions and files, and add version module (#1570)

* Update function signature and import statements

* Refactor code and fix bugs

* Add new langflow helper functions and remove base model component

* Add new files and modify existing files

* Add version module and update imports

* Update packages include path in pyproject.toml

* Update Poetry version to 1.8.2
Gabriel Luiz Freitas Almeida 2024-03-26 23:18:44 -03:00 committed by GitHub
commit eaf2479c87
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
44 changed files with 121 additions and 613 deletions

View file

@ -14,7 +14,7 @@ on:
- "src/backend/**"
env:
POETRY_VERSION: "1.7.0"
POETRY_VERSION: "1.8.2"
jobs:
lint:
@ -22,7 +22,6 @@ jobs:
strategy:
matrix:
python-version:
- "3.9"
- "3.10"
- "3.11"
steps:

View file

@ -15,7 +15,7 @@ on:
- "src/backend/**"
env:
POETRY_VERSION: "1.5.0"
POETRY_VERSION: "1.8.2"
jobs:
build:

View file

@ -23,7 +23,7 @@ ENV PYTHONUNBUFFERED=1 \
\
# poetry
# https://python-poetry.org/docs/configuration/#using-environment-variables
POETRY_VERSION=1.5.1 \
POETRY_VERSION=1.8.2 \
# make poetry install to this location
POETRY_HOME="/opt/poetry" \
# make poetry create the virtual environment in the project's root

View file

@ -9,10 +9,7 @@ from typing import Optional
import httpx
import typer
from dotenv import load_dotenv
from multiprocess import (
Process, # type: ignore
cpu_count, # type: ignore
)
from multiprocess import Process, cpu_count # type: ignore
from rich import box
from rich import print as rprint
from rich.console import Console

View file

@ -1,199 +0,0 @@
from typing import TYPE_CHECKING, Any, Callable, Coroutine, List, Optional, Tuple, Union
from pydantic.v1 import BaseModel, Field, create_model
from sqlmodel import select
from langflow.schema.schema import INPUT_FIELD_NAME, Record
from langflow.services.database.models.flow.model import Flow
from langflow.services.deps import session_scope
if TYPE_CHECKING:
from langflow.graph.graph.base import Graph
from langflow.graph.vertex.base import Vertex
INPUT_TYPE_MAP = {
"ChatInput": {"type_hint": "Optional[str]", "default": '""'},
"TextInput": {"type_hint": "Optional[str]", "default": '""'},
"JSONInput": {"type_hint": "Optional[dict]", "default": "{}"},
}
def list_flows(*, user_id: Optional[str] = None) -> List[Record]:
if not user_id:
raise ValueError("Session is invalid")
try:
with session_scope() as session:
flows = session.exec(
select(Flow).where(Flow.user_id == user_id).where(Flow.is_component == False) # noqa
).all()
flows_records = [flow.to_record() for flow in flows]
return flows_records
except Exception as e:
raise ValueError(f"Error listing flows: {e}")
async def load_flow(
user_id: str, flow_id: Optional[str] = None, flow_name: Optional[str] = None, tweaks: Optional[dict] = None
) -> "Graph":
from langflow.graph.graph.base import Graph
from langflow.processing.process import process_tweaks
if not flow_id and not flow_name:
raise ValueError("Flow ID or Flow Name is required")
if not flow_id and flow_name:
flow_id = find_flow(flow_name, user_id)
if not flow_id:
raise ValueError(f"Flow {flow_name} not found")
with session_scope() as session:
graph_data = flow.data if (flow := session.get(Flow, flow_id)) else None
if not graph_data:
raise ValueError(f"Flow {flow_id} not found")
if tweaks:
graph_data = process_tweaks(graph_data=graph_data, tweaks=tweaks)
graph = Graph.from_payload(graph_data, flow_id=flow_id)
return graph
def find_flow(flow_name: str, user_id: str) -> Optional[str]:
with session_scope() as session:
flow = session.exec(select(Flow).where(Flow.name == flow_name).where(Flow.user_id == user_id)).first()
return flow.id if flow else None
async def run_flow(
inputs: Union[dict, List[dict]] = None,
tweaks: Optional[dict] = None,
flow_id: Optional[str] = None,
flow_name: Optional[str] = None,
user_id: Optional[str] = None,
) -> Any:
graph = await load_flow(user_id, flow_id, flow_name, tweaks)
if inputs is None:
inputs = []
inputs_list = []
inputs_components = []
types = []
for input_dict in inputs:
inputs_list.append({INPUT_FIELD_NAME: input_dict.get("input_value")})
inputs_components.append(input_dict.get("components", []))
types.append(input_dict.get("type", []))
return await graph.arun(inputs_list, inputs_components=inputs_components, types=types)
def generate_function_for_flow(inputs: List["Vertex"], flow_id: str) -> Coroutine:
"""
Generate a dynamic flow function based on the given inputs and flow ID.
Args:
inputs (List[Vertex]): The list of input vertices for the flow.
flow_id (str): The ID of the flow.
Returns:
Coroutine: The dynamic flow function.
Raises:
None
Example:
inputs = [vertex1, vertex2]
flow_id = "my_flow"
function = generate_function_for_flow(inputs, flow_id)
result = function(input1, input2)
"""
# Prepare function arguments with type hints and default values
args = [
f"{input_.display_name.lower().replace(' ', '_')}: {INPUT_TYPE_MAP[input_.base_name]['type_hint']} = {INPUT_TYPE_MAP[input_.base_name]['default']}"
for input_ in inputs
]
# Maintain original argument names for constructing the tweaks dictionary
original_arg_names = [input_.display_name for input_ in inputs]
# Prepare a Pythonic, valid function argument string
func_args = ", ".join(args)
# Map original argument names to their corresponding Pythonic variable names in the function
arg_mappings = ", ".join(
f'"{original_name}": {name}'
for original_name, name in zip(original_arg_names, [arg.split(":")[0] for arg in args])
)
func_body = f"""
from typing import Optional
async def flow_function({func_args}):
tweaks = {{ {arg_mappings} }}
from langflow.helpers.flow import run_flow
from langchain_core.tools import ToolException
try:
return await run_flow(
tweaks={{key: {{'input_value': value}} for key, value in tweaks.items()}},
flow_id="{flow_id}",
)
except Exception as e:
raise ToolException(f'Error running flow: ' + e)
"""
compiled_func = compile(func_body, "<string>", "exec")
local_scope = {}
exec(compiled_func, globals(), local_scope)
return local_scope["flow_function"]
def build_function_and_schema(flow_record: Record, graph: "Graph") -> Tuple[Callable, BaseModel]:
"""
Builds a dynamic function and schema for a given flow.
Args:
flow_record (Record): The flow record containing information about the flow.
graph (Graph): The graph representing the flow.
Returns:
Tuple[Callable, BaseModel]: A tuple containing the dynamic function and the schema.
"""
flow_id = flow_record.id
inputs = get_flow_inputs(graph)
dynamic_flow_function = generate_function_for_flow(inputs, flow_id)
schema = build_schema_from_inputs(flow_record.name, inputs)
return dynamic_flow_function, schema
def get_flow_inputs(graph: "Graph") -> List["Vertex"]:
"""
Retrieves the flow inputs from the given graph.
Args:
graph (Graph): The graph object representing the flow.
Returns:
List[Record]: A list of input records, where each record contains the ID, name, and description of the input vertex.
"""
inputs = []
for vertex in graph.vertices:
if vertex.is_input:
inputs.append(vertex)
return inputs
def build_schema_from_inputs(name: str, inputs: List[tuple[str, str, str]]) -> BaseModel:
"""
Builds a schema from the given inputs.
Args:
name (str): The name of the schema.
inputs (List[tuple[str, str, str]]): A list of tuples representing the inputs.
Each tuple contains three elements: the input name, the input type, and the input description.
Returns:
BaseModel: The schema model.
"""
fields = {}
for input_ in inputs:
field_name = input_.display_name.lower().replace(" ", "_")
description = input_.description
fields[field_name] = (str, Field(default="", description=description))
return create_model(name, **fields)
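
The core trick in this helper module is compiling an async function from a generated source string so its signature mirrors the flow's inputs. A minimal, self-contained sketch of that technique, with toy argument names standing in for Vertex display names (no langflow imports required):

import asyncio

def make_flow_function(arg_names):
    # Build the argument list and the tweaks mapping, mirroring how
    # generate_function_for_flow assembles its source string.
    func_args = ", ".join(f"{name}: str = ''" for name in arg_names)
    mappings = ", ".join(f"'{name}': {name}" for name in arg_names)
    func_body = f"""
async def flow_function({func_args}):
    tweaks = {{ {mappings} }}
    return tweaks  # the real helper forwards tweaks to run_flow()
"""
    scope = {}
    exec(compile(func_body, "<string>", "exec"), globals(), scope)
    return scope["flow_function"]

fn = make_flow_function(["city", "topic"])
print(asyncio.run(fn(city="Berlin", topic="news")))
# {'city': 'Berlin', 'topic': 'news'}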

View file

@ -1,34 +0,0 @@
from langchain_core.documents import Document
from langflow.schema import Record
def docs_to_records(documents: list[Document]) -> list[Record]:
"""
Converts a list of Documents to a list of Records.
Args:
documents (list[Document]): The list of Documents to convert.
Returns:
list[Record]: The converted list of Records.
"""
return [Record.from_document(document) for document in documents]
def records_to_text(template: str, records: list[Record]) -> str:
"""
Converts a list of Records to a list of texts.
Args:
records (list[Record]): The list of Records to convert.
Returns:
list[str]: The converted list of texts.
"""
if isinstance(records, Record):
records = [records]
# Check if there are any format strings in the template
formated_records = [template.format(data=record.data, **record.data) for record in records]
return "\n".join(formated_records)

View file

@ -164,7 +164,7 @@ def get_is_component_from_data(data: dict):
async def check_langflow_version(component: StoreComponentCreate):
from langflow import __version__ as current_version
from langflow.version.version import __version__ as current_version # type: ignore
if not component.last_tested_version:
component.last_tested_version = current_version

View file

@ -239,7 +239,7 @@ async def create_upload_file(
# get endpoint to return version of langflow
@router.get("/version")
def get_version():
from langflow.version import __version__
from langflow.version import __version__ # type: ignore
return {"version": __version__}

View file

@ -1,10 +1,10 @@
from typing import List, Union
from typing import List, Optional, Union, cast
from langchain.agents import AgentExecutor, BaseMultiActionAgent, BaseSingleActionAgent
from langchain_core.runnables import Runnable
from langflow.custom import CustomComponent
from langflow.field_typing import BaseMemory, Text, Tool
from langflow.interface.custom.custom_component import CustomComponent
class LCAgentComponent(CustomComponent):
@ -44,7 +44,7 @@ class LCAgentComponent(CustomComponent):
inputs: str,
input_variables: list[str],
tools: List[Tool],
memory: BaseMemory = None,
memory: Optional[BaseMemory] = None,
handle_parsing_errors: bool = True,
output_key: str = "output",
) -> Text:
@ -52,7 +52,11 @@ class LCAgentComponent(CustomComponent):
runnable = agent
else:
runnable = AgentExecutor.from_agent_and_tools(
agent=agent, tools=tools, verbose=True, memory=memory, handle_parsing_errors=handle_parsing_errors
agent=agent, # type: ignore
tools=tools,
verbose=True,
memory=memory,
handle_parsing_errors=handle_parsing_errors,
)
input_dict = {"input": inputs}
for var in input_variables:
@ -61,11 +65,11 @@ class LCAgentComponent(CustomComponent):
result = await runnable.ainvoke(input_dict)
self.status = result
if output_key in result:
return result.get(output_key)
return cast(str, result.get(output_key))
elif "output" not in result:
if output_key != "output":
raise ValueError(f"Output key not found in result. Tried '{output_key}' and 'output'.")
else:
raise ValueError("Output key not found in result. Tried 'output'.")
return result.get("output")
return cast(str, result.get("output"))

View file

@ -1,10 +1,10 @@
from typing import Optional
from typing import Optional, Union
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.language_models.llms import LLM
from langchain_core.messages import HumanMessage, SystemMessage
from langflow.interface.custom.custom_component import CustomComponent
from langflow.custom import CustomComponent
class LCModelComponent(CustomComponent):
@ -34,15 +34,15 @@ class LCModelComponent(CustomComponent):
def get_chat_result(
self, runnable: BaseChatModel, stream: bool, input_value: str, system_message: Optional[str] = None
):
messages = []
messages: list[Union[HumanMessage, SystemMessage]] = []
if system_message:
messages.append(SystemMessage(system_message))
messages.append(SystemMessage(content=system_message))
if input_value:
messages.append(HumanMessage(input_value))
messages.append(HumanMessage(content=input_value))
if stream:
result = runnable.stream(messages)
return runnable.stream(messages)
else:
message = runnable.invoke(messages)
result = message.content
self.status = result
return result
return result
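
The hunk switches to the explicit content= keyword when constructing messages. A short sketch of the resulting pattern (assumes langchain_core is installed; the message texts are illustrative):

from langchain_core.messages import HumanMessage, SystemMessage

messages = []
system_message = "You are terse."
input_value = "Hello there"
if system_message:
    messages.append(SystemMessage(content=system_message))  # explicit kwarg
if input_value:
    messages.append(HumanMessage(content=input_value))
print([(m.type, m.content) for m in messages])
# [('system', 'You are terse.'), ('human', 'Hello there')]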

View file

@ -1,4 +1,4 @@
from typing import List
from typing import List, Optional
from langchain.agents import create_xml_agent
from langchain_core.prompts import PromptTemplate
@ -69,7 +69,7 @@ class XMLAgentComponent(LCAgentComponent):
llm: BaseLLM,
tools: List[Tool],
prompt: str,
memory: BaseMemory = None,
memory: Optional[BaseMemory] = None,
tool_template: str = "{name}: {description}",
handle_parsing_errors: bool = True,
) -> Text:

View file

@ -22,7 +22,7 @@ class CohereEmbeddingsComponent(CustomComponent):
self,
request_timeout: Optional[float] = None,
cohere_api_key: str = "",
max_retries: Optional[int] = None,
max_retries: int = 3,
model: str = "embed-english-v2.0",
truncate: Optional[str] = None,
user_agent: str = "langchain",

View file

@ -1,7 +1,6 @@
from typing import Any, Callable, Dict, List, Optional, Union
from langchain_openai.embeddings.base import OpenAIEmbeddings
from pydantic.v1.types import SecretStr
from langflow.field_typing import NestedDict
from langflow.interface.custom.custom_component import CustomComponent
@ -100,8 +99,6 @@ class OpenAIEmbeddingsComponent(CustomComponent):
if disallowed_special == ["all"]:
disallowed_special = "all" # type: ignore
api_key = SecretStr(openai_api_key) if openai_api_key else None
return OpenAIEmbeddings(
tiktoken_enabled=tiktoken_enable,
default_headers=default_headers,
@ -116,7 +113,7 @@ class OpenAIEmbeddingsComponent(CustomComponent):
model=model,
model_kwargs=model_kwargs,
base_url=openai_api_base,
api_key=api_key,
api_key=openai_api_key,
openai_api_type=openai_api_type,
api_version=openai_api_version,
organization=openai_organization,

View file

@ -1,4 +1,4 @@
from typing import Any, List, Optional, Text
from typing import Any, List, Optional
from langchain_core.tools import StructuredTool
from loguru import logger
@ -8,6 +8,7 @@ from langflow.field_typing import Tool
from langflow.graph.graph.base import Graph
from langflow.helpers.flow import build_function_and_schema
from langflow.schema.dotdict import dotdict
from langflow.schema.schema import Record
class FlowToolComponent(CustomComponent):
@ -19,7 +20,7 @@ class FlowToolComponent(CustomComponent):
flow_records = self.list_flows()
return [flow_record.data["name"] for flow_record in flow_records]
def get_flow(self, flow_name: str) -> Optional[Text]:
def get_flow(self, flow_name: str) -> Optional[Record]:
"""
Retrieves a flow by its name.
@ -82,4 +83,4 @@ class FlowToolComponent(CustomComponent):
description_repr = repr(tool.description).strip("'")
args_str = "\n".join([f"- {arg_name}: {arg_data['description']}" for arg_name, arg_data in tool.args.items()])
self.status = f"{description_repr}\nArguments:\n{args_str}"
return tool
return tool # type: ignore
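
For context, FlowToolComponent ultimately wraps the generated coroutine and schema in a StructuredTool. A toy sketch of that wiring, where the coroutine and schema are hypothetical stand-ins for the real flow function (assumes the langchain version of this era, which accepts pydantic.v1 models as args_schema):

from langchain_core.tools import StructuredTool
from pydantic.v1 import BaseModel, Field

class ToyFlowSchema(BaseModel):  # hypothetical schema
    chat_input: str = Field(default="", description="User message")

async def toy_flow(chat_input: str = "") -> str:  # hypothetical coroutine
    return f"flow ran with: {chat_input}"

tool = StructuredTool.from_function(
    coroutine=toy_flow,
    name="toy_flow",
    description="Runs the toy flow.",
    args_schema=ToyFlowSchema,
)
print(tool.args)  # {'chat_input': {'description': 'User message', ...}}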

View file

@ -1,10 +1,12 @@
from typing import Any, List, Optional, Text, Tuple
from typing import Any, List, Optional
from langflow.helpers.flow import get_flow_inputs
from loguru import logger
from langflow.custom import CustomComponent
from langflow.graph.graph.base import Graph
from langflow.graph.schema import ResultData, RunOutputs
from langflow.graph.vertex.base import Vertex
from langflow.schema import Record
from langflow.schema.dotdict import dotdict
from langflow.template.field.base import TemplateField
@ -20,7 +22,7 @@ class SubFlowComponent(CustomComponent):
flow_records = self.list_flows()
return [flow_record.data["name"] for flow_record in flow_records]
def get_flow(self, flow_name: str) -> Optional[Text]:
def get_flow(self, flow_name: str) -> Optional[Record]:
flow_records = self.list_flows()
for flow_record in flow_records:
if flow_record.data["name"] == flow_name:
@ -42,7 +44,7 @@ class SubFlowComponent(CustomComponent):
raise ValueError(f"Flow {field_value} not found.")
graph = Graph.from_payload(flow_record.data["data"])
# Get all inputs from the graph
inputs = self.get_flow_inputs(graph)
inputs = get_flow_inputs(graph)
# Add inputs to the build config
build_config = self.add_inputs_to_build_config(inputs, build_config)
except Exception as e:
@ -50,21 +52,13 @@ class SubFlowComponent(CustomComponent):
return build_config
def get_flow_inputs(self, graph: Graph) -> List[Record]:
inputs = []
for vertex in graph.vertices:
if vertex.is_input:
inputs.append((vertex.id, vertex.display_name, vertex.description))
logger.debug(inputs)
return inputs
def add_inputs_to_build_config(self, inputs: List[Tuple], build_config: dotdict):
def add_inputs_to_build_config(self, inputs: List[Vertex], build_config: dotdict):
new_fields: list[TemplateField] = []
for input_id, input_display_name, input_description in inputs:
for vertex in inputs:
field = TemplateField(
display_name=input_display_name,
name=input_id,
info=input_description,
display_name=vertex.display_name,
name=vertex.id,
info=vertex.description,
field_type="str",
default=None,
)
@ -110,12 +104,15 @@ class SubFlowComponent(CustomComponent):
tweaks=tweaks,
flow_name=flow_name,
)
if not run_outputs:
return []
run_output = run_outputs[0]
records = []
for output in run_output.outputs:
if output:
records.extend(self.build_records_from_result_data(output))
if run_output is not None:
for output in run_output.outputs:
if output:
records.extend(self.build_records_from_result_data(output))
self.status = records
logger.debug(records)

View file

@ -2,7 +2,6 @@ from typing import Optional
from langchain.llms.base import BaseLanguageModel
from langchain_openai import AzureChatOpenAI
from pydantic.v1 import SecretStr
from langflow.base.models.model import LCModelComponent
from langflow.field_typing import Text
@ -91,21 +90,20 @@ class AzureChatOpenAIComponent(LCModelComponent):
azure_endpoint: str,
input_value: Text,
azure_deployment: str,
api_key: str,
api_version: str,
api_key: Optional[str] = None,
system_message: Optional[str] = None,
temperature: float = 0.7,
max_tokens: Optional[int] = 1000,
stream: bool = False,
) -> BaseLanguageModel:
secret_api_key = SecretStr(api_key)
try:
output = AzureChatOpenAI(
model=model,
azure_endpoint=azure_endpoint,
azure_deployment=azure_deployment,
api_version=api_version,
api_key=secret_api_key,
api_key=api_key,
temperature=temperature,
max_tokens=max_tokens,
)

View file

@ -1,3 +0,0 @@
from .model import LCModelComponent
__all__ = ["LCModelComponent"]

View file

@ -34,4 +34,4 @@ class SearchApiToolComponent(CustomComponent):
tool = SearchAPIRun(api_wrapper=search_api_wrapper)
self.status = tool
return tool
return tool # type: ignore

View file

@ -1,6 +1,7 @@
from typing import List, Optional
from langchain_astradb import AstraDBVectorStore
from langchain_astradb.utils.astradb import SetupMode
from langflow.custom import CustomComponent
from langflow.field_typing import Embeddings, VectorStore
@ -83,6 +84,10 @@ class AstraDBVectorStoreComponent(CustomComponent):
metadata_indexing_exclude: Optional[List[str]] = None,
collection_indexing_policy: Optional[dict] = None,
) -> VectorStore:
try:
setup_mode_value = SetupMode[setup_mode.upper()]
except KeyError:
raise ValueError(f"Invalid setup mode: {setup_mode}")
if inputs:
documents = [_input.to_lc_document() for _input in inputs]
@ -98,7 +103,7 @@ class AstraDBVectorStoreComponent(CustomComponent):
bulk_insert_batch_concurrency=bulk_insert_batch_concurrency,
bulk_insert_overwrite_concurrency=bulk_insert_overwrite_concurrency,
bulk_delete_concurrency=bulk_delete_concurrency,
setup_mode=setup_mode,
setup_mode=setup_mode_value,
pre_delete_collection=pre_delete_collection,
metadata_indexing_include=metadata_indexing_include,
metadata_indexing_exclude=metadata_indexing_exclude,
@ -116,7 +121,7 @@ class AstraDBVectorStoreComponent(CustomComponent):
bulk_insert_batch_concurrency=bulk_insert_batch_concurrency,
bulk_insert_overwrite_concurrency=bulk_insert_overwrite_concurrency,
bulk_delete_concurrency=bulk_delete_concurrency,
setup_mode=setup_mode,
setup_mode=setup_mode_value,
pre_delete_collection=pre_delete_collection,
metadata_indexing_include=metadata_indexing_include,
metadata_indexing_exclude=metadata_indexing_exclude,
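
The new code maps the user-facing setup-mode string onto the SetupMode enum by key lookup rather than passing the raw string through. A sketch of the pattern with a stand-in enum (the real one lives in langchain_astradb.utils.astradb; the member names here are assumptions):

from enum import Enum

class SetupMode(Enum):  # stand-in, members assumed for illustration
    SYNC = 1
    ASYNC = 2
    OFF = 3

def parse_setup_mode(setup_mode: str) -> SetupMode:
    try:
        return SetupMode[setup_mode.upper()]  # key lookup, not value lookup
    except KeyError:
        raise ValueError(f"Invalid setup mode: {setup_mode}")

print(parse_setup_mode("sync"))  # SetupMode.SYNC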

View file

@ -6,7 +6,7 @@ from langflow.field_typing import Embeddings, Text
from langflow.schema import Record
class AstraDBSearchComponent(AstraDBVectorStoreComponent, LCVectorStoreComponent):
class AstraDBSearchComponent(LCVectorStoreComponent):
display_name = "AstraDB Search"
description = "Searches an existing AstraDB Vector Store"
@ -74,7 +74,7 @@ class AstraDBSearchComponent(AstraDBVectorStoreComponent, LCVectorStoreComponent
self,
embedding: Embeddings,
collection_name: str,
input_value: Optional[Text] = None,
input_value: Text,
search_type: str = "Similarity",
token: Optional[str] = None,
api_endpoint: Optional[str] = None,
@ -90,7 +90,7 @@ class AstraDBSearchComponent(AstraDBVectorStoreComponent, LCVectorStoreComponent
metadata_indexing_exclude: Optional[List[str]] = None,
collection_indexing_policy: Optional[dict] = None,
) -> List[Record]:
vector_store = super().build(
vector_store = AstraDBVectorStoreComponent().build(
embedding=embedding,
collection_name=collection_name,
token=token,

View file

@ -6,7 +6,7 @@ from langflow.field_typing import Embeddings, NestedDict, Text
from langflow.schema import Record
class MongoDBAtlasSearchComponent(MongoDBAtlasComponent, LCVectorStoreComponent):
class MongoDBAtlasSearchComponent(LCVectorStoreComponent):
display_name = "MongoDB Atlas Search"
description = "Search a MongoDB Atlas Vector Store for similar documents."
@ -37,9 +37,10 @@ class MongoDBAtlasSearchComponent(MongoDBAtlasComponent, LCVectorStoreComponent)
search_kwargs: Optional[NestedDict] = None,
) -> List[Record]:
search_kwargs = search_kwargs or {}
vector_store = super().build(
connection_string=mongodb_atlas_cluster_uri,
namespace=f"{db_name}.{collection_name}",
vector_store = MongoDBAtlasComponent().build(
mongodb_atlas_cluster_uri=mongodb_atlas_cluster_uri,
collection_name=collection_name,
db_name=db_name,
embedding=embedding,
index_name=index_name,
)

View file

@ -1,7 +1,7 @@
import asyncio
from collections import defaultdict, deque
from itertools import chain
from typing import TYPE_CHECKING, Coroutine, Dict, Generator, List, Optional, Type, Union
from typing import TYPE_CHECKING, Callable, Coroutine, Dict, Generator, List, Literal, Optional, Type, Union
from loguru import logger
@ -201,7 +201,7 @@ class Graph:
self,
inputs: Dict[str, str],
input_components: list[str],
input_type: str,
input_type: Literal["chat", "text", "json", "any"] | None,
outputs: list[str],
stream: bool,
session_id: str,
@ -236,7 +236,7 @@ class Graph:
continue
# If the input_type is not any and the input_type is not in the vertex id
# Example: input_type = "chat" and vertex.id = "OpenAI-19ddn"
elif input_type != "any" and input_type not in vertex.id.lower():
elif input_type is not None and input_type != "any" and input_type not in vertex.id.lower():
continue
if vertex is None:
raise ValueError(f"Vertex {vertex_id} not found")
@ -269,9 +269,9 @@ class Graph:
def run(
self,
inputs: Dict[str, str],
input_components: Optional[list[str]] = None,
types: Optional[list[str]] = None,
inputs: list[Dict[str, str]],
input_components: Optional[list[list[str]]] = None,
types: Optional[list[Literal["chat", "text", "json", "any"] | None]] = None,
outputs: Optional[list[str]] = None,
session_id: Optional[str] = None,
stream: bool = False,
@ -309,7 +309,7 @@ class Graph:
self,
inputs: list[Dict[str, str]],
inputs_components: Optional[list[list[str]]] = None,
types: Optional[list[str]] = None,
types: Optional[list[Literal["chat", "text", "json", "any"] | None]] = None,
outputs: Optional[list[str]] = None,
session_id: Optional[str] = None,
stream: bool = False,
@ -338,8 +338,12 @@ class Graph:
inputs = [{}]
# Length of all should be the same as inputs length
# just add empty lists to complete the length
if inputs_components is None:
inputs_components = []
for _ in range(len(inputs) - len(inputs_components)):
inputs_components.append([])
if types is None:
types = []
for _ in range(len(inputs) - len(types)):
types.append("any")
for run_inputs, components, input_type in zip(inputs, inputs_components, types):
@ -650,7 +654,7 @@ class Graph:
async def build_vertex(
self,
lock: asyncio.Lock,
set_cache_coro: Coroutine,
set_cache_coro: Callable[["Graph", asyncio.Lock], Coroutine],
vertex_id: str,
inputs_dict: Optional[Dict[str, str]] = None,
user_id: Optional[str] = None,
@ -693,7 +697,9 @@ class Graph:
logger.exception(f"Error building vertex: {exc}")
raise exc
async def get_next_and_top_level_vertices(self, lock: asyncio.Lock, set_cache_coro: Coroutine, vertex: Vertex):
async def get_next_and_top_level_vertices(
self, lock: asyncio.Lock, set_cache_coro: Callable[["Graph", asyncio.Lock], Coroutine], vertex: Vertex
):
"""
Retrieves the next runnable vertices and the top level vertices for a given vertex.
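
The padding added in this hunk keeps the per-input lists the same length as inputs, so the zip below covers every input dict. A stand-alone sketch of that rule with illustrative values:

inputs = [{"input_value": "hi"}, {"input_value": "bye"}]
inputs_components = [["ChatInput-abc"]]  # deliberately shorter than inputs
types = []

# Top up the optional lists with empty defaults.
for _ in range(len(inputs) - len(inputs_components)):
    inputs_components.append([])
for _ in range(len(inputs) - len(types)):
    types.append("any")

for run_inputs, components, input_type in zip(inputs, inputs_components, types):
    print(run_inputs, components, input_type)
# {'input_value': 'hi'} ['ChatInput-abc'] any
# {'input_value': 'bye'} [] any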

View file

@ -1,6 +1,6 @@
import asyncio
from collections import defaultdict
from typing import TYPE_CHECKING, Coroutine, List
from typing import TYPE_CHECKING, Awaitable, Callable, List
if TYPE_CHECKING:
from langflow.graph.graph.base import Graph
@ -55,7 +55,7 @@ class RunnableVerticesManager:
async def get_next_runnable_vertices(
self,
lock: asyncio.Lock,
set_cache_coro: Coroutine,
set_cache_coro: Callable[["Graph", asyncio.Lock], Awaitable[None]],
graph: "Graph",
vertex: "Vertex",
):
@ -85,7 +85,7 @@ class RunnableVerticesManager:
for v_id in set(next_runnable_vertices): # Use set to avoid duplicates
self.update_vertex_run_state(v_id, is_runnable=False)
self.remove_from_predecessors(v_id)
await set_cache_coro(data=graph, lock=lock)
await set_cache_coro(graph, lock)
return next_runnable_vertices
@staticmethod
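
set_cache_coro is now typed as a plain callable and invoked positionally, set_cache_coro(graph, lock), rather than with keyword arguments. A sketch of that contract, with a string standing in for the Graph instance:

import asyncio
from typing import Awaitable, Callable

async def set_cache(graph: object, lock: asyncio.Lock) -> None:
    async with lock:
        print("cached:", graph)

async def release(set_cache_coro: Callable[[object, asyncio.Lock], Awaitable[None]]) -> None:
    lock = asyncio.Lock()
    await set_cache_coro("graph-instance", lock)  # positional, not keywords

asyncio.run(release(set_cache))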

View file

@ -1,3 +0,0 @@
from .record import docs_to_records, records_to_text
__all__ = ["docs_to_records", "records_to_text"]

View file

@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any, Callable, Coroutine, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Optional, Tuple, Type, Union, cast
from pydantic.v1 import BaseModel, Field, create_model
from sqlmodel import select
@ -63,28 +63,30 @@ def find_flow(flow_name: str, user_id: str) -> Optional[str]:
async def run_flow(
inputs: Union[dict, List[dict]] = None,
inputs: Optional[Union[dict, List[dict]]] = None,
tweaks: Optional[dict] = None,
flow_id: Optional[str] = None,
flow_name: Optional[str] = None,
user_id: Optional[str] = None,
) -> Any:
if not user_id:
raise ValueError("Session is invalid")
graph = await load_flow(user_id, flow_id, flow_name, tweaks)
if inputs is None:
inputs = []
inputs_list = []
inputs_list: list[dict[str, str]] = []
inputs_components = []
types = []
for input_dict in inputs:
inputs_list.append({INPUT_FIELD_NAME: input_dict.get("input_value")})
inputs_list.append({INPUT_FIELD_NAME: cast(str, input_dict.get("input_value", ""))})
inputs_components.append(input_dict.get("components", []))
types.append(input_dict.get("type", []))
return await graph.arun(inputs_list, inputs_components=inputs_components, types=types)
def generate_function_for_flow(inputs: List["Vertex"], flow_id: str) -> Coroutine:
def generate_function_for_flow(inputs: List["Vertex"], flow_id: str) -> Callable[..., Awaitable[Any]]:
"""
Generate a dynamic flow function based on the given inputs and flow ID.
@ -138,12 +140,14 @@ async def flow_function({func_args}):
"""
compiled_func = compile(func_body, "<string>", "exec")
local_scope = {}
local_scope: dict = {}
exec(compiled_func, globals(), local_scope)
return local_scope["flow_function"]
def build_function_and_schema(flow_record: Record, graph: "Graph") -> Tuple[Callable, BaseModel]:
def build_function_and_schema(
flow_record: Record, graph: "Graph"
) -> Tuple[Callable[..., Awaitable[Any]], Type[BaseModel]]:
"""
Builds a dynamic function and schema for a given flow.
@ -178,7 +182,7 @@ def get_flow_inputs(graph: "Graph") -> List["Vertex"]:
return inputs
def build_schema_from_inputs(name: str, inputs: List[tuple[str, str, str]]) -> BaseModel:
def build_schema_from_inputs(name: str, inputs: List["Vertex"]) -> Type[BaseModel]:
"""
Builds a schema from the given inputs.
@ -196,4 +200,4 @@ def build_schema_from_inputs(name: str, inputs: List[tuple[str, str, str]]) -> B
field_name = input_.display_name.lower().replace(" ", "_")
description = input_.description
fields[field_name] = (str, Field(default="", description=description))
return create_model(name, **fields)
return create_model(name, **fields) # type: ignore
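
build_schema_from_inputs feeds (type, Field) tuples to pydantic.v1's create_model; the trailing # type: ignore quiets the checker over the dynamic kwargs. A sketch with literal field specs in place of Vertex display names and descriptions:

from pydantic.v1 import Field, create_model

fields = {
    "chat_input": (str, Field(default="", description="User message")),
    "text_input": (str, Field(default="", description="Extra context")),
}
Schema = create_model("MyFlowSchema", **fields)  # type: ignore
print(Schema().dict())  # {'chat_input': '', 'text_input': ''}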

View file

@ -423,11 +423,13 @@ class CustomComponent(Component):
return validate.create_function(self.code, self.function_entrypoint_name)
async def load_flow(self, flow_id: str, tweaks: Optional[dict] = None) -> "Graph":
return await load_flow(flow_id, tweaks)
if not self._user_id:
raise ValueError("Session is invalid")
return await load_flow(user_id=self._user_id, flow_id=flow_id, tweaks=tweaks)
async def run_flow(
self,
inputs: Union[dict, List[dict]] = None,
inputs: Optional[Union[dict, List[dict]]] = None,
flow_id: Optional[str] = None,
flow_name: Optional[str] = None,
tweaks: Optional[dict] = None,

View file

@ -38,7 +38,7 @@ async def instantiate_class(
user_id=None,
) -> Any:
"""Instantiate class from module type and key, and params"""
from langflow.legacy_custom.customs import CUSTOM_NODES
from langflow.interface.custom_lists import CUSTOM_NODES
vertex_type = vertex.vertex_type
base_type = vertex.base_type
@ -50,7 +50,9 @@ async def instantiate_class(
if custom_node := CUSTOM_NODES.get(vertex_type):
if hasattr(custom_node, "initialize"):
return custom_node.initialize(**params)
return custom_node(**params)
if callable(custom_node):
return custom_node(**params)
raise ValueError(f"Custom node {vertex_type} is not callable")
logger.debug(f"Instantiating {vertex_type} of type {base_type}")
if not base_type:
raise ValueError("No base type provided for vertex")

View file

@ -5,8 +5,6 @@ from langflow.interface.base import LangChainTypeCreator
from langflow.interface.tools.constants import ALL_TOOLS_NAMES, CUSTOM_TOOLS, FILE_TOOLS, OTHER_TOOLS
from langflow.interface.tools.util import get_tool_params
from langflow.legacy_custom import customs
from langflow.interface.tools.util import get_tool_params
from langflow.legacy_custom import customs
from langflow.services.deps import get_settings_service
from langflow.template.field.base import TemplateField
from langflow.template.template.base import Template

View file

@ -1,7 +1,7 @@
from langflow.template import frontend_node
# These should always be instantiated
CUSTOM_NODES = {
CUSTOM_NODES: dict[str, dict[str, frontend_node.base.FrontendNode]] = {
# "prompts": {
# "ZeroShotPrompt": frontend_node.prompts.ZeroShotPromptNode(),
# },

View file

@ -125,7 +125,7 @@ class Result(BaseModel):
async def run_graph(
graph: Union["Graph", dict],
graph: "Graph",
flow_id: str,
stream: bool,
session_id: Optional[str] = None,

View file

@ -23,7 +23,7 @@ class ServiceFactory:
raise self.service_class(*args, **kwargs)
def hash_factory(factory: ServiceFactory) -> str:
def hash_factory(factory: Type[ServiceFactory]) -> str:
return factory.service_class.__name__
@ -38,7 +38,7 @@ def hash_infer_service_types_args(factory_class: Type[ServiceFactory], available
@cached(cache=LRUCache(maxsize=10), key=hash_infer_service_types_args)
def infer_service_types(factory_class: Type[ServiceFactory], available_services=None) -> "ServiceType":
def infer_service_types(factory_class: Type[ServiceFactory], available_services=None) -> list["ServiceType"]:
create_method = factory_class.create
type_hints = get_type_hints(create_method, globalns=available_services)
service_types = []
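
infer_service_types now returns a list; under the hood it reads the type hints off the factory's create method. A sketch of the get_type_hints lookup with hypothetical service and factory classes:

from typing import Type, get_type_hints

class DatabaseService: ...  # hypothetical service

class DatabaseServiceFactory:  # hypothetical factory
    def create(self) -> DatabaseService:
        return DatabaseService()

def infer_service_types(factory_class: Type) -> list:
    hints = get_type_hints(factory_class.create)
    return [hints["return"].__name__]

print(infer_service_types(DatabaseServiceFactory))  # ['DatabaseService']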

View file

@ -1,5 +1,6 @@
import secrets
from pathlib import Path
from typing import Literal
from loguru import logger
from passlib.context import CryptContext
@ -14,7 +15,7 @@ class AuthSettings(BaseSettings):
# Login settings
CONFIG_DIR: str
SECRET_KEY: SecretStr = Field(
default=None,
default=SecretStr(""),
description="Secret key for JWT. If not provided, a random one will be generated.",
frozen=False,
)
@ -33,13 +34,13 @@ class AuthSettings(BaseSettings):
SUPERUSER: str = DEFAULT_SUPERUSER
SUPERUSER_PASSWORD: str = DEFAULT_SUPERUSER_PASSWORD
REFRESH_SAME_SITE: str = "none"
REFRESH_SAME_SITE: Literal["lax", "strict", "none"] = "none"
"""The SameSite attribute of the refresh token cookie."""
REFRESH_SECURE: bool = True
"""The Secure attribute of the refresh token cookie."""
REFRESH_HTTPONLY: bool = True
"""The HttpOnly attribute of the refresh token cookie."""
ACCESS_SAME_SITE: str = "none"
ACCESS_SAME_SITE: Literal["lax", "strict", "none"] = "none"
"""The SameSite attribute of the access token cookie."""
ACCESS_SECURE: bool = True
"""The Secure attribute of the access token cookie."""
@ -85,9 +86,10 @@ class AuthSettings(BaseSettings):
secret_key_path = Path(config_dir) / "secret_key"
if value:
if value and isinstance(value, SecretStr):
logger.debug("Secret key provided")
write_secret_to_file(secret_key_path, value)
secret_value = value.get_secret_value()
write_secret_to_file(secret_key_path, secret_value)
else:
logger.debug("No secret key provided, generating a random one")
@ -103,4 +105,4 @@ class AuthSettings(BaseSettings):
write_secret_to_file(secret_key_path, value)
logger.debug("Saved secret key")
return value
return value if isinstance(value, SecretStr) else SecretStr(value)
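
The validator now guarantees a SecretStr comes back out, wrapping plain strings when needed so callers can always rely on .get_secret_value(). A sketch of that normalization:

from pydantic import SecretStr

def normalize(value: str | SecretStr) -> SecretStr:
    # Mirror the validator's last line: wrap plain strings, pass
    # SecretStr instances through unchanged.
    return value if isinstance(value, SecretStr) else SecretStr(value)

print(normalize("plain-key"))                              # **********
print(normalize(SecretStr("wrapped")).get_secret_value())  # wrapped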

View file

@ -10,10 +10,12 @@ from langflow.template.frontend_node import (
textsplitters,
tools,
vectorstores,
base,
)
__all__ = [
"agents",
"base",
"chains",
"embeddings",
"memories",

View file

@ -17,7 +17,7 @@ repository = "https://github.com/logspace-ai/langflow"
license = "MIT"
readme = "README.md"
keywords = ["nlp", "langchain", "openai", "gpt", "gui"]
packages = [{ include = "langflow" }, { include = "py.typed" }]
packages = [{ include = "langflow" }, { include = "langflow/py.typed" }]
include = ["pyproject.toml", "README.md", "langflow/**/*"]
documentation = "https://docs.langflow.org"

View file

@ -1,70 +0,0 @@
from typing import List, Union
from langchain.agents import AgentExecutor, BaseMultiActionAgent, BaseSingleActionAgent
from langflow import CustomComponent
from langflow.field_typing import BaseMemory, Text, Tool
class LCAgentComponent(CustomComponent):
def build_config(self):
return {
"lc": {
"display_name": "LangChain",
"info": "The LangChain to interact with.",
},
"handle_parsing_errors": {
"display_name": "Handle Parsing Errors",
"info": "If True, the agent will handle parsing errors. If False, the agent will raise an error.",
"advanced": True,
},
"output_key": {
"display_name": "Output Key",
"info": "The key to use to get the output from the agent.",
"advanced": True,
},
"memory": {
"display_name": "Memory",
"info": "Memory to use for the agent.",
},
"tools": {
"display_name": "Tools",
"info": "Tools the agent can use.",
},
"input_value": {
"display_name": "Input",
"info": "Input text to pass to the agent.",
},
}
async def run_agent(
self,
agent: Union[BaseSingleActionAgent, BaseMultiActionAgent, AgentExecutor],
inputs: str,
input_variables: list[str],
tools: List[Tool],
memory: BaseMemory = None,
handle_parsing_errors: bool = True,
output_key: str = "output",
) -> Text:
if isinstance(agent, AgentExecutor):
runnable = agent
else:
runnable = AgentExecutor.from_agent_and_tools(
agent=agent, tools=tools, verbose=True, memory=memory, handle_parsing_errors=handle_parsing_errors
)
input_dict = {"input": inputs}
for var in input_variables:
if var not in ["agent_scratchpad", "input"]:
input_dict[var] = ""
result = await runnable.ainvoke(input_dict)
self.status = result
if output_key in result:
return result.get(output_key)
elif "output" not in result:
if output_key != "output":
raise ValueError(f"Output key not found in result. Tried '{output_key}' and 'output'.")
else:
raise ValueError("Output key not found in result. Tried 'output'.")
return result.get("output")

View file

@ -1,3 +0,0 @@
from .model import LCModelComponent
__all__ = ["LCModelComponent"]

View file

@ -1,48 +0,0 @@
from typing import Optional
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.language_models.llms import LLM
from langchain_core.messages import HumanMessage, SystemMessage
from langflow import CustomComponent
class LCModelComponent(CustomComponent):
display_name: str = "Model Name"
description: str = "Model Description"
def get_result(self, runnable: LLM, stream: bool, input_value: str):
"""
Retrieves the result from the output of a Runnable object.
Args:
output (Runnable): The output object to retrieve the result from.
stream (bool): Indicates whether to use streaming or invocation mode.
input_value (str): The input value to pass to the output object.
Returns:
The result obtained from the output object.
"""
if stream:
result = runnable.stream(input_value)
else:
message = runnable.invoke(input_value)
result = message.content if hasattr(message, "content") else message
self.status = result
return result
def get_chat_result(
self, runnable: BaseChatModel, stream: bool, input_value: str, system_message: Optional[str] = None
):
messages = []
if input_value:
messages.append(HumanMessage(input_value))
if system_message:
messages.append(SystemMessage(system_message))
if stream:
result = runnable.stream(messages)
else:
message = runnable.invoke(messages)
result = message.content
self.status = result
return result

View file

@ -1,85 +0,0 @@
from typing import Any, List, Optional, Text
from langchain_core.tools import StructuredTool
from loguru import logger
from langflow import CustomComponent
from langflow.field_typing import Tool
from langflow.graph.graph.base import Graph
from langflow.helpers.flow import build_function_and_schema
from langflow.schema.dotdict import dotdict
class FlowToolComponent(CustomComponent):
display_name = "Flow as Tool"
description = "Construct a Tool from a function that runs the loaded Flow."
field_order = ["flow_name", "name", "description", "return_direct"]
def get_flow_names(self) -> List[str]:
flow_records = self.list_flows()
return [flow_record.data["name"] for flow_record in flow_records]
def get_flow(self, flow_name: str) -> Optional[Text]:
"""
Retrieves a flow by its name.
Args:
flow_name (str): The name of the flow to retrieve.
Returns:
Optional[Text]: The flow record if found, None otherwise.
"""
flow_records = self.list_flows()
for flow_record in flow_records:
if flow_record.data["name"] == flow_name:
return flow_record
return None
def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
logger.debug(f"Updating build config with field value {field_value} and field name {field_name}")
if field_name == "flow_name":
build_config["flow_name"]["options"] = self.get_flow_names()
return build_config
def build_config(self):
return {
"flow_name": {
"display_name": "Flow Name",
"info": "The name of the flow to run.",
"options": [],
"real_time_refresh": True,
"refresh_button": True,
},
"name": {
"display_name": "Name",
"description": "The name of the tool.",
},
"description": {
"display_name": "Description",
"description": "The description of the tool.",
},
"return_direct": {
"display_name": "Return Direct",
"description": "Return the result directly from the Tool.",
"advanced": True,
},
}
async def build(self, flow_name: str, name: str, description: str, return_direct: bool = False) -> Tool:
flow_record = self.get_flow(flow_name)
if not flow_record:
raise ValueError("Flow not found.")
graph = Graph.from_payload(flow_record.data["data"])
dynamic_flow_function, schema = build_function_and_schema(flow_record, graph)
tool = StructuredTool.from_function(
coroutine=dynamic_flow_function,
name=name,
description=description,
return_direct=return_direct,
args_schema=schema,
)
description_repr = repr(tool.description).strip("'")
args_str = "\n".join([f"- {arg_name}: {arg_data['description']}" for arg_name, arg_data in tool.args.items()])
self.status = f"{description_repr}\nArguments:\n{args_str}"
return tool

View file

@ -1,25 +0,0 @@
from langflow.custom import CustomComponent
class SchemaComponent(CustomComponent):
display_name = "Schema"
description = "Construct a Schema from a list of fields."
def build_config(self):
return {
"fields": {
"display_name": "Fields",
"info": "The fields to include in the schema.",
},
"name": {
"display_name": "Name",
"info": "The name of the schema.",
},
}
def build(self, name: str, fields: list[dict]):
# The idea for this component is to use create_model from pydantic to create a schema
# from a list of fields. This will be useful for creating schemas for the flow tool.
pass
# field is a simple list of dictionaries with the field name and

View file

@ -1,37 +0,0 @@
from langchain_community.tools.searchapi import SearchAPIRun
from langchain_community.utilities.searchapi import SearchApiAPIWrapper
from langflow import CustomComponent
from langflow.field_typing import Tool
class SearchApiToolComponent(CustomComponent):
display_name: str = "SearchApi Tool"
description: str = "Real-time search engine results API."
documentation: str = "https://www.searchapi.io/docs/google"
field_config = {
"engine": {
"display_name": "Engine",
"field_type": "str",
"info": "The search engine to use.",
},
"api_key": {
"display_name": "API Key",
"field_type": "str",
"required": True,
"password": True,
"info": "The API key to use SearchApi.",
},
}
def build(
self,
engine: str,
api_key: str,
) -> Tool:
search_api_wrapper = SearchApiAPIWrapper(engine=engine, searchapi_api_key=api_key)
tool = SearchAPIRun(api_wrapper=search_api_wrapper)
self.status = tool
return tool

View file

@ -0,0 +1 @@
from .version import __version__ # noqa: F401

View file

@ -3,6 +3,5 @@ from importlib import metadata
try:
__version__ = metadata.version(__package__)
except metadata.PackageNotFoundError:
# Case where package metadata is not available.
__version__ = ""
del metadata # optional, avoids polluting the results of dir(__package__)
del metadata
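
The new langflow.version module resolves the version from installed package metadata. A sketch of the same importlib.metadata pattern in isolation:

from importlib import metadata

# Resolve the installed distribution's version, falling back to "" when
# the metadata is unavailable (e.g. running from a source checkout).
try:
    version = metadata.version("langflow")
except metadata.PackageNotFoundError:
    version = ""
print(version or "<package metadata not found>")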