feat: Activate ruff rules UP(pyupgrade) (#3871)

* Activate ruff rules UP(pyupgrade)

* Set ruff target version

* Fix threading.Lock | None error
Due to https://github.com/python/cpython/issues/114315

* Fix Text in custom component supported types

* Fix mypy issues

---------

Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
This commit is contained in:
Christophe Bornet 2024-09-25 19:38:50 +02:00 committed by GitHub
commit f8a2a7d3b8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
186 changed files with 911 additions and 983 deletions

View file

@ -4,7 +4,6 @@ import sys
import time
import warnings
from pathlib import Path
from typing import Optional
import click
import httpx
@ -84,7 +83,7 @@ def run(
workers: int = typer.Option(1, help="Number of worker processes.", envvar="LANGFLOW_WORKERS"),
timeout: int = typer.Option(300, help="Worker timeout in seconds.", envvar="LANGFLOW_WORKER_TIMEOUT"),
port: int = typer.Option(7860, help="Port to listen on.", envvar="LANGFLOW_PORT"),
components_path: Optional[Path] = typer.Option(
components_path: Path | None = typer.Option(
Path(__file__).parent / "components",
help="Path to the directory containing custom components.",
envvar="LANGFLOW_COMPONENTS_PATH",
@ -93,7 +92,7 @@ def run(
env_file: Path = typer.Option(None, help="Path to the .env file containing environment variables."),
log_level: str = typer.Option("critical", help="Logging level.", envvar="LANGFLOW_LOG_LEVEL"),
log_file: Path = typer.Option("logs/langflow.log", help="Path to the log file.", envvar="LANGFLOW_LOG_FILE"),
cache: Optional[str] = typer.Option(
cache: str | None = typer.Option(
envvar="LANGFLOW_LANGCHAIN_CACHE",
help="Type of cache to use. (InMemoryCache, SQLiteCache)",
default=None,
@ -161,7 +160,7 @@ def run(
health_check_max_retries=health_check_max_retries,
)
# create path object if path is provided
static_files_dir: Optional[Path] = Path(path) if path else None
static_files_dir: Path | None = Path(path) if path else None
settings_service = get_settings_service()
settings_service.set("backend_only", backend_only)
app = setup_app(static_files_dir=static_files_dir, backend_only=backend_only)

View file

@ -1,3 +1,5 @@
from __future__ import annotations
import uuid
import warnings
from typing import TYPE_CHECKING, Any
@ -69,7 +71,7 @@ def build_input_keys_response(langchain_object, artifacts):
return input_keys_response
def validate_is_component(flows: list["Flow"]):
def validate_is_component(flows: list[Flow]):
for flow in flows:
if not flow.data or flow.is_component is not None:
continue
@ -152,7 +154,7 @@ async def build_graph_from_db_no_cache(flow_id: str, session: Session):
return await build_graph_from_data(flow_id, flow.data, flow_name=flow.name, user_id=str(flow.user_id))
async def build_graph_from_db(flow_id: str, session: Session, chat_service: "ChatService"):
async def build_graph_from_db(flow_id: str, session: Session, chat_service: ChatService):
graph = await build_graph_from_db_no_cache(flow_id, session)
await chat_service.set_cache(flow_id, graph)
return graph
@ -160,7 +162,7 @@ async def build_graph_from_db(flow_id: str, session: Session, chat_service: "Cha
async def build_and_cache_graph_from_data(
flow_id: str,
chat_service: "ChatService",
chat_service: ChatService,
graph_data: dict,
): # -> Graph | Any:
"""Build and cache the graph."""

View file

@ -1,3 +1,5 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from uuid import UUID
@ -25,7 +27,7 @@ class AsyncStreamingLLMCallbackHandleSIO(AsyncCallbackHandler):
def __init__(self, session_id: str):
self.chat_service = get_chat_service()
self.client_id = session_id
self.socketio_service: "SocketIOService" = get_socket_service()
self.socketio_service: SocketIOService = get_socket_service()
self.sid = session_id
# self.socketio_service = self.chat_service.active_connections[self.client_id]

View file

@ -1,3 +1,5 @@
from __future__ import annotations
import asyncio
import json
import time
@ -72,9 +74,9 @@ async def retrieve_vertices_order(
data: Annotated[FlowDataRequest | None, Body(embed=True)] | None = None,
stop_component_id: str | None = None,
start_component_id: str | None = None,
chat_service: "ChatService" = Depends(get_chat_service),
chat_service: ChatService = Depends(get_chat_service),
session=Depends(get_session),
telemetry_service: "TelemetryService" = Depends(get_telemetry_service),
telemetry_service: TelemetryService = Depends(get_telemetry_service),
):
"""
Retrieve the vertices order for a given flow.
@ -148,12 +150,12 @@ async def build_flow(
stop_component_id: str | None = None,
start_component_id: str | None = None,
log_builds: bool | None = True,
chat_service: "ChatService" = Depends(get_chat_service),
chat_service: ChatService = Depends(get_chat_service),
current_user=Depends(get_current_active_user),
telemetry_service: "TelemetryService" = Depends(get_telemetry_service),
telemetry_service: TelemetryService = Depends(get_telemetry_service),
session=Depends(get_session),
):
async def build_graph_and_get_order() -> tuple[list[str], list[str], "Graph"]:
async def build_graph_and_get_order() -> tuple[list[str], list[str], Graph]:
start_time = time.perf_counter()
components_count = None
try:
@ -205,7 +207,7 @@ async def build_flow(
logger.exception(exc)
raise HTTPException(status_code=500, detail=str(exc)) from exc
async def _build_vertex(vertex_id: str, graph: "Graph", event_manager: "EventManager") -> VertexBuildResponse:
async def _build_vertex(vertex_id: str, graph: Graph, event_manager: EventManager) -> VertexBuildResponse:
flow_id_str = str(flow_id)
next_runnable_vertices = []
@ -320,9 +322,9 @@ async def build_flow(
async def build_vertices(
vertex_id: str,
graph: "Graph",
graph: Graph,
client_consumed_queue: asyncio.Queue,
event_manager: "EventManager",
event_manager: EventManager,
) -> None:
build_task = asyncio.create_task(asyncio.to_thread(asyncio.run, _build_vertex(vertex_id, graph, event_manager)))
try:
@ -457,9 +459,9 @@ async def build_vertex(
background_tasks: BackgroundTasks,
inputs: Annotated[InputValueRequest | None, Body(embed=True)] = None,
files: list[str] | None = None,
chat_service: "ChatService" = Depends(get_chat_service),
chat_service: ChatService = Depends(get_chat_service),
current_user=Depends(get_current_active_user),
telemetry_service: "TelemetryService" = Depends(get_telemetry_service),
telemetry_service: TelemetryService = Depends(get_telemetry_service),
):
"""Build a vertex instead of the entire graph.
@ -489,7 +491,7 @@ async def build_vertex(
if not cache:
# If there's no cache
logger.warning(f"No cache found for {flow_id_str}. Building graph starting at {vertex_id}")
graph: "Graph" = await build_graph_from_db(
graph: Graph = await build_graph_from_db(
flow_id=flow_id_str, session=next(get_session()), chat_service=chat_service
)
else:
@ -609,8 +611,8 @@ async def build_vertex_stream(
flow_id: uuid.UUID,
vertex_id: str,
session_id: str | None = None,
chat_service: "ChatService" = Depends(get_chat_service),
session_service: "SessionService" = Depends(get_session_service),
chat_service: ChatService = Depends(get_chat_service),
session_service: SessionService = Depends(get_session_service),
):
"""Build a vertex instead of the entire graph.
@ -650,7 +652,7 @@ async def build_vertex_stream(
else:
graph = cache.get("result")
vertex: "InterfaceVertex" = graph.get_vertex(vertex_id)
vertex: InterfaceVertex = graph.get_vertex(vertex_id)
if not hasattr(vertex, "stream"):
raise ValueError(f"Vertex {vertex_id} does not support streaming")
if isinstance(vertex._built_result, str) and vertex._built_result:

View file

@ -1,3 +1,5 @@
from __future__ import annotations
import time
from asyncio import Lock
from http import HTTPStatus
@ -62,7 +64,7 @@ router = APIRouter(tags=["Base"])
@router.get("/all", dependencies=[Depends(get_current_active_user)])
async def get_all(
settings_service=Depends(get_settings_service),
cache_service: "CacheService" = Depends(dependency=get_cache_service),
cache_service: CacheService = Depends(dependency=get_cache_service),
force_refresh: bool = False,
):
from langflow.interface.types import get_and_cache_all_types_dict
@ -181,7 +183,7 @@ async def simplified_run_flow(
input_request: SimplifiedAPIRequest = SimplifiedAPIRequest(),
stream: bool = False,
api_key_user: UserRead = Depends(api_key_security),
telemetry_service: "TelemetryService" = Depends(get_telemetry_service),
telemetry_service: TelemetryService = Depends(get_telemetry_service),
):
"""
Executes a specified flow by ID with input customization, performance enhancements through caching, and optional data streaming.
@ -290,7 +292,7 @@ async def webhook_run_flow(
user: Annotated[User, Depends(get_user_by_flow_id_or_endpoint_name)],
request: Request,
background_tasks: BackgroundTasks,
telemetry_service: "TelemetryService" = Depends(get_telemetry_service),
telemetry_service: TelemetryService = Depends(get_telemetry_service),
):
"""
Run a flow using a webhook request.
@ -484,7 +486,7 @@ async def process(
tweaks: dict | None = None,
clear_cache: Annotated[bool, Body(embed=True)] = False, # noqa: F821
session_id: Annotated[None | str, Body(embed=True)] = None, # noqa: F821
task_service: "TaskService" = Depends(get_task_service),
task_service: TaskService = Depends(get_task_service),
api_key_user: UserRead = Depends(api_key_security),
sync: Annotated[bool, Body(embed=True)] = True, # noqa: F821
session_service: SessionService = Depends(get_session_service),
@ -633,7 +635,7 @@ def get_config():
try:
from langflow.services.deps import get_settings_service
settings_service: "SettingsService" = get_settings_service() # type: ignore
settings_service: SettingsService = get_settings_service() # type: ignore
return settings_service.settings.model_dump()
except Exception as exc:
logger.exception(exc)

View file

@ -1,5 +1,5 @@
from abc import abstractmethod
from typing import List, Optional, Union, cast
from typing import cast
from langchain.agents import AgentExecutor, BaseMultiActionAgent, BaseSingleActionAgent
from langchain.agents.agent import RunnableAgent
@ -20,7 +20,7 @@ from langflow.utils.constants import MESSAGE_SENDER_AI
class LCAgentComponent(Component):
trace_type = "agent"
_base_inputs: List[InputTypes] = [
_base_inputs: list[InputTypes] = [
MessageTextInput(name="input_value", display_name="Input"),
BoolInput(
name="handle_parsing_errors",
@ -89,7 +89,7 @@ class LCAgentComponent(Component):
}
return {**base, "agent_executor_kwargs": agent_kwargs}
def get_chat_history_data(self) -> Optional[List[Data]]:
def get_chat_history_data(self) -> list[Data] | None:
# might be overridden in subclasses
return None
@ -128,7 +128,7 @@ class LCToolsAgentComponent(LCAgentComponent):
async def run_agent(
self,
agent: Union[Runnable, BaseSingleActionAgent, BaseMultiActionAgent, AgentExecutor],
agent: Runnable | BaseSingleActionAgent | BaseMultiActionAgent | AgentExecutor,
) -> Text:
if isinstance(agent, AgentExecutor):
runnable = agent

View file

@ -1,4 +1,4 @@
from typing import Any, Dict, List
from typing import Any
from uuid import UUID
from langchain.callbacks.base import AsyncCallbackHandler
@ -15,14 +15,14 @@ class AgentAsyncHandler(AsyncCallbackHandler):
async def on_tool_start(
self,
serialized: Dict[str, Any],
serialized: dict[str, Any],
input_str: str,
*,
run_id: UUID,
parent_run_id: UUID | None = None,
tags: List[str] | None = None,
metadata: Dict[str, Any] | None = None,
inputs: Dict[str, Any] | None = None,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
inputs: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
if self.log_function is None:
@ -62,7 +62,7 @@ class AgentAsyncHandler(AsyncCallbackHandler):
*,
run_id: UUID,
parent_run_id: UUID | None = None,
tags: List[str] | None = None,
tags: list[str] | None = None,
**kwargs: Any,
) -> None:
if self.log_function is None:
@ -85,7 +85,7 @@ class AgentAsyncHandler(AsyncCallbackHandler):
*,
run_id: UUID,
parent_run_id: UUID | None = None,
tags: List[str] | None = None,
tags: list[str] | None = None,
**kwargs: Any,
) -> None:
if self.log_function is None:

View file

@ -1,4 +1,5 @@
from typing import Callable, List, Tuple, Union, cast
from collections.abc import Callable
from typing import cast
from crewai import Agent, Crew, Process, Task # type: ignore
from crewai.task import TaskOutput # type: ignore
@ -62,7 +63,7 @@ class BaseCrewComponent(Component):
def get_step_callback(
self,
) -> Callable:
def step_callback(agent_output: Union[AgentFinish, List[Tuple[AgentAction, str]]]):
def step_callback(agent_output: AgentFinish | list[tuple[AgentAction, str]]):
_id = self._vertex.id if self._vertex else self.display_name
if isinstance(agent_output, AgentFinish):
messages = agent_output.messages

View file

@ -1,4 +1,5 @@
from typing import Any, Callable, Dict, List, Optional, Sequence, Union
from collections.abc import Callable, Sequence
from typing import Any
from langchain.agents import (
create_json_chat_agent,
@ -24,17 +25,17 @@ class AgentSpec(BaseModel):
BaseLanguageModel,
Sequence[BaseTool],
BasePromptTemplate | ChatPromptTemplate,
Optional[Callable[[List[BaseTool]], str]],
Optional[Union[bool, List[str]]],
Callable[[list[BaseTool]], str] | None,
bool | list[str] | None,
],
Any,
]
prompt: Optional[Any] = None
fields: List[str]
hub_repo: Optional[str] = None
prompt: Any | None = None
fields: list[str]
hub_repo: str | None = None
def data_to_messages(data: List[Data]) -> List[BaseMessage]:
def data_to_messages(data: list[Data]) -> list[BaseMessage]:
"""
Convert a list of data to a list of messages.
@ -51,9 +52,9 @@ def validate_and_create_xml_agent(
llm: BaseLanguageModel,
tools: Sequence[BaseTool],
prompt: BasePromptTemplate,
tools_renderer: Callable[[List[BaseTool]], str] = render_text_description,
tools_renderer: Callable[[list[BaseTool]], str] = render_text_description,
*,
stop_sequence: Union[bool, List[str]] = True,
stop_sequence: bool | list[str] = True,
):
return create_xml_agent(
llm=llm,
@ -68,9 +69,9 @@ def validate_and_create_openai_tools_agent(
llm: BaseLanguageModel,
tools: Sequence[BaseTool],
prompt: ChatPromptTemplate,
tools_renderer: Callable[[List[BaseTool]], str] = render_text_description,
tools_renderer: Callable[[list[BaseTool]], str] = render_text_description,
*,
stop_sequence: Union[bool, List[str]] = True,
stop_sequence: bool | list[str] = True,
):
return create_openai_tools_agent(
llm=llm,
@ -83,9 +84,9 @@ def validate_and_create_tool_calling_agent(
llm: BaseLanguageModel,
tools: Sequence[BaseTool],
prompt: ChatPromptTemplate,
tools_renderer: Callable[[List[BaseTool]], str] = render_text_description,
tools_renderer: Callable[[list[BaseTool]], str] = render_text_description,
*,
stop_sequence: Union[bool, List[str]] = True,
stop_sequence: bool | list[str] = True,
):
return create_tool_calling_agent(
llm=llm,
@ -98,9 +99,9 @@ def validate_and_create_json_chat_agent(
llm: BaseLanguageModel,
tools: Sequence[BaseTool],
prompt: ChatPromptTemplate,
tools_renderer: Callable[[List[BaseTool]], str] = render_text_description,
tools_renderer: Callable[[list[BaseTool]], str] = render_text_description,
*,
stop_sequence: Union[bool, List[str]] = True,
stop_sequence: bool | list[str] = True,
):
return create_json_chat_agent(
llm=llm,
@ -111,7 +112,7 @@ def validate_and_create_json_chat_agent(
)
AGENTS: Dict[str, AgentSpec] = {
AGENTS: dict[str, AgentSpec] = {
"Tool Calling Agent": AgentSpec(
func=validate_and_create_tool_calling_agent,
prompt=None,

View file

@ -153,13 +153,13 @@ def parse_context(curl_command):
# proxy_auth = parsed_args.proxy_user
if parsed_args.proxy and parsed_args.proxy_user:
proxies = {
"http": "http://{}@{}/".format(parsed_args.proxy_user, parsed_args.proxy),
"https": "http://{}@{}/".format(parsed_args.proxy_user, parsed_args.proxy),
"http": f"http://{parsed_args.proxy_user}@{parsed_args.proxy}/",
"https": f"http://{parsed_args.proxy_user}@{parsed_args.proxy}/",
}
elif parsed_args.proxy:
proxies = {
"http": "http://{}/".format(parsed_args.proxy),
"https": "http://{}/".format(parsed_args.proxy),
"http": f"http://{parsed_args.proxy}/",
"https": f"http://{parsed_args.proxy}/",
}
return ParsedContext(

View file

@ -1,8 +1,8 @@
import unicodedata
import xml.etree.ElementTree as ET
from collections.abc import Callable
from concurrent import futures
from pathlib import Path
from typing import Callable, List, Optional
import chardet
import orjson
@ -49,8 +49,8 @@ def retrieve_file_paths(
load_hidden: bool,
recursive: bool,
depth: int,
types: List[str] = TEXT_FILE_TYPES,
) -> List[str]:
types: list[str] = TEXT_FILE_TYPES,
) -> list[str]:
path_obj = Path(path)
if not path_obj.exists() or not path_obj.is_dir():
raise ValueError(f"Path {path} must exist and be a directory.")
@ -75,7 +75,7 @@ def retrieve_file_paths(
return file_paths
def partition_file_to_data(file_path: str, silent_errors: bool) -> Optional[Data]:
def partition_file_to_data(file_path: str, silent_errors: bool) -> Data | None:
# Use the partition function to load the file
from unstructured.partition.auto import partition # type: ignore
@ -103,7 +103,7 @@ def read_text_file(file_path: str) -> str:
if encoding in ["Windows-1252", "Windows-1254", "MacRoman"]:
encoding = "utf-8"
with open(file_path, "r", encoding=encoding) as f:
with open(file_path, encoding=encoding) as f:
return f.read()
@ -122,7 +122,7 @@ def parse_pdf_to_text(file_path: str) -> str:
return "\n\n".join([page.extract_text() for page in reader.pages])
def parse_text_file_to_data(file_path: str, silent_errors: bool) -> Optional[Data]:
def parse_text_file_to_data(file_path: str, silent_errors: bool) -> Data | None:
try:
if file_path.endswith(".pdf"):
text = parse_pdf_to_text(file_path)
@ -171,11 +171,11 @@ def parse_text_file_to_data(file_path: str, silent_errors: bool) -> Optional[Dat
def parallel_load_data(
file_paths: List[str],
file_paths: list[str],
silent_errors: bool,
max_concurrency: int,
load_function: Callable = parse_text_file_to_data,
) -> List[Optional[Data]]:
) -> list[Data | None]:
with futures.ThreadPoolExecutor(max_workers=max_concurrency) as executor:
loaded_files = executor.map(
lambda file_path: load_function(file_path, silent_errors),

View file

@ -1,5 +1,3 @@
from typing import List
from loguru import logger
from langflow.graph.schema import ResultData, RunOutputs
@ -7,7 +5,7 @@ from langflow.schema import Data
from langflow.schema.message import Message
def build_data_from_run_outputs(run_outputs: RunOutputs) -> List[Data]:
def build_data_from_run_outputs(run_outputs: RunOutputs) -> list[Data]:
"""
Build a list of data from the given RunOutputs.
@ -27,7 +25,7 @@ def build_data_from_run_outputs(run_outputs: RunOutputs) -> List[Data]:
return data
def build_data_from_result_data(result_data: ResultData, get_final_results_only: bool = True) -> List[Data]:
def build_data_from_result_data(result_data: ResultData, get_final_results_only: bool = True) -> list[Data]:
"""
Build a list of data from the given ResultData.
@ -80,7 +78,7 @@ def build_data_from_result_data(result_data: ResultData, get_final_results_only:
return data
def format_flow_output_data(data: List[Data]) -> str:
def format_flow_output_data(data: list[Data]) -> str:
"""
Format the flow output data into a string.

View file

@ -1,4 +1,4 @@
from typing import AsyncIterator, Iterator, Optional, Union
from collections.abc import AsyncIterator, Iterator
from langflow.custom import Component
from langflow.memory import store_message
@ -54,7 +54,7 @@ class ChatComponent(Component):
def _stream_message(self, message: Message, message_id: str) -> str:
iterator = message.text
if not isinstance(iterator, (AsyncIterator, Iterator)):
if not isinstance(iterator, AsyncIterator | Iterator):
raise ValueError("The message must be an iterator or an async iterator.")
if isinstance(iterator, AsyncIterator):
@ -68,12 +68,12 @@ class ChatComponent(Component):
def build_with_data(
self,
sender: Optional[str] = "User",
sender_name: Optional[str] = "User",
input_value: Optional[Union[str, Data, Message]] = None,
files: Optional[list[str]] = None,
session_id: Optional[str] = None,
return_message: Optional[bool] = False,
sender: str | None = "User",
sender_name: str | None = "User",
input_value: str | Data | Message | None = None,
files: list[str] | None = None,
session_id: str | None = None,
return_message: bool | None = False,
) -> Message:
message: Message | None = None

View file

@ -1,5 +1,5 @@
from abc import abstractmethod
from typing import Sequence, Union
from collections.abc import Sequence
from langflow.custom import Component
from langflow.field_typing import Tool
@ -24,7 +24,7 @@ class LCToolComponent(Component):
raise ValueError(f"Method '{method_name}' must be defined.")
@abstractmethod
def run_model(self) -> Union[Data, list[Data]]:
def run_model(self) -> Data | list[Data]:
"""
Run model and return the output.
"""

View file

@ -1,5 +1,3 @@
from typing import Optional
from langflow.custom import CustomComponent
from langflow.schema import Data
from langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_USER
@ -45,6 +43,6 @@ class BaseMemoryComponent(CustomComponent):
raise NotImplementedError
def add_message(
self, sender: str, sender_name: str, text: str, session_id: str, metadata: Optional[dict] = None, **kwargs
self, sender: str, sender_name: str, text: str, session_id: str, metadata: dict | None = None, **kwargs
):
raise NotImplementedError

View file

@ -1,7 +1,6 @@
import json
import warnings
from abc import abstractmethod
from typing import List, Optional, Union
from langchain_core.language_models.llms import LLM
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage
@ -20,7 +19,7 @@ class LCModelComponent(Component):
description: str = "Model Description"
trace_type = "llm"
_base_inputs: List[InputTypes] = [
_base_inputs: list[InputTypes] = [
MessageInput(name="input_value", display_name="Input"),
MessageTextInput(
name="system_message",
@ -138,9 +137,9 @@ class LCModelComponent(Component):
runnable: LanguageModel,
stream: bool,
input_value: str | Message,
system_message: Optional[str] = None,
system_message: str | None = None,
):
messages: list[Union[BaseMessage]] = []
messages: list[BaseMessage] = []
if not input_value and not system_message:
raise ValueError("The message you want to send to the model is empty.")
system_message_added = False
@ -161,7 +160,7 @@ class LCModelComponent(Component):
if system_message and not system_message_added:
messages.append(SystemMessage(content=system_message))
inputs: Union[list, dict] = messages or {}
inputs: list | dict = messages or {}
try:
runnable = runnable.with_config( # type: ignore
{

View file

@ -1,5 +1,5 @@
from collections import defaultdict
from typing import Any, Dict, List, Optional
from typing import Any
from fastapi import HTTPException
from langchain_core.prompts import PromptTemplate
@ -198,7 +198,7 @@ def update_input_variables_field(input_variables, template):
def process_prompt_template(
template: str, name: str, custom_fields: Optional[Dict[str, List[str]]], frontend_node_template: Dict[str, Any]
template: str, name: str, custom_fields: dict[str, list[str]] | None, frontend_node_template: dict[str, Any]
):
"""Process and validate prompt template, update template and custom fields."""
# Validate the prompt template and extract input variables

View file

@ -1,5 +1,5 @@
import warnings
from typing import Any, List, Optional, Type
from typing import Any
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import BaseTool, ToolException
@ -15,10 +15,10 @@ from langflow.utils.async_helpers import run_until_complete
class FlowTool(BaseTool):
name: str
description: str
graph: Optional[Graph] = None
flow_id: Optional[str] = None
user_id: Optional[str] = None
inputs: List["Vertex"] = []
graph: Graph | None = None
flow_id: str | None = None
user_id: str | None = None
inputs: list["Vertex"] = []
get_final_results_only: bool = True
@property
@ -26,7 +26,7 @@ class FlowTool(BaseTool):
schema = self.get_input_schema()
return schema.schema()["properties"]
def get_input_schema(self, config: Optional[RunnableConfig] = None) -> Type[BaseModel]:
def get_input_schema(self, config: RunnableConfig | None = None) -> type[BaseModel]:
"""The tool's input schema."""
if self.args_schema is not None:
return self.args_schema
@ -68,7 +68,7 @@ class FlowTool(BaseTool):
data.extend(build_data_from_result_data(output, get_final_results_only=self.get_final_results_only))
return format_flow_output_data(data)
def validate_inputs(self, args_names: List[dict[str, str]], args: Any, kwargs: Any):
def validate_inputs(self, args_names: list[dict[str, str]], args: Any, kwargs: Any):
"""Validate the inputs."""
if len(args) > 0 and len(args) != len(args_names):

View file

@ -1,6 +1,6 @@
from abc import ABC, ABCMeta, abstractmethod
from functools import wraps
from typing import List, cast
from typing import cast
from langchain_core.documents import Document
from loguru import logger
@ -94,7 +94,7 @@ class LCVectorStoreComponent(Component, ABC, metaclass=EnforceCacheDecoratorMeta
vector_store: VectorStore,
k=10,
**kwargs,
) -> List[Data]:
) -> list[Data]:
"""
Search for data in the vector store based on the input value and search type.
@ -110,7 +110,7 @@ class LCVectorStoreComponent(Component, ABC, metaclass=EnforceCacheDecoratorMeta
ValueError: If invalid inputs are provided.
"""
docs: List[Document] = []
docs: list[Document] = []
if input_value and isinstance(input_value, str) and hasattr(vector_store, "search"):
docs = vector_store.search(query=input_value, search_type=search_type.lower(), k=k, **kwargs)
else:
@ -140,7 +140,7 @@ class LCVectorStoreComponent(Component, ABC, metaclass=EnforceCacheDecoratorMeta
else:
raise ValueError(f"Vector Store {vector_store.__class__.__name__} does not have an as_retriever method.")
def search_documents(self) -> List[Data]:
def search_documents(self) -> list[Data]:
"""
Search for documents in the vector store.
"""

View file

@ -1,5 +1,5 @@
import json
from typing import Any, Dict, Union
from typing import Any
import requests
from bs4 import BeautifulSoup
@ -54,7 +54,7 @@ class AddContentToPage(LCToolComponent):
args_schema=self.AddContentToPageSchema,
)
def _add_content_to_page(self, markdown_text: str, block_id: str) -> Union[Dict[str, Any], str]:
def _add_content_to_page(self, markdown_text: str, block_id: str) -> dict[str, Any] | str:
try:
html_text = markdown(markdown_text)
soup = BeautifulSoup(html_text, "html.parser")
@ -209,7 +209,7 @@ class AddContentToPage(LCToolComponent):
return blocks
def create_block(self, block_type: str, content: str, **kwargs) -> Dict[str, Any]:
def create_block(self, block_type: str, content: str, **kwargs) -> dict[str, Any]:
block: dict[str, Any] = {
"object": "block",
"type": block_type,

View file

@ -1,5 +1,5 @@
import json
from typing import Any, Dict, Union
from typing import Any
import requests
from langchain.tools import StructuredTool
@ -60,7 +60,7 @@ class NotionPageCreator(LCToolComponent):
args_schema=self.NotionPageCreatorSchema,
)
def _create_notion_page(self, database_id: str, properties_json: str) -> Union[Dict[str, Any], str]:
def _create_notion_page(self, database_id: str, properties_json: str) -> dict[str, Any] | str:
if not database_id or not properties_json:
return "Invalid input. Please provide 'database_id' and 'properties_json'."

View file

@ -1,5 +1,3 @@
from typing import Dict, Union
import requests
from langchain.tools import StructuredTool
from pydantic import BaseModel, Field
@ -50,7 +48,7 @@ class NotionDatabaseProperties(LCToolComponent):
args_schema=self.NotionDatabasePropertiesSchema,
)
def _fetch_database_properties(self, database_id: str) -> Union[Dict, str]:
def _fetch_database_properties(self, database_id: str) -> dict | str:
url = f"https://api.notion.com/v1/databases/{database_id}"
headers = {
"Authorization": f"Bearer {self.notion_secret}",

View file

@ -1,5 +1,5 @@
import json
from typing import Any, Dict, List, Optional
from typing import Any
import requests
from langchain.tools import StructuredTool
@ -43,12 +43,12 @@ class NotionListPages(LCToolComponent):
class NotionListPagesSchema(BaseModel):
database_id: str = Field(..., description="The ID of the Notion database to query.")
query_json: Optional[str] = Field(
query_json: str | None = Field(
default="",
description="A JSON string containing the filters and sorts for querying the database. Leave empty for no filters or sorts.",
)
def run_model(self) -> List[Data]:
def run_model(self) -> list[Data]:
result = self._query_notion_database(self.database_id, self.query_json)
if isinstance(result, str):
@ -89,7 +89,7 @@ class NotionListPages(LCToolComponent):
args_schema=self.NotionListPagesSchema,
)
def _query_notion_database(self, database_id: str, query_json: Optional[str] = None) -> List[Dict[str, Any]] | str:
def _query_notion_database(self, database_id: str, query_json: str | None = None) -> list[dict[str, Any]] | str:
url = f"https://api.notion.com/v1/databases/{database_id}/query"
headers = {
"Authorization": f"Bearer {self.notion_secret}",

View file

@ -1,5 +1,3 @@
from typing import Dict, List
import requests
from langchain.tools import StructuredTool
from pydantic import BaseModel
@ -28,7 +26,7 @@ class NotionUserList(LCToolComponent):
class NotionUserListSchema(BaseModel):
pass
def run_model(self) -> List[Data]:
def run_model(self) -> list[Data]:
users = self._list_users()
records = []
combined_text = ""
@ -53,7 +51,7 @@ class NotionUserList(LCToolComponent):
args_schema=self.NotionUserListSchema,
)
def _list_users(self) -> List[Dict]:
def _list_users(self) -> list[dict]:
url = "https://api.notion.com/v1/users"
headers = {
"Authorization": f"Bearer {self.notion_secret}",

View file

@ -1,4 +1,4 @@
from typing import Any, Dict, List
from typing import Any
import requests
from langchain.tools import StructuredTool
@ -49,7 +49,7 @@ class NotionSearch(LCToolComponent):
filter_value: str = Field(default="page", description="Filter type: 'page' or 'database'.")
sort_direction: str = Field(default="descending", description="Sort direction: 'ascending' or 'descending'.")
def run_model(self) -> List[Data]:
def run_model(self) -> list[Data]:
results = self._search_notion(self.query, self.filter_value, self.sort_direction)
records = []
combined_text = f"Results found: {len(results)}\n\n"
@ -89,7 +89,7 @@ class NotionSearch(LCToolComponent):
def _search_notion(
self, query: str, filter_value: str = "page", sort_direction: str = "descending"
) -> List[Dict[str, Any]]:
) -> list[dict[str, Any]]:
url = "https://api.notion.com/v1/search"
headers = {
"Authorization": f"Bearer {self.notion_secret}",

View file

@ -1,5 +1,5 @@
import json
from typing import Any, Dict, Union
from typing import Any
import requests
from langchain.tools import StructuredTool
@ -39,7 +39,7 @@ class NotionPageUpdate(LCToolComponent):
class NotionPageUpdateSchema(BaseModel):
page_id: str = Field(..., description="The ID of the Notion page to update.")
properties: Union[str, Dict[str, Any]] = Field(
properties: str | dict[str, Any] = Field(
..., description="The properties to update on the page (as a JSON string or a dictionary)."
)
@ -63,7 +63,7 @@ class NotionPageUpdate(LCToolComponent):
args_schema=self.NotionPageUpdateSchema,
)
def _update_notion_page(self, page_id: str, properties: Union[str, Dict[str, Any]]) -> Union[Dict[str, Any], str]:
def _update_notion_page(self, page_id: str, properties: str | dict[str, Any]) -> dict[str, Any] | str:
url = f"https://api.notion.com/v1/pages/{page_id}"
headers = {
"Authorization": f"Bearer {self.notion_secret}",

View file

@ -22,7 +22,7 @@ class JsonAgentComponent(LCAgentComponent):
def build_agent(self) -> AgentExecutor:
if self.path.endswith("yaml") or self.path.endswith("yml"):
with open(self.path, "r") as file:
with open(self.path) as file:
yaml_dict = yaml.load(file, Loader=yaml.FullLoader)
spec = JsonSpec(dict_=yaml_dict)
else:

View file

@ -1,5 +1,3 @@
from typing import List, Optional
from langchain.agents import create_openai_tools_agent
from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate, PromptTemplate
@ -35,7 +33,7 @@ class OpenAIToolsAgentComponent(LCToolsAgentComponent):
DataInput(name="chat_history", display_name="Chat History", is_list=True, advanced=True),
]
def get_chat_history_data(self) -> Optional[List[Data]]:
def get_chat_history_data(self) -> list[Data] | None:
return self.chat_history
def create_agent_runnable(self):

View file

@ -24,7 +24,7 @@ class OpenAPIAgentComponent(LCAgentComponent):
def build_agent(self) -> AgentExecutor:
if self.path.endswith("yaml") or self.path.endswith("yml"):
with open(self.path, "r") as file:
with open(self.path) as file:
yaml_dict = yaml.load(file, Loader=yaml.FullLoader)
spec = JsonSpec(dict_=yaml_dict)
else:

View file

@ -1,5 +1,3 @@
from typing import List, Optional
from langchain.agents import create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate, PromptTemplate
@ -30,7 +28,7 @@ class ToolCallingAgentComponent(LCToolsAgentComponent):
DataInput(name="chat_history", display_name="Chat History", is_list=True, advanced=True),
]
def get_chat_history_data(self) -> Optional[List[Data]]:
def get_chat_history_data(self) -> list[Data] | None:
return self.chat_history
def create_agent_runnable(self):

View file

@ -1,5 +1,3 @@
from typing import List, Optional
from langchain.agents import create_xml_agent
from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate, PromptTemplate
@ -52,7 +50,7 @@ Question: {input}
),
]
def get_chat_history_data(self) -> Optional[List[Data]]:
def get_chat_history_data(self) -> list[Data] | None:
return self.chat_history
def create_agent_runnable(self):

View file

@ -1,4 +1,4 @@
from typing import Any, Optional
from typing import Any
from astra_assistants import patch # type: ignore
from openai import OpenAI
@ -20,7 +20,7 @@ class AssistantsRun(Component):
self,
build_config: dotdict,
field_value: Any,
field_name: Optional[str] = None,
field_name: str | None = None,
):
if field_name == "thread_id":
if field_value is None:

View file

@ -1,6 +1,6 @@
import asyncio
import json
from typing import Any, List, Optional
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
import httpx
@ -104,8 +104,8 @@ class APIRequestComponent(Component):
client: httpx.AsyncClient,
method: str,
url: str,
headers: Optional[dict] = None,
body: Optional[dict] = None,
headers: dict | None = None,
body: dict | None = None,
timeout: int = 5,
) -> Data:
method = method.upper()
@ -162,7 +162,7 @@ class APIRequestComponent(Component):
url_parts[4] = urlencode(query)
return urlunparse(url_parts)
async def make_requests(self) -> List[Data]:
async def make_requests(self) -> list[Data]:
method = self.method
urls = [url.strip() for url in self.urls if url.strip()]
curl = self.curl

View file

@ -1,5 +1,3 @@
from typing import List
from langflow.base.data.utils import parallel_load_data, parse_text_file_to_data, retrieve_file_paths
from langflow.custom import Component
from langflow.io import BoolInput, IntInput, MessageTextInput
@ -68,7 +66,7 @@ class DirectoryComponent(Component):
Output(display_name="Data", name="data", method="load_directory"),
]
def load_directory(self) -> List[Data]:
def load_directory(self) -> list[Data]:
path = self.path
types = self.types or [] # self.types is already a list due to is_list=True
depth = self.depth

View file

@ -1,8 +1,9 @@
import base64
import json
import re
from collections.abc import Iterator
from json.decoder import JSONDecodeError
from typing import Any, Iterator, List, Optional
from typing import Any
from google.auth.exceptions import RefreshError
from google.oauth2.credentials import Credentials
@ -29,7 +30,7 @@ class GmailLoaderComponent(Component):
display_name="JSON String of the Service Account Token",
info="JSON string containing OAuth 2.0 access token information for service account access",
required=True,
value=str("""{
value="""{
"account": "",
"client_id": "",
"client_secret": "",
@ -41,7 +42,7 @@ class GmailLoaderComponent(Component):
"token": "",
"token_uri": "https://oauth2.googleapis.com/token",
"universe_domain": "googleapis.com"
}"""),
}""",
),
MessageTextInput(
name="label_ids",
@ -66,7 +67,7 @@ class GmailLoaderComponent(Component):
def load_emails(self) -> Data:
class CustomGMailLoader(GMailLoader):
def __init__(
self, creds: Any, n: int = 100, label_ids: Optional[List[str]] = None, raise_error: bool = False
self, creds: Any, n: int = 100, label_ids: list[str] | None = None, raise_error: bool = False
) -> None:
super().__init__(creds, n, raise_error)
self.label_ids = label_ids if label_ids is not None else ["SENT"]

View file

@ -1,6 +1,5 @@
import json
from json.decoder import JSONDecodeError
from typing import Optional
from google.auth.exceptions import RefreshError
from google.oauth2.credentials import Credentials
@ -37,7 +36,7 @@ class GoogleDriveComponent(Component):
def load_documents(self) -> Data:
class CustomGoogleDriveLoader(GoogleDriveLoader):
creds: Optional[Credentials] = None
creds: Credentials | None = None
"""Credentials object to be passed directly."""
def _load_credentials(self):

View file

@ -1,5 +1,4 @@
import json
from typing import List
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
@ -146,13 +145,13 @@ class GoogleDriveSearchComponent(Component):
return {"doc_urls": doc_urls, "doc_ids": doc_ids, "doc_titles_urls": doc_titles_urls, "doc_titles": doc_titles}
def search_doc_ids(self) -> List[str]:
def search_doc_ids(self) -> list[str]:
return self.search_files()["doc_ids"]
def search_doc_urls(self) -> List[str]:
def search_doc_urls(self) -> list[str]:
return self.search_files()["doc_urls"]
def search_doc_titles(self) -> List[str]:
def search_doc_titles(self) -> list[str]:
return self.search_files()["doc_titles"]
def search_data(self) -> Data:

View file

@ -1,5 +1,3 @@
from typing import List
from langchain_core.documents import Document
from langflow.custom import CustomComponent
@ -16,7 +14,7 @@ class DocumentsToDataComponent(CustomComponent):
"documents": {"display_name": "Documents"},
}
def build(self, documents: List[Document]) -> List[Data]:
def build(self, documents: list[Document]) -> list[Data]:
if isinstance(documents, Document):
documents = [documents]
data = [Data.from_document(document) for document in documents]

View file

@ -1,5 +1,3 @@
from typing import List
from langflow.custom import CustomComponent
from langflow.schema import Data
@ -16,7 +14,7 @@ class ListFlowsComponent(CustomComponent):
def build(
self,
) -> List[Data]:
) -> list[Data]:
flows = self.list_flows()
self.status = flows
return flows

View file

@ -1,5 +1,3 @@
from typing import Optional
from langflow.custom import CustomComponent
from langflow.schema.message import Message
from langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_USER
@ -28,8 +26,8 @@ class MessageComponent(CustomComponent):
def build(
self,
sender: str = MESSAGE_SENDER_USER,
sender_name: Optional[str] = None,
session_id: Optional[str] = None,
sender_name: str | None = None,
session_id: str | None = None,
text: str = "",
) -> Message:
message = Message(

View file

@ -1,5 +1,3 @@
from typing import List
from langchain_text_splitters import CharacterTextSplitter
from langflow.custom import Component
@ -52,7 +50,7 @@ class SplitTextComponent(Component):
data.append(Data(text=doc.page_content, data=doc.metadata))
return data
def split_text(self) -> List[Data]:
def split_text(self) -> list[Data]:
separator = unescape_string(self.separator)
documents = []

View file

@ -1,4 +1,4 @@
from typing import Any, List, Optional
from typing import Any
from loguru import logger
@ -22,11 +22,11 @@ class SubFlowComponent(CustomComponent):
field_order = ["flow_name"]
name = "SubFlow"
def get_flow_names(self) -> List[str]:
def get_flow_names(self) -> list[str]:
flow_datas = self.list_flows()
return [flow_data.data["name"] for flow_data in flow_datas]
def get_flow(self, flow_name: str) -> Optional[Data]:
def get_flow(self, flow_name: str) -> Data | None:
flow_datas = self.list_flows()
for flow_data in flow_datas:
if flow_data.data["name"] == flow_name:
@ -56,7 +56,7 @@ class SubFlowComponent(CustomComponent):
return build_config
def add_inputs_to_build_config(self, inputs: List[Vertex], build_config: dotdict):
def add_inputs_to_build_config(self, inputs: list[Vertex], build_config: dotdict):
new_fields: list[Input] = []
for vertex in inputs:
field = Input(
@ -96,9 +96,9 @@ class SubFlowComponent(CustomComponent):
},
}
async def build(self, flow_name: str, get_final_results_only: bool = True, **kwargs) -> List[Data]:
async def build(self, flow_name: str, get_final_results_only: bool = True, **kwargs) -> list[Data]:
tweaks = {key: {"input_value": value} for key, value in kwargs.items()}
run_outputs: List[Optional[RunOutputs]] = await self.run_flow(
run_outputs: list[RunOutputs | None] = await self.run_flow(
tweaks=tweaks,
flow_name=flow_name,
)

View file

@ -1,5 +1,3 @@
from typing import List
from langchain_community.document_loaders import ConfluenceLoader
from langchain_community.document_loaders.confluence import ContentFormat
@ -79,7 +77,7 @@ class ConfluenceComponent(Component):
)
return loader
def load_documents(self) -> List[Data]:
def load_documents(self) -> list[Data]:
confluence = self.build_confluence()
documents = confluence.load()
data = [Data.from_document(doc) for doc in documents] # Using the from_document method of Data

View file

@ -1,6 +1,5 @@
import re
from pathlib import Path
from typing import List
from langchain_community.document_loaders.git import GitLoader
@ -109,7 +108,7 @@ class GitLoaderComponent(Component):
)
return loader
def load_documents(self) -> List[Data]:
def load_documents(self) -> list[Data]:
gitloader = self.build_gitloader()
documents = list(gitloader.lazy_load())
data = [Data.from_document(doc) for doc in documents]

View file

@ -1,5 +1,3 @@
from typing import List
from langchain_unstructured import UnstructuredLoader
from langflow.custom import Component
@ -47,7 +45,7 @@ class UnstructuredComponent(Component):
return loader
def load_documents(self) -> List[Data]:
def load_documents(self) -> list[Data]:
unstructured = self.build_unstructured()
documents = unstructured.load()

View file

@ -1,5 +1,3 @@
from typing import List
import numpy as np
from langflow.custom import Component
@ -33,7 +31,7 @@ class EmbeddingSimilarityComponent(Component):
]
def compute_similarity(self) -> Data:
embedding_vectors: List[Data] = self.embedding_vectors
embedding_vectors: list[Data] = self.embedding_vectors
# Assert that the list contains exactly two Data objects
assert len(embedding_vectors) == 2, "Exactly two embedding vectors are required."

View file

@ -1,17 +1,11 @@
# from langflow.field_typing import Data
from typing import List, Optional
import numpy as np
# TODO: remove ignore once the google package is published with types
from google.ai.generativelanguage_v1beta.types import (
BatchEmbedContentsRequest,
)
from google.ai.generativelanguage_v1beta.types import BatchEmbedContentsRequest
from langchain_core.embeddings import Embeddings
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_google_genai._common import (
GoogleGenerativeAIError,
)
from langchain_google_genai._common import GoogleGenerativeAIError
from langflow.custom import Component
from langflow.io import MessageTextInput, Output, SecretStrInput
@ -43,13 +37,13 @@ class GoogleGenerativeAIEmbeddingsComponent(Component):
def embed_documents(
self,
texts: List[str],
texts: list[str],
*,
batch_size: int = 100,
task_type: Optional[str] = None,
titles: Optional[List[str]] = None,
output_dimensionality: Optional[int] = 1536,
) -> List[List[float]]:
task_type: str | None = None,
titles: list[str] | None = None,
output_dimensionality: int | None = 1536,
) -> list[list[float]]:
"""Embed a list of strings. Google Generative AI currently
sets a max batch size of 100 strings.
@ -64,7 +58,7 @@ class GoogleGenerativeAIEmbeddingsComponent(Component):
Returns:
List of embeddings, one for each text.
"""
embeddings: List[List[float]] = []
embeddings: list[list[float]] = []
batch_start_index = 0
for batch in GoogleGenerativeAIEmbeddings._prepare_batches(texts, batch_size):
if titles:
@ -95,10 +89,10 @@ class GoogleGenerativeAIEmbeddingsComponent(Component):
def embed_query(
self,
text: str,
task_type: Optional[str] = None,
title: Optional[str] = None,
output_dimensionality: Optional[int] = 1536,
) -> List[float]:
task_type: str | None = None,
title: str | None = None,
output_dimensionality: int | None = 1536,
) -> list[float]:
"""Embed a text.
Args:

View file

@ -1,6 +1,5 @@
import concurrent.futures
import json
from typing import List
import httpx
from langchain_core.pydantic_v1 import BaseModel, SecretStr
@ -15,7 +14,7 @@ class AIMLEmbeddingsImpl(BaseModel, Embeddings):
api_key: SecretStr
model: str
def embed_documents(self, texts: List[str]) -> List[List[float]]:
def embed_documents(self, texts: list[str]) -> list[list[float]]:
embeddings = [None] * len(texts)
headers = {
"Content-Type": "application/json",
@ -58,5 +57,5 @@ class AIMLEmbeddingsImpl(BaseModel, Embeddings):
result_data = response.json()
return result_data
def embed_query(self, text: str) -> List[float]:
def embed_query(self, text: str) -> list[float]:
return self.embed_documents([text])[0]

View file

@ -1,5 +1,3 @@
from typing import List
from langflow.custom import Component
from langflow.io import DataInput, MessageTextInput, Output
from langflow.schema import Data
@ -31,7 +29,7 @@ class FilterDataComponent(Component):
]
def filter_data(self) -> Data:
filter_criteria: List[str] = self.filter_criteria
filter_criteria: list[str] = self.filter_criteria
data = self.data.data if isinstance(self.data, Data) else {}
# Filter the data

View file

@ -1,5 +1,5 @@
import uuid
from typing import Any, Optional
from typing import Any
from langflow.custom import Component
from langflow.io import MessageTextInput, Output
@ -26,7 +26,7 @@ class IDGeneratorComponent(Component):
Output(display_name="ID", name="id", method="generate_id"),
]
def update_build_config(self, build_config: dotdict, field_value: Any, field_name: Optional[str] = None):
def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
if field_name == "unique_id":
build_config[field_name]["value"] = str(uuid.uuid4())
return build_config

View file

@ -1,5 +1,3 @@
from typing import List
from langchain_text_splitters import CharacterTextSplitter
from langflow.custom import Component
@ -52,7 +50,7 @@ class SplitTextComponent(Component):
data.append(Data(text=doc.page_content, data=doc.metadata))
return data
def split_text(self) -> List[Data]:
def split_text(self) -> list[Data]:
separator = unescape_string(self.separator)
documents = []

View file

@ -1,5 +1,4 @@
import uuid
from typing import Optional
from langflow.custom import CustomComponent
from langflow.schema import Data
@ -51,9 +50,9 @@ class FirecrawlCrawlApi(CustomComponent):
api_key: str,
url: str,
timeout: int = 30000,
crawlerOptions: Optional[Data] = None,
pageOptions: Optional[Data] = None,
idempotency_key: Optional[str] = None,
crawlerOptions: Data | None = None,
pageOptions: Data | None = None,
idempotency_key: str | None = None,
) -> Data:
try:
from firecrawl.firecrawl import FirecrawlApp # type: ignore

View file

@ -1,5 +1,3 @@
from typing import Optional
from langflow.custom import CustomComponent
from langflow.schema import Data
@ -46,8 +44,8 @@ class FirecrawlScrapeApi(CustomComponent):
api_key: str,
url: str,
timeout: int = 10000,
pageOptions: Optional[Data] = None,
extractorOptions: Optional[Data] = None,
pageOptions: Data | None = None,
extractorOptions: Data | None = None,
) -> Data:
try:
from firecrawl.firecrawl import FirecrawlApp # type: ignore

View file

@ -1,5 +1,3 @@
from typing import List
import requests
from langchain_groq import ChatGroq
from pydantic.v1 import SecretStr
@ -55,7 +53,7 @@ class GroqModel(LCModelComponent):
),
]
def get_models(self) -> List[str]:
def get_models(self) -> list[str]:
api_key = self.groq_api_key
base_url = self.groq_api_base or "https://api.groq.com"
url = f"{base_url}/openai/v1/models"

View file

@ -1,4 +1,4 @@
from typing import Any, Dict, Optional
from typing import Any
from langchain_community.llms.huggingface_endpoint import HuggingFaceEndpoint
from tenacity import retry, stop_after_attempt, wait_fixed
@ -84,15 +84,15 @@ class HuggingFaceEndpointsComponent(LCModelComponent):
def create_huggingface_endpoint(
self,
model_id: str,
task: Optional[str],
huggingfacehub_api_token: Optional[str],
model_kwargs: Dict[str, Any],
task: str | None,
huggingfacehub_api_token: str | None,
model_kwargs: dict[str, Any],
max_new_tokens: int,
top_k: Optional[int],
top_k: int | None,
top_p: float,
typical_p: Optional[float],
temperature: Optional[float],
repetition_penalty: Optional[float],
typical_p: float | None,
temperature: float | None,
repetition_penalty: float | None,
) -> HuggingFaceEndpoint:
retry_attempts = self.retry_attempts
endpoint_url = self.get_api_url()

View file

@ -1,5 +1,6 @@
import re
from typing import List
from langchain_core.prompts import HumanMessagePromptTemplate
from langchain_core.prompts import HumanMessagePromptTemplate
@ -55,7 +56,7 @@ class LangChainHubPromptComponent(Component):
pattern = r"\{(.*?)\}"
# Get all the custom fields
custom_fields: List[str] = []
custom_fields: list[str] = []
full_template = ""
for message in prompt_template:
# Find all matches

View file

@ -1,5 +1,5 @@
import warnings
from typing import Any, List, Optional
from typing import Any
from langflow.base.langchain_utilities.model import LCToolComponent
from langflow.base.tools.flow_tool import FlowTool
@ -19,11 +19,11 @@ class FlowToolComponent(LCToolComponent):
name = "FlowTool"
beta = True
def get_flow_names(self) -> List[str]:
def get_flow_names(self) -> list[str]:
flow_datas = self.list_flows()
return [flow_data.data["name"] for flow_data in flow_datas]
def get_flow(self, flow_name: str) -> Optional[Data]:
def get_flow(self, flow_name: str) -> Data | None:
"""
Retrieves a flow by its name.

View file

@ -1,5 +1,3 @@
from typing import Optional
from langflow.custom import CustomComponent
from langflow.schema import Data
@ -21,7 +19,7 @@ class NotifyComponent(CustomComponent):
},
}
def build(self, name: str, data: Optional[Data] = None, append: bool = False) -> Data:
def build(self, name: str, data: Data | None = None, append: bool = False) -> Data:
if data and not isinstance(data, Data):
if isinstance(data, str):
data = Data(text=data)

View file

@ -1,4 +1,4 @@
from typing import Callable, List
from collections.abc import Callable
from langflow.custom import Component
from langflow.custom.utils import get_function
@ -46,7 +46,7 @@ class PythonFunctionComponent(Component):
func = get_function(function_code)
return func
def execute_function(self) -> List[dotdict | str] | dotdict | str:
def execute_function(self) -> list[dotdict | str] | dotdict | str:
function_code = self.function_code
if not function_code:
@ -58,7 +58,7 @@ class PythonFunctionComponent(Component):
except Exception as e:
return f"Error executing function: {str(e)}"
def execute_function_data(self) -> List[Data]:
def execute_function_data(self) -> list[Data]:
results = self.execute_function()
results = results if isinstance(results, list) else [results]
data = [(Data(text=x) if isinstance(x, str) else Data(**x)) for x in results]

View file

@ -1,4 +1,4 @@
from typing import Any, List, Optional
from typing import Any
from langflow.base.flow_processing.utils import build_data_from_run_outputs
from langflow.custom import Component
@ -13,7 +13,7 @@ class RunFlowComponent(Component):
name = "RunFlow"
beta: bool = True
def get_flow_names(self) -> List[str]:
def get_flow_names(self) -> list[str]:
flow_data = self.list_flows()
return [flow_data.data["name"] for flow_data in flow_data]
@ -47,12 +47,12 @@ class RunFlowComponent(Component):
Output(display_name="Run Outputs", name="run_outputs", method="generate_results"),
]
async def generate_results(self) -> List[Data]:
async def generate_results(self) -> list[Data]:
if "flow_name" not in self._attributes or not self._attributes["flow_name"]:
raise ValueError("Flow name is required")
flow_name = self._attributes["flow_name"]
results: List[Optional[RunOutputs]] = await self.run_flow(
results: list[RunOutputs | None] = await self.run_flow(
inputs={"input_value": self.input_value}, flow_name=flow_name, tweaks=self.tweaks
)
if isinstance(results, list):

View file

@ -1,4 +1,4 @@
from typing import Any, List, Optional
from typing import Any
from loguru import logger
@ -17,11 +17,11 @@ class SubFlowComponent(Component):
name = "SubFlow"
beta: bool = True
def get_flow_names(self) -> List[str]:
def get_flow_names(self) -> list[str]:
flow_data = self.list_flows()
return [flow_data.data["name"] for flow_data in flow_data]
def get_flow(self, flow_name: str) -> Optional[Data]:
def get_flow(self, flow_name: str) -> Data | None:
flow_datas = self.list_flows()
for flow_data in flow_datas:
if flow_data.data["name"] == flow_name:
@ -50,7 +50,7 @@ class SubFlowComponent(Component):
return build_config
def add_inputs_to_build_config(self, inputs_vertex: List[Vertex], build_config: dotdict):
def add_inputs_to_build_config(self, inputs_vertex: list[Vertex], build_config: dotdict):
new_fields: list[dotdict] = []
for vertex in inputs_vertex:
@ -81,7 +81,7 @@ class SubFlowComponent(Component):
outputs = [Output(name="flow_outputs", display_name="Flow Outputs", method="generate_results")]
async def generate_results(self) -> List[Data]:
async def generate_results(self) -> list[Data]:
tweaks: dict = {}
for field in self._attributes.keys():
if field != "flow_name":

View file

@ -1,4 +1,4 @@
from typing import Optional, cast
from typing import cast
from langchain_community.retrievers import AmazonKendraRetriever
@ -33,10 +33,10 @@ class AmazonKendraRetrieverComponent(CustomComponent):
self,
index_id: str,
top_k: int = 3,
region_name: Optional[str] = None,
credentials_profile_name: Optional[str] = None,
attribute_filter: Optional[dict] = None,
user_context: Optional[dict] = None,
region_name: str | None = None,
credentials_profile_name: str | None = None,
attribute_filter: dict | None = None,
user_context: dict | None = None,
) -> Retriever: # type: ignore[type-var]
try:
output = AmazonKendraRetriever(

View file

@ -1,4 +1,4 @@
from typing import List, cast
from typing import cast
from langchain.retrievers import ContextualCompressionRetriever
from langchain_cohere import CohereRerank
@ -73,7 +73,7 @@ class CohereRerankComponent(LCVectorStoreComponent):
retriever = ContextualCompressionRetriever(base_compressor=cohere_reranker, base_retriever=self.retriever)
return cast(Retriever, retriever)
async def search_documents(self) -> List[Data]: # type: ignore
async def search_documents(self) -> list[Data]: # type: ignore
retriever = self.build_base_retriever()
documents = await retriever.ainvoke(self.search_query, config={"callbacks": self.get_langchain_callbacks()})
data = self.to_data(documents)

View file

@ -1,4 +1,4 @@
from typing import Optional, cast
from typing import cast
from langchain_community.retrievers import MetalRetriever
from metal_sdk.metal import Metal # type: ignore
@ -21,7 +21,7 @@ class MetalRetrieverComponent(CustomComponent):
"code": {"show": False},
}
def build(self, api_key: str, client_id: str, index_id: str, params: Optional[dict] = None) -> Retriever: # type: ignore[type-var]
def build(self, api_key: str, client_id: str, index_id: str, params: dict | None = None) -> Retriever: # type: ignore[type-var]
try:
metal = Metal(api_key=api_key, client_id=client_id, index_id=index_id)
except Exception as e:

View file

@ -1,5 +1,3 @@
from typing import Optional
from langchain.retrievers import MultiQueryRetriever
from langflow.custom import CustomComponent
@ -42,7 +40,7 @@ class MultiQueryRetrieverComponent(CustomComponent):
self,
llm: LanguageModel,
retriever: BaseRetriever,
prompt: Optional[Text] = None,
prompt: Text | None = None,
parser_key: str = "lines",
) -> MultiQueryRetriever:
if not prompt:

View file

@ -1,4 +1,4 @@
from typing import Any, List, cast
from typing import Any, cast
from langchain.retrievers import ContextualCompressionRetriever
@ -70,7 +70,7 @@ class NvidiaRerankComponent(LCVectorStoreComponent):
retriever = ContextualCompressionRetriever(base_compressor=nvidia_reranker, base_retriever=self.retriever)
return cast(Retriever, retriever)
async def search_documents(self) -> List[Data]: # type: ignore
async def search_documents(self) -> list[Data]: # type: ignore
retriever = self.build_base_retriever()
documents = await retriever.ainvoke(self.search_query, config={"callbacks": self.get_langchain_callbacks()})
data = self.to_data(documents)

View file

@ -1,5 +1,3 @@
from typing import List
from langchain.chains.query_constructor.base import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever
@ -53,7 +51,7 @@ class SelfQueryRetrieverComponent(Component):
Output(display_name="Retrieved Documents", name="documents", method="retrieve_documents"),
]
def retrieve_documents(self) -> List[Data]:
def retrieve_documents(self) -> list[Data]:
metadata_field_infos = [AttributeInfo(**value.data) for value in self.attribute_infos]
self_query_retriever = SelfQueryRetriever.from_llm(
llm=self.llm,

View file

@ -1,5 +1,5 @@
import json
from typing import List, cast
from typing import cast
from langchain.chains.query_constructor.base import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever
@ -40,7 +40,7 @@ class VectaraSelfQueryRetriverComponent(CustomComponent):
vectorstore: VectorStore,
document_content_description: str,
llm: LanguageModel,
metadata_field_info: List[str],
metadata_field_info: list[str],
) -> Retriever: # type: ignore
metadata_field_obj = []

View file

@ -1,4 +1,5 @@
from typing import Any, Sequence
from collections.abc import Sequence
from typing import Any
from composio_langchain import Action, App, ComposioToolSet # type: ignore
from langchain_core.tools import Tool

View file

@ -1,5 +1,3 @@
from typing import List, Union
from langchain_community.agent_toolkits.base import BaseToolkit
from langchain_core.tools import Tool, tool
from metaphor_python import Metaphor # type: ignore
@ -25,7 +23,7 @@ class MetaphorToolkit(CustomComponent):
use_autoprompt: bool = True,
search_num_results: int = 5,
similar_num_results: int = 5,
) -> Union[Tool, BaseToolkit]:
) -> Tool | BaseToolkit:
# If documents, then we need to create a Vectara instance using .from_documents
client = Metaphor(api_key=metaphor_api_key)
@ -35,7 +33,7 @@ class MetaphorToolkit(CustomComponent):
return client.search(query, use_autoprompt=use_autoprompt, num_results=search_num_results)
@tool
def get_contents(ids: List[str]):
def get_contents(ids: list[str]):
"""Get contents of a webpage.
The ids passed in should be a list of ids as fetched from `search`.

View file

@ -1,4 +1,4 @@
from typing import List, cast
from typing import cast
from langchain_community.tools.bing_search import BingSearchResults
from langchain_community.utilities import BingSearchAPIWrapper
@ -24,7 +24,7 @@ class BingSearchAPIComponent(LCToolComponent):
IntInput(name="k", display_name="Number of results", value=4, required=True),
]
def run_model(self) -> List[Data]:
def run_model(self) -> list[Data]:
if self.bing_search_url:
wrapper = BingSearchAPIWrapper(
bing_search_url=self.bing_search_url, bing_subscription_key=self.bing_subscription_key

View file

@ -1,6 +1,5 @@
import ast
import operator
from typing import List
from langchain.tools import StructuredTool
from pydantic import BaseModel, Field
@ -28,7 +27,7 @@ class CalculatorToolComponent(LCToolComponent):
class CalculatorToolSchema(BaseModel):
expression: str = Field(..., description="The arithmetic expression to evaluate.")
def run_model(self) -> List[Data]:
def run_model(self) -> list[Data]:
return self._evaluate_expression(self.expression)
def build_tool(self) -> Tool:
@ -39,7 +38,7 @@ class CalculatorToolComponent(LCToolComponent):
args_schema=self.CalculatorToolSchema,
)
def _evaluate_expression(self, expression: str) -> List[Data]:
def _evaluate_expression(self, expression: str) -> list[Data]:
try:
# Define the allowed operators
operators = {

View file

@ -1,4 +1,4 @@
from typing import Any, Dict, List
from typing import Any
from langchain.tools import StructuredTool
from langchain_community.tools import DuckDuckGoSearchRun
@ -37,7 +37,7 @@ class DuckDuckGoSearchComponent(LCToolComponent):
def build_tool(self) -> Tool:
wrapper = self._build_wrapper()
def search_func(query: str, max_results: int = 5, max_snippet_length: int = 100) -> List[Dict[str, Any]]:
def search_func(query: str, max_results: int = 5, max_snippet_length: int = 100) -> list[dict[str, Any]]:
full_results = wrapper.run(f"{query} (site:*)")
result_list = full_results.split("\n")[:max_results]
limited_results = []
@ -57,7 +57,7 @@ class DuckDuckGoSearchComponent(LCToolComponent):
self.status = "DuckDuckGo Search Tool created"
return tool
def run_model(self) -> List[Data]:
def run_model(self) -> list[Data]:
tool = self.build_tool()
results = tool.run(
{

View file

@ -1,5 +1,5 @@
import json
from typing import Any, Dict, Optional, Union
from typing import Any
from urllib.parse import urljoin
import httpx
@ -33,7 +33,7 @@ class GleanSearchAPIComponent(LCToolComponent):
return Tool(name="glean_search_api", description="Search with the Glean API", func=wrapper.run)
def run_model(self) -> Union[Data, list[Data]]:
def run_model(self) -> Data | list[Data]:
wrapper = self._build_wrapper()
results = wrapper.results(
@ -67,7 +67,7 @@ class GleanSearchAPIComponent(LCToolComponent):
self,
query: str,
page_size: int = 10,
request_options: Optional[Dict[str, Any]] = None,
request_options: dict[str, Any] | None = None,
) -> dict:
# Ensure there's a trailing slash
url = self.glean_api_url
@ -97,7 +97,7 @@ class GleanSearchAPIComponent(LCToolComponent):
return results
def _search_api_results(self, query: str, **kwargs: Any) -> Dict[str, Any]:
def _search_api_results(self, query: str, **kwargs: Any) -> dict[str, Any]:
request_details = self._prepare_request(query, **kwargs)
response = httpx.post(

View file

@ -1,5 +1,3 @@
from typing import Union
from langchain_core.tools import Tool
from langflow.base.langchain_utilities.model import LCToolComponent
@ -22,7 +20,7 @@ class GoogleSearchAPIComponent(LCToolComponent):
IntInput(name="k", display_name="Number of results", value=4, required=True),
]
def run_model(self) -> Union[Data, list[Data]]:
def run_model(self) -> Data | list[Data]:
wrapper = self._build_wrapper()
results = wrapper.results(query=self.input_value, num_results=self.k)
data = [Data(data=result, text=result["snippet"]) for result in results]

View file

@ -1,5 +1,3 @@
from typing import Union
from langchain_community.utilities.google_serper import GoogleSerperAPIWrapper
from langflow.base.langchain_utilities.model import LCToolComponent
@ -22,7 +20,7 @@ class GoogleSerperAPIComponent(LCToolComponent):
IntInput(name="k", display_name="Number of results", value=4, required=True),
]
def run_model(self) -> Union[Data, list[Data]]:
def run_model(self) -> Data | list[Data]:
wrapper = self._build_wrapper()
results = wrapper.results(query=self.input_value)
list_results = results.get("organic", [])

View file

@ -1,5 +1,4 @@
import importlib
from typing import List, Union
from langchain.tools import StructuredTool
from langchain_experimental.utilities import PythonREPL
@ -46,7 +45,7 @@ class PythonREPLToolComponent(LCToolComponent):
class PythonREPLSchema(BaseModel):
code: str = Field(..., description="The Python code to execute.")
def get_globals(self, global_imports: Union[str, List[str]]) -> dict:
def get_globals(self, global_imports: str | list[str]) -> dict:
global_dict = {}
if isinstance(global_imports, str):
modules = [module.strip() for module in global_imports.split(",")]
@ -83,7 +82,7 @@ class PythonREPLToolComponent(LCToolComponent):
self.status = f"Python REPL Tool created with global imports: {self.global_imports}"
return tool
def run_model(self) -> List[Data]:
def run_model(self) -> list[Data]:
tool = self.build_tool()
result = tool.run(self.code)
return [Data(data={"result": result})]

View file

@ -1,4 +1,4 @@
from typing import Any, Dict, List, Optional
from typing import Any
from langchain.tools import StructuredTool
from langchain_community.utilities.searchapi import SearchApiAPIWrapper
@ -30,7 +30,7 @@ class SearchAPIComponent(LCToolComponent):
class SearchAPISchema(BaseModel):
query: str = Field(..., description="The search query")
params: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional search parameters")
params: dict[str, Any] | None = Field(default_factory=dict, description="Additional search parameters")
max_results: int = Field(5, description="Maximum number of results to return")
max_snippet_length: int = Field(100, description="Maximum length of each result snippet")
@ -41,8 +41,8 @@ class SearchAPIComponent(LCToolComponent):
wrapper = self._build_wrapper()
def search_func(
query: str, params: Optional[Dict[str, Any]] = None, max_results: int = 5, max_snippet_length: int = 100
) -> List[Dict[str, Any]]:
query: str, params: dict[str, Any] | None = None, max_results: int = 5, max_snippet_length: int = 100
) -> list[dict[str, Any]]:
params = params or {}
full_results = wrapper.results(query=query, **params)
organic_results = full_results.get("organic_results", [])[:max_results]
@ -68,7 +68,7 @@ class SearchAPIComponent(LCToolComponent):
self.status = f"Search API Tool created with engine: {self.engine}"
return tool
def run_model(self) -> List[Data]:
def run_model(self) -> list[Data]:
tool = self.build_tool()
results = tool.run(
{

View file

@ -1,4 +1,4 @@
from typing import Any, Dict, List, Optional
from typing import Any
from langchain.tools import StructuredTool
from langchain_community.utilities.serpapi import SerpAPIWrapper
@ -28,7 +28,7 @@ class SerpAPIComponent(LCToolComponent):
class SerpAPISchema(BaseModel):
query: str = Field(..., description="The search query")
params: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional search parameters")
params: dict[str, Any] | None = Field(default_factory=dict, description="Additional search parameters")
max_results: int = Field(5, description="Maximum number of results to return")
max_snippet_length: int = Field(100, description="Maximum length of each result snippet")
@ -44,8 +44,8 @@ class SerpAPIComponent(LCToolComponent):
wrapper = self._build_wrapper()
def search_func(
query: str, params: Optional[Dict[str, Any]] = None, max_results: int = 5, max_snippet_length: int = 100
) -> List[Dict[str, Any]]:
query: str, params: dict[str, Any] | None = None, max_results: int = 5, max_snippet_length: int = 100
) -> list[dict[str, Any]]:
params = params or {}
full_results = wrapper.results(query, **params)
organic_results = full_results.get("organic_results", [])[:max_results]
@ -71,7 +71,7 @@ class SerpAPIComponent(LCToolComponent):
self.status = "SerpAPI Tool created"
return tool
def run_model(self) -> List[Data]:
def run_model(self) -> list[Data]:
tool = self.build_tool()
try:
results = tool.run(

View file

@ -1,5 +1,3 @@
from typing import List
from langchain_community.vectorstores import Cassandra
from loguru import logger
@ -219,7 +217,7 @@ class CassandraVectorStoreComponent(LCVectorStoreComponent):
else:
return "similarity"
def search_documents(self) -> List[Data]:
def search_documents(self) -> list[Data]:
vector_store = self.build_vector_store()
logger.debug(f"Search input: {self.search_query}")

View file

@ -1,4 +1,3 @@
from typing import List
from uuid import UUID
from langchain_community.graph_vectorstores import CassandraGraphVectorStore
@ -197,7 +196,7 @@ class CassandraGraphVectorStoreComponent(LCVectorStoreComponent):
else:
return "traversal"
def search_documents(self) -> List[Data]:
def search_documents(self) -> list[Data]:
vector_store = self.build_vector_store()
logger.debug(f"Search input: {self.search_query}")

View file

@ -1,5 +1,3 @@
from typing import List
from langchain_community.vectorstores import Clickhouse, ClickhouseSettings
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
@ -117,7 +115,7 @@ class ClickhouseVectorStoreComponent(LCVectorStoreComponent):
return clickhouse_vs
def search_documents(self) -> List[Data]:
def search_documents(self) -> list[Data]:
vector_store = self.build_vector_store()
if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():

View file

@ -1,5 +1,4 @@
from datetime import timedelta
from typing import List
from langchain_community.vectorstores import CouchbaseVectorStore
@ -92,7 +91,7 @@ class CouchbaseVectorStoreComponent(LCVectorStoreComponent):
return couchbase_vs
def search_documents(self) -> List[Data]:
def search_documents(self) -> list[Data]:
vector_store = self.build_vector_store()
if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():

View file

@ -1,5 +1,3 @@
from typing import List
from langchain_community.vectorstores import FAISS
from loguru import logger
@ -79,7 +77,7 @@ class FaissVectorStoreComponent(LCVectorStoreComponent):
return faiss
def search_documents(self) -> List[Data]:
def search_documents(self) -> list[Data]:
"""
Search for documents in the FAISS vector store.
"""

View file

@ -1,5 +1,3 @@
from typing import List
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import (
@ -108,7 +106,7 @@ class MilvusVectorStoreComponent(LCVectorStoreComponent):
return milvus_store
def search_documents(self) -> List[Data]:
def search_documents(self) -> list[Data]:
vector_store = self.build_vector_store()
if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():

View file

@ -1,5 +1,3 @@
from typing import List
from langchain_community.vectorstores import MongoDBAtlasVectorSearch
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
@ -75,7 +73,7 @@ class MongoVectorStoreComponent(LCVectorStoreComponent):
return vector_store
def search_documents(self) -> List[Data]:
def search_documents(self) -> list[Data]:
from bson import ObjectId
vector_store = self.build_vector_store()

View file

@ -1,5 +1,3 @@
from typing import List
from langchain_pinecone import Pinecone
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
@ -85,7 +83,7 @@ class PineconeVectorStoreComponent(LCVectorStoreComponent):
pinecone.add_documents(documents)
return pinecone
def search_documents(self) -> List[Data]:
def search_documents(self) -> list[Data]:
vector_store = self.build_vector_store()
if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():

View file

@ -1,5 +1,3 @@
from typing import List
from langchain.embeddings.base import Embeddings
from langchain_community.vectorstores import Qdrant
@ -99,7 +97,7 @@ class QdrantVectorStoreComponent(LCVectorStoreComponent):
return qdrant
def search_documents(self) -> List[Data]:
def search_documents(self) -> list[Data]:
vector_store = self.build_vector_store()
if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():

View file

@ -1,5 +1,3 @@
from typing import List
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.vectorstores.redis import Redis
@ -79,7 +77,7 @@ class RedisVectorStoreComponent(LCVectorStoreComponent):
)
return redis_vs
def search_documents(self) -> List[Data]:
def search_documents(self) -> list[Data]:
vector_store = self.build_vector_store()
if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():

View file

@ -1,5 +1,3 @@
from typing import List
from langchain_community.vectorstores import SupabaseVectorStore
from supabase.client import Client, create_client
@ -66,7 +64,7 @@ class SupabaseVectorStoreComponent(LCVectorStoreComponent):
return supabase_vs
def search_documents(self) -> List[Data]:
def search_documents(self) -> list[Data]:
vector_store = self.build_vector_store()
if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():

View file

@ -1,5 +1,3 @@
from typing import List
from langchain_community.vectorstores import UpstashVectorStore
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
@ -114,7 +112,7 @@ class UpstashVectorStoreComponent(LCVectorStoreComponent):
return upstash_vs
def search_documents(self) -> List[Data]:
def search_documents(self) -> list[Data]:
vector_store = self.build_vector_store()
if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():

View file

@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, List
from typing import TYPE_CHECKING
from langchain_community.vectorstores import Vectara
from loguru import logger
@ -93,7 +93,7 @@ class VectaraVectorStoreComponent(LCVectorStoreComponent):
logger.debug("No documents to add to Vectara.")
self.status = "No valid documents to add to Vectara"
def search_documents(self) -> List[Data]:
def search_documents(self) -> list[Data]:
vector_store = self.build_vector_store()
if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():

View file

@ -1,5 +1,3 @@
from typing import List
import weaviate # type: ignore
from langchain_community.vectorstores import Weaviate
@ -70,7 +68,7 @@ class WeaviateVectorStoreComponent(LCVectorStoreComponent):
by_text=self.search_by_text,
)
def search_documents(self) -> List[Data]:
def search_documents(self) -> list[Data]:
vector_store = self.build_vector_store()
if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():

View file

@ -1,5 +1,3 @@
from typing import List
from langchain_community.vectorstores import PGVector
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
@ -63,7 +61,7 @@ class PGVectorStoreComponent(LCVectorStoreComponent):
return pgvector
def search_documents(self) -> List[Data]:
def search_documents(self) -> list[Data]:
vector_store = self.build_vector_store()
if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():

View file

@ -41,7 +41,7 @@ def find_class_ast_node(class_obj):
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef) and node.name == class_obj.__name__:
class_node = node
elif isinstance(node, (ast.Import, ast.ImportFrom)):
elif isinstance(node, ast.Import | ast.ImportFrom):
import_nodes.append(node)
return class_node, import_nodes
@ -275,7 +275,7 @@ class CodeParser:
or any(has_return(child) for child in node.handlers)
or any(has_return(child) for child in node.finalbody)
)
elif isinstance(node, (ast.For, ast.While)):
elif isinstance(node, ast.For | ast.While):
return any(has_return(child) for child in node.body) or any(has_return(child) for child in node.orelse)
elif isinstance(node, ast.With):
return any(has_return(child) for child in node.body)
@ -365,7 +365,7 @@ class CodeParser:
elif isinstance(stmt, ast.AnnAssign):
if attr := self.parse_ann_assign(stmt):
class_details.attributes.append(attr)
elif isinstance(stmt, (ast.FunctionDef, ast.AsyncFunctionDef)):
elif isinstance(stmt, ast.FunctionDef | ast.AsyncFunctionDef):
method, is_init = self.parse_function_def(stmt)
if is_init:
class_details.init = method

View file

@ -1,3 +1,5 @@
from __future__ import annotations
import inspect
from collections.abc import Callable
from copy import deepcopy
@ -33,7 +35,7 @@ CONFIG_ATTRIBUTES = ["_display_name", "_description", "_icon", "_name"]
class Component(CustomComponent):
inputs: list["InputTypes"] = []
inputs: list[InputTypes] = []
outputs: list[Output] = []
code_class_base_inheritance: ClassVar[str] = "Component"
_output_logs: dict[str, Log] = {}
@ -51,7 +53,7 @@ class Component(CustomComponent):
config[key[1:]] = value
else:
inputs[key] = value
self._inputs: dict[str, "InputTypes"] = {}
self._inputs: dict[str, InputTypes] = {}
self._outputs_map: dict[str, Output] = {}
self._results: dict[str, Any] = {}
self._attributes: dict[str, Any] = {}
@ -179,7 +181,7 @@ class Component(CustomComponent):
"""
return await self._run()
def set_vertex(self, vertex: "Vertex"):
def set_vertex(self, vertex: Vertex):
"""
Sets the vertex for the component.
@ -258,7 +260,7 @@ class Component(CustomComponent):
# allows each instance of each component to modify its own output
self._outputs_map[output.name] = deepcopy(output)
def map_inputs(self, inputs: list["InputTypes"]):
def map_inputs(self, inputs: list[InputTypes]):
"""
Maps the given inputs to the component.
@ -325,7 +327,7 @@ class Component(CustomComponent):
text += f"{output.name}[{','.join(output.types)}]->{input_.name}[{','.join(input_.input_types or [])}]\n"
return text
def _find_matching_output_method(self, value: "Component"):
def _find_matching_output_method(self, value: Component):
# get all outputs of the value component
outputs = value._outputs_map.values()
# check if the any of the types in the output.types matches ONLY one input in the current component
@ -652,7 +654,7 @@ class Component(CustomComponent):
output.value = result
custom_repr = self.custom_repr()
if custom_repr is None and isinstance(result, (dict, Data, str)):
if custom_repr is None and isinstance(result, dict | Data | str):
custom_repr = result
if not isinstance(custom_repr, str):
custom_repr = str(custom_repr)
@ -670,7 +672,7 @@ class Component(CustomComponent):
elif hasattr(raw, "model_dump") and raw is not None:
raw = raw.model_dump()
if raw is None and isinstance(result, (dict, Data, str)):
if raw is None and isinstance(result, dict | Data | str):
raw = result.data if isinstance(result, Data) else result
artifact_type = get_artifact_type(artifact_value, result)
raw, artifact_type = post_process_raw(raw, artifact_type)

View file

@ -1,6 +1,8 @@
from __future__ import annotations
from collections.abc import Callable, Sequence
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar, Optional
from typing import TYPE_CHECKING, Any, ClassVar
import yaml
from cachetools import TTLCache
@ -69,7 +71,7 @@ class CustomComponent(BaseComponent):
"""The default frozen state of the component. Defaults to False."""
build_parameters: dict | None = None
"""The build parameters of the component. Defaults to None."""
_vertex: Optional["Vertex"] = None
_vertex: Vertex | None = None
"""The edge target parameter of the component. Defaults to None."""
_code_class_base_inheritance: ClassVar[str] = "CustomComponent"
function_entrypoint_name: ClassVar[str] = "build"
@ -81,7 +83,7 @@ class CustomComponent(BaseComponent):
_outputs: list[OutputValue] = []
_logs: list[Log] = []
_output_logs: dict[str, Log] = {}
_tracing_service: Optional["TracingService"] = None
_tracing_service: TracingService | None = None
_tree: dict | None = None
def __init__(self, **data):
@ -158,7 +160,7 @@ class CustomComponent(BaseComponent):
return str(path_object)
def get_full_path(self, path: str) -> str:
storage_svc: "StorageService" = get_storage_service()
storage_svc: StorageService = get_storage_service()
flow_id, file_name = path.split("/", 1)
return storage_svc.build_full_path(flow_id, file_name)
@ -464,7 +466,7 @@ class CustomComponent(BaseComponent):
"""
return validate.create_function(self._code, self._function_entrypoint_name)
async def load_flow(self, flow_id: str, tweaks: dict | None = None) -> "Graph":
async def load_flow(self, flow_id: str, tweaks: dict | None = None) -> Graph:
if not self.user_id:
raise ValueError("Session is invalid")
return await load_flow(user_id=str(self._user_id), flow_id=flow_id, tweaks=tweaks)
@ -517,7 +519,7 @@ class CustomComponent(BaseComponent):
)
return frontend_node
def get_langchain_callbacks(self) -> list["BaseCallbackHandler"]:
def get_langchain_callbacks(self) -> list[BaseCallbackHandler]:
if self._tracing_service:
return self._tracing_service.get_langchain_callbacks()
return []

View file

@ -1,4 +1,5 @@
from typing import Callable, Dict, Text, TypeAlias, TypeVar, Union
from collections.abc import Callable
from typing import Text, TypeAlias, TypeVar
from langchain.agents.agent import AgentExecutor
from langchain.chains.base import Chain
@ -20,7 +21,7 @@ from langchain_text_splitters import TextSplitter
from langflow.schema.data import Data
from langflow.schema.message import Message
NestedDict: TypeAlias = Dict[str, Union[str, Dict]]
NestedDict: TypeAlias = dict[str, str | dict]
LanguageModel = TypeVar("LanguageModel", BaseLanguageModel, BaseLLM, BaseChatModel)
ToolEnabledLanguageModel = TypeVar("ToolEnabledLanguageModel", BaseLanguageModel, BaseLLM, BaseChatModel)
Retriever = TypeVar(
@ -66,7 +67,7 @@ CUSTOM_COMPONENT_SUPPORTED_TYPES = {
"NestedDict": NestedDict,
"Data": Data,
"Message": Message,
"Text": Text,
"Text": Text, # noqa: UP019
"Object": Object,
"Callable": Callable,
"LanguageModel": LanguageModel,

View file

@ -1,3 +1,5 @@
from __future__ import annotations
import asyncio
import copy
import json
@ -8,7 +10,7 @@ from collections.abc import Generator, Iterable
from datetime import datetime, timezone
from functools import partial
from itertools import chain
from typing import TYPE_CHECKING, Any, Optional, cast
from typing import TYPE_CHECKING, Any, cast
import nest_asyncio
from loguru import logger
@ -53,8 +55,8 @@ class Graph:
def __init__(
self,
start: Optional["Component"] = None,
end: Optional["Component"] = None,
start: Component | None = None,
end: Component | None = None,
flow_id: str | None = None,
flow_name: str | None = None,
description: str | None = None,
@ -115,7 +117,7 @@ class Graph:
self._call_order: list[str] = []
self._snapshots: list[dict[str, Any]] = []
try:
self.tracing_service: "TracingService" | None = get_tracing_service()
self.tracing_service: TracingService | None = get_tracing_service()
except Exception as exc:
logger.error(f"Error getting tracing service: {exc}")
self.tracing_service = None
@ -203,7 +205,7 @@ class Graph:
self._edges = self._graph_data["edges"]
self.initialize()
def add_component(self, component: "Component", component_id: Optional[str] = None) -> str:
def add_component(self, component: Component, component_id: str | None = None) -> str:
component_id = component_id or component._id
if component_id in self.vertex_map:
return component_id
@ -225,7 +227,7 @@ class Graph:
return component_id
def _set_start_and_end(self, start: "Component", end: "Component"):
def _set_start_and_end(self, start: Component, end: Component):
if not hasattr(start, "to_frontend_node"):
raise TypeError(f"start must be a Component. Got {type(start)}")
if not hasattr(end, "to_frontend_node"):
@ -613,7 +615,7 @@ class Graph:
stream: bool,
session_id: str,
fallback_to_env_vars: bool,
) -> list[Optional["ResultData"]]:
) -> list[ResultData | None]:
"""
Runs the graph with the given inputs.
@ -808,7 +810,7 @@ class Graph:
"flow_name": self.flow_name,
}
def build_graph_maps(self, edges: list[CycleEdge] | None = None, vertices: list["Vertex"] | None = None):
def build_graph_maps(self, edges: list[CycleEdge] | None = None, vertices: list[Vertex] | None = None):
"""
Builds the adjacency maps for the graph.
"""
@ -878,7 +880,7 @@ class Graph:
return edge
return None
def build_parent_child_map(self, vertices: list["Vertex"]):
def build_parent_child_map(self, vertices: list[Vertex]):
parent_child_map = defaultdict(list)
for vertex in vertices:
parent_child_map[vertex.id] = [child.id for child in self.get_successors(vertex)]
@ -977,7 +979,7 @@ class Graph:
flow_id: str | None = None,
flow_name: str | None = None,
user_id: str | None = None,
) -> "Graph":
) -> Graph:
"""
Creates a graph from a payload.
@ -1016,7 +1018,7 @@ class Graph:
# both graphs have the same vertices and edges
# but the data of the vertices might be different
def update_edges_from_vertex(self, vertex: "Vertex", other_vertex: "Vertex") -> None:
def update_edges_from_vertex(self, vertex: Vertex, other_vertex: Vertex) -> None:
"""Updates the edges of a vertex in the Graph."""
new_edges = []
for edge in self.edges:
@ -1026,13 +1028,13 @@ class Graph:
new_edges += other_vertex.edges
self.edges = new_edges
def vertex_data_is_identical(self, vertex: "Vertex", other_vertex: "Vertex") -> bool:
def vertex_data_is_identical(self, vertex: Vertex, other_vertex: Vertex) -> bool:
data_is_equivalent = vertex == other_vertex
if not data_is_equivalent:
return False
return self.vertex_edges_are_identical(vertex, other_vertex)
def vertex_edges_are_identical(self, vertex: "Vertex", other_vertex: "Vertex") -> bool:
def vertex_edges_are_identical(self, vertex: Vertex, other_vertex: Vertex) -> bool:
same_length = len(vertex.edges) == len(other_vertex.edges)
if not same_length:
return False
@ -1041,7 +1043,7 @@ class Graph:
return False
return True
def update(self, other: "Graph") -> "Graph":
def update(self, other: Graph) -> Graph:
# Existing vertices in self graph
existing_vertex_ids = {vertex.id for vertex in self.vertices}
# Vertex IDs in the other graph
@ -1091,7 +1093,7 @@ class Graph:
self.increment_update_count()
return self
def update_vertex_from_another(self, vertex: "Vertex", other_vertex: "Vertex") -> None:
def update_vertex_from_another(self, vertex: Vertex, other_vertex: Vertex) -> None:
"""
Updates a vertex from another vertex.
@ -1129,12 +1131,12 @@ class Graph:
self.vertices.append(vertex)
self.vertex_map[vertex.id] = vertex
def add_vertex(self, vertex: "Vertex") -> None:
def add_vertex(self, vertex: Vertex) -> None:
"""Adds a new vertex to the graph."""
self._add_vertex(vertex)
self._update_edges(vertex)
def _update_edges(self, vertex: "Vertex") -> None:
def _update_edges(self, vertex: Vertex) -> None:
"""Updates the edges of a vertex."""
# Vertex has edges, so we need to update the edges
for edge in vertex.edges:
@ -1169,19 +1171,19 @@ class Graph:
for vertex in self.vertices:
vertex._build_params()
def _validate_vertex(self, vertex: "Vertex") -> bool:
def _validate_vertex(self, vertex: Vertex) -> bool:
"""Validates a vertex."""
# All vertices that do not have edges are invalid
return len(self.get_vertex_edges(vertex.id)) > 0
def get_vertex(self, vertex_id: str, silent: bool = False) -> "Vertex":
def get_vertex(self, vertex_id: str, silent: bool = False) -> Vertex:
"""Returns a vertex by id."""
try:
return self.vertex_map[vertex_id]
except KeyError:
raise ValueError(f"Vertex {vertex_id} not found")
def get_root_of_group_node(self, vertex_id: str) -> "Vertex":
def get_root_of_group_node(self, vertex_id: str) -> Vertex:
"""Returns the root of a group node."""
if vertex_id in self.top_level_vertices:
# Get all vertices with vertex_id as .parent_node_id
@ -1205,7 +1207,7 @@ class Graph:
async def astep(
self,
inputs: Optional["InputValueRequest"] = None,
inputs: InputValueRequest | None = None,
files: list[str] | None = None,
user_id: str | None = None,
event_manager: EventManager | None = None,
@ -1259,7 +1261,7 @@ class Graph:
def step(
self,
inputs: Optional["InputValueRequest"] = None,
inputs: InputValueRequest | None = None,
files: list[str] | None = None,
user_id: str | None = None,
):
@ -1382,9 +1384,9 @@ class Graph:
or (edge.target_id == vertex_id and is_target is not False)
]
def get_vertices_with_target(self, vertex_id: str) -> list["Vertex"]:
def get_vertices_with_target(self, vertex_id: str) -> list[Vertex]:
"""Returns the vertices connected to a vertex."""
vertices: list["Vertex"] = []
vertices: list[Vertex] = []
for edge in self.edges:
if edge.target_id == vertex_id:
vertex = self.get_vertex(edge.source_id)
@ -1393,7 +1395,7 @@ class Graph:
vertices.append(vertex)
return vertices
async def process(self, fallback_to_env_vars: bool, start_component_id: str | None = None) -> "Graph":
async def process(self, fallback_to_env_vars: bool, start_component_id: str | None = None) -> Graph:
"""Processes the graph with vertices in each layer run in parallel."""
first_layer = self.sort_vertices(start_component_id=start_component_id)
@ -1450,7 +1452,7 @@ class Graph:
return list(next_runnable_vertices)
async def get_next_runnable_vertices(self, lock: asyncio.Lock, vertex: "Vertex", cache: bool = True) -> list[str]:
async def get_next_runnable_vertices(self, lock: asyncio.Lock, vertex: Vertex, cache: bool = True) -> list[str]:
v_id = vertex.id
v_successors_ids = vertex.successors_ids
async with lock:
@ -1471,7 +1473,7 @@ class Graph:
"""Executes tasks in parallel, handling exceptions for each task."""
results = []
completed_tasks = await asyncio.gather(*tasks, return_exceptions=True)
vertices: list["Vertex"] = []
vertices: list[Vertex] = []
for i, result in enumerate(completed_tasks):
task_name = tasks[i].get_name()
@ -1500,7 +1502,7 @@ class Graph:
no_duplicate_results = list(set(results))
return no_duplicate_results
def topological_sort(self) -> list["Vertex"]:
def topological_sort(self) -> list[Vertex]:
"""
Performs a topological sort of the vertices in the graph.
@ -1533,7 +1535,7 @@ class Graph:
return list(reversed(sorted_vertices))
def generator_build(self) -> Generator["Vertex", None, None]:
def generator_build(self) -> Generator[Vertex, None, None]:
"""Builds each vertex in the graph and yields it."""
sorted_vertices = self.topological_sort()
logger.debug("There are %s vertices in the graph", len(sorted_vertices))
@ -1543,7 +1545,7 @@ class Graph:
"""Returns the predecessors of a vertex."""
return [self.get_vertex(source_id) for source_id in self.predecessor_map.get(vertex.id, [])]
def get_all_successors(self, vertex: "Vertex", recursive=True, flat=True, visited=None):
def get_all_successors(self, vertex: Vertex, recursive=True, flat=True, visited=None):
if visited is None:
visited = set()
@ -1576,13 +1578,13 @@ class Graph:
return successors_result
def get_successors(self, vertex: "Vertex") -> list["Vertex"]:
def get_successors(self, vertex: Vertex) -> list[Vertex]:
"""Returns the successors of a vertex."""
return [self.get_vertex(target_id) for target_id in self.successor_map.get(vertex.id, [])]
def get_vertex_neighbors(self, vertex: "Vertex") -> dict["Vertex", int]:
def get_vertex_neighbors(self, vertex: Vertex) -> dict[Vertex, int]:
"""Returns the neighbors of a vertex."""
neighbors: dict["Vertex", int] = {}
neighbors: dict[Vertex, int] = {}
for edge in self.edges:
if edge.source_id == vertex.id:
neighbor = self.get_vertex(edge.target_id)
@ -1638,7 +1640,7 @@ class Graph:
new_edge = Edge(source, target, edge)
return new_edge
def _get_vertex_class(self, node_type: str, node_base_type: str, node_id: str) -> type["Vertex"]:
def _get_vertex_class(self, node_type: str, node_base_type: str, node_id: str) -> type[Vertex]:
"""Returns the node class based on the node type."""
# First we check for the node_base_type
node_name = node_id.split("-")[0]
@ -1655,9 +1657,9 @@ class Graph:
return lazy_load_vertex_dict.VERTEX_TYPE_MAP[node_type]
return Vertex
def _build_vertices(self) -> list["Vertex"]:
def _build_vertices(self) -> list[Vertex]:
"""Builds the vertices of the graph."""
vertices: list["Vertex"] = []
vertices: list[Vertex] = []
for frontend_data in self._vertices:
if frontend_data.get("type") == NodeTypeEnum.NoteNode:
continue
@ -1730,7 +1732,7 @@ class Graph:
def layered_topological_sort(
self,
vertices: list["Vertex"],
vertices: list[Vertex],
filter_graphs: bool = False,
) -> list[list[str]]:
"""Performs a layered topological sort of the vertices in the graph."""
@ -1986,7 +1988,7 @@ class Graph:
runnable_vertices = []
visited = set()
def find_runnable_predecessors(predecessor: "Vertex"):
def find_runnable_predecessors(predecessor: Vertex):
predecessor_id = predecessor.id
if predecessor_id in visited:
return

View file

@ -1,3 +1,5 @@
from __future__ import annotations
from collections.abc import Callable
from typing import TYPE_CHECKING
@ -12,7 +14,7 @@ if TYPE_CHECKING:
class GraphStateManager:
def __init__(self):
try:
self.state_service: "StateService" = get_state_service()
self.state_service: StateService = get_state_service()
except Exception as e:
logger.debug(f"Error getting state service. Defaulting to InMemoryStateService: {e}")
from langflow.services.state.service import InMemoryStateService

Some files were not shown because too many files have changed in this diff Show more