ref: Add ruff rules for pydocstyle (D) (#4120)

Add ruff rules for pydocstyle (D)
This commit is contained in:
Christophe Bornet 2024-10-14 17:17:50 +02:00 committed by GitHub
commit b1a79c0749
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
119 changed files with 634 additions and 1245 deletions

View file

@ -46,9 +46,7 @@ def get_number_of_workers(workers=None):
def display_results(results):
"""
Display the results of the migration.
"""
"""Display the results of the migration."""
for table_results in results:
table = Table(title=f"Migration {table_results.table_name}")
table.add_column("Name")
@ -149,10 +147,7 @@ def run(
show_default=False,
),
):
"""
Run Langflow.
"""
"""Run Langflow."""
configure(log_level=log_level, log_file=log_file)
set_var_for_macos_issue()
@ -225,9 +220,7 @@ def run(
def wait_for_server_ready(host, port):
"""
Wait for the server to become ready by polling the health endpoint.
"""
"""Wait for the server to become ready by polling the health endpoint."""
status_code = 0
while status_code != httpx.codes.OK:
try:
@ -249,16 +242,13 @@ def run_on_mac_or_linux(host, port, log_level, options, app):
def run_on_windows(host, port, log_level, options, app):
"""
Run the Langflow server on Windows.
"""
"""Run the Langflow server on Windows."""
print_banner(host, port)
run_langflow(host, port, log_level, options, app)
def is_port_in_use(port, host="localhost"):
"""
Check if a port is in use.
"""Check if a port is in use.
Args:
port (int): The port number to check.
@ -272,8 +262,7 @@ def is_port_in_use(port, host="localhost"):
def get_free_port(port):
"""
Given a used port, find a free port.
"""Given a used port, find a free port.
Args:
port (int): The port number to check.
@ -287,9 +276,7 @@ def get_free_port(port):
def get_letter_from_version(version: str):
"""
Get the letter from a pre-release version.
"""
"""Get the letter from a pre-release version."""
if "a" in version:
return "a"
if "b" in version:
@ -308,9 +295,7 @@ def build_version_notice(current_version: str, package_name: str) -> str:
def generate_pip_command(package_names, is_pre_release):
"""
Generate the pip install command based on the packages and whether it's a pre-release.
"""
"""Generate the pip install command based on the packages and whether it's a pre-release."""
base_command = "pip install"
if is_pre_release:
return f"{base_command} {' '.join(package_names)} -U --pre"
@ -369,10 +354,7 @@ def print_banner(host: str, port: int):
def run_langflow(host, port, log_level, options, app):
"""
Run Langflow server on localhost
"""
"""Run Langflow server on localhost."""
if platform.system() == "Windows":
# Run using uvicorn on MacOS and Windows
# Windows doesn't support gunicorn
@ -398,9 +380,7 @@ def superuser(
password: str = typer.Option(..., prompt=True, hide_input=True, help="Password for the superuser."),
log_level: str = typer.Option("error", help="Logging level.", envvar="LANGFLOW_LOG_LEVEL"),
):
"""
Create a superuser.
"""
"""Create a superuser."""
configure(log_level=log_level)
initialize_services()
db_service = get_db_service()
@ -432,8 +412,7 @@ def superuser(
# because now the database is stored per installation
@app.command()
def copy_db():
"""
Copy the database files to the current directory.
"""Copy the database files to the current directory.
This function copies the 'langflow.db' and 'langflow-pre.db' files from the cache directory to the current
directory.
@ -472,9 +451,7 @@ def migration(
help="Fix migrations. This is a destructive operation, and should only be used if you know what you are doing.",
),
):
"""
Run or test migrations.
"""
"""Run or test migrations."""
if fix and not typer.confirm(
"This will delete all data necessary to fix migrations. Are you sure you want to continue?"
):
@ -492,8 +469,7 @@ def migration(
def api_key(
log_level: str = typer.Option("error", help="Logging level."),
):
"""
Creates an API key for the default superuser if AUTO_LOGIN is enabled.
"""Creates an API key for the default superuser if AUTO_LOGIN is enabled.
Args:
log_level (str, optional): Logging level. Defaults to "error".

View file

@ -55,10 +55,10 @@ async def event_generator(request: Request):
async def stream_logs(
request: Request,
):
"""
HTTP/2 Server-Sent-Event (SSE) endpoint for streaming logs
it establishes a long-lived connection to the server and receives log messages in real-time
the client should use the head "Accept: text/event-stream"
"""HTTP/2 Server-Sent-Event (SSE) endpoint for streaming logs.
It establishes a long-lived connection to the server and receives log messages in real-time.
The client should use the header "Accept: text/event-stream".
"""
global log_buffer # noqa: PLW0602
if log_buffer.enabled() is False:

View file

@ -43,7 +43,6 @@ def remove_api_keys(flow: dict):
def build_input_keys_response(langchain_object, artifacts):
"""Build the input keys response."""
input_keys_response = {
"input_keys": dict.fromkeys(langchain_object.input_keys, ""),
"memory_keys": [],
@ -201,8 +200,7 @@ def format_exception_message(exc: Exception) -> str:
def get_top_level_vertices(graph, vertices_ids):
"""
Retrieves the top-level vertices from the given graph based on the provided vertex IDs.
"""Retrieves the top-level vertices from the given graph based on the provided vertex IDs.
Args:
graph (Graph): The graph object containing the vertices.

View file

@ -78,16 +78,17 @@ async def retrieve_vertices_order(
session=Depends(get_session),
telemetry_service: TelemetryService = Depends(get_telemetry_service),
):
"""
Retrieve the vertices order for a given flow.
"""Retrieve the vertices order for a given flow.
Args:
flow_id (str): The ID of the flow.
background_tasks (BackgroundTasks): The background tasks.
data (Optional[FlowDataRequest], optional): The flow data. Defaults to None.
stop_component_id (str, optional): The ID of the stop component. Defaults to None.
start_component_id (str, optional): The ID of the start component. Defaults to None.
chat_service (ChatService, optional): The chat service dependency. Defaults to Depends(get_chat_service).
session (Session, optional): The session dependency. Defaults to Depends(get_session).
telemetry_service (TelemetryService, optional): The telemetry service.
Returns:
VerticesOrderResponse: The response containing the ordered vertex IDs and the run ID.
@ -471,8 +472,10 @@ async def build_vertex(
vertex_id (str): The ID of the vertex to build.
background_tasks (BackgroundTasks): The background tasks dependency.
inputs (Optional[InputValueRequest], optional): The input values for the vertex. Defaults to None.
files (List[str], optional): The files to use. Defaults to None.
chat_service (ChatService, optional): The chat service dependency. Defaults to Depends(get_chat_service).
current_user (Any, optional): The current user dependency. Defaults to Depends(get_current_active_user).
telemetry_service (TelemetryService, optional): The telemetry service.
Returns:
VertexBuildResponse: The response containing the built vertex information.

View file

@ -156,9 +156,7 @@ async def simple_run_flow_task(
stream: bool = False,
api_key_user: User | None = None,
):
"""
Run a flow task as a BackgroundTask, therefore it should not throw exceptions.
"""
"""Run a flow task as a BackgroundTask, therefore it should not throw exceptions."""
try:
return await simple_run_flow(
flow=flow,
@ -180,9 +178,10 @@ async def simplified_run_flow(
api_key_user: UserRead = Depends(api_key_security),
telemetry_service: TelemetryService = Depends(get_telemetry_service),
):
"""
Executes a specified flow by ID with input customization, performance enhancements through caching, and optional
data streaming.
"""Executes a specified flow by ID.
Executes a specified flow by ID with input customization, performance enhancements through caching,
and optional data streaming.
### Parameters:
- `db` (Session): Database session for executing queries.
@ -301,15 +300,14 @@ async def webhook_run_flow(
background_tasks: BackgroundTasks,
telemetry_service: Annotated[TelemetryService, Depends(get_telemetry_service)],
):
"""
Run a flow using a webhook request.
"""Run a flow using a webhook request.
Args:
db (Session): The database session.
flow (Flow, optional): The flow to be executed. Defaults to Depends(get_flow_by_id).
user (User): The flow user.
request (Request): The incoming HTTP request.
background_tasks (BackgroundTasks): The background tasks manager.
session_service (SessionService, optional): The session service. Defaults to Depends(get_session_service).
flow (Flow, optional): The flow to be executed. Defaults to Depends(get_flow_by_id).
telemetry_service (TelemetryService): The telemetry service.
Returns:
dict: A dictionary containing the status of the task.
@ -382,8 +380,8 @@ async def experimental_run_flow(
api_key_user: UserRead = Depends(api_key_security),
session_service: SessionService = Depends(get_session_service),
):
"""
Executes a specified flow by ID with optional input values, output selection, tweaks, and streaming capability.
"""Executes a specified flow by ID with optional input values, output selection, tweaks, and streaming capability.
This endpoint supports running flows with caching to enhance performance and efficiency.
### Parameters:
@ -511,9 +509,7 @@ async def process(
sync: Annotated[bool, Body(embed=True)] = True,
session_service: SessionService = Depends(get_session_service),
):
"""
Endpoint to process an input with a given flow_id.
"""
"""Endpoint to process an input with a given flow_id."""
# Raise a depreciation warning
logger.warning(
"The /process endpoint is deprecated and will be removed in a future version. Please use /run instead."
@ -598,8 +594,7 @@ async def custom_component_update(
code_request: UpdateCustomComponentRequest,
user: Annotated[User, Depends(get_current_active_user)],
):
"""
Update a custom component with the provided code request.
"""Update a custom component with the provided code request.
This endpoint generates the CustomComponentFrontendNode normally but then runs the `update_build_config` method
on the latest version of the template.

View file

@ -137,8 +137,7 @@ def read_flows(
params: Params = Depends(),
header_flows: bool = False,
):
"""
Retrieve a list of flows with pagination support.
"""Retrieve a list of flows with pagination support.
Args:
current_user (User): The current authenticated user.
@ -149,10 +148,11 @@ def read_flows(
folder_id (UUID, optional): The folder ID. Defaults to None.
params (Params): Pagination parameters.
remove_example_flows (bool, optional): Whether to remove example flows. Defaults to False.
header_flows (bool, optional): Whether to return only specific headers of the flows. Defaults to False.
Returns:
Union[list[FlowRead], Page[FlowRead]]: A list of flows or a paginated response containing the list of flows.
"""
try:
auth_settings = settings_service.auth_settings
@ -364,12 +364,12 @@ async def delete_multiple_flows(
user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[Session, Depends(get_session)],
):
"""
Delete multiple flows by their IDs.
"""Delete multiple flows by their IDs.
Args:
flow_ids (List[str]): The list of flow IDs to delete.
user (User, optional): The user making the request. Defaults to the current active user.
db (Session, optional): The database session.
Returns:
dict: A dictionary containing the number of flows deleted.
@ -441,8 +441,7 @@ def read_basic_examples(
*,
session: Session = Depends(get_session),
):
"""
Retrieve a list of basic example flows.
"""Retrieve a list of basic example flows.
Args:
session (Session): The database session.
@ -450,7 +449,6 @@ def read_basic_examples(
Returns:
list[FlowRead]: A list of basic example flows.
"""
try:
# Get the starter folder
starter_folder = session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME)).first()

View file

@ -28,9 +28,7 @@ def add_user(
session: Annotated[Session, Depends(get_session)],
settings_service=Depends(get_settings_service),
) -> User:
"""
Add a new user to the database.
"""
"""Add a new user to the database."""
new_user = User.model_validate(user, from_attributes=True)
try:
new_user.password = get_password_hash(user.password)
@ -52,9 +50,7 @@ def add_user(
def read_current_user(
current_user: Annotated[User, Depends(get_current_active_user)],
) -> User:
"""
Retrieve the current user's data.
"""
"""Retrieve the current user's data."""
return current_user
@ -65,9 +61,7 @@ def read_all_users(
_: Session = Depends(get_current_active_superuser),
session: Session = Depends(get_session),
) -> UsersResponse:
"""
Retrieve a list of users from the database with pagination.
"""
"""Retrieve a list of users from the database with pagination."""
query: SelectOfScalar = select(User).offset(skip).limit(limit)
users = session.exec(query).fetchall()
@ -87,10 +81,7 @@ def patch_user(
user: Annotated[User, Depends(get_current_active_user)],
session: Annotated[Session, Depends(get_session)],
) -> User:
"""
Update an existing user's data.
"""
"""Update an existing user's data."""
update_password = user_update.password is not None and user_update.password != ""
if not user.is_superuser and user_update.is_superuser:
@ -117,9 +108,7 @@ def reset_password(
user: Annotated[User, Depends(get_current_active_user)],
session: Annotated[Session, Depends(get_session)],
) -> User:
"""
Reset a user's password.
"""
"""Reset a user's password."""
if user_id != user.id:
raise HTTPException(status_code=400, detail="You can't change another user's password")
@ -141,9 +130,7 @@ def delete_user(
current_user: Annotated[User, Depends(get_current_active_superuser)],
session: Annotated[Session, Depends(get_session)],
) -> dict:
"""
Delete a user from the database.
"""
"""Delete a user from the database."""
if current_user.id == user_id:
raise HTTPException(status_code=400, detail="You can't delete your own user account")
if not current_user.is_superuser:

View file

@ -36,8 +36,7 @@ class AgentSpec(BaseModel):
def data_to_messages(data: list[Data]) -> list[BaseMessage]:
"""
Convert a list of data to a list of messages.
"""Convert a list of data to a list of messages.
Args:
data (List[Data]): The data to convert.

View file

@ -1,5 +1,4 @@
"""
This module contains constants used in the Langflow base module.
"""This module contains constants used in the Langflow base module.
Constants:
- STREAM_INFO_TEXT: A string representing the information about streaming the response from the model.

View file

@ -1,5 +1,4 @@
"""
This file contains a fix for the implementation of the `uncurl` library, which is available at https://github.com/spulec/uncurl.git.
r"""This file contains a fix for the implementation of the `uncurl` library, which is available at https://github.com/spulec/uncurl.git.
The `uncurl` library provides a way to parse and convert cURL commands into Python requests.
However, there are some issues with the original implementation that this file aims to fix.

View file

@ -36,12 +36,8 @@ class LCDocumentTransformerComponent(Component):
@abstractmethod
def get_data_input(self) -> Any:
"""
Get the data input.
"""
"""Get the data input."""
@abstractmethod
def build_document_transformer(self) -> BaseDocumentTransformer:
"""
Build the text splitter.
"""
"""Build the text splitter."""

View file

@ -6,8 +6,7 @@ from langflow.schema.message import Message
def build_data_from_run_outputs(run_outputs: RunOutputs) -> list[Data]:
"""
Build a list of data from the given RunOutputs.
"""Build a list of data from the given RunOutputs.
Args:
run_outputs (RunOutputs): The RunOutputs object containing the output data.
@ -26,8 +25,7 @@ def build_data_from_run_outputs(run_outputs: RunOutputs) -> list[Data]:
def build_data_from_result_data(result_data: ResultData, get_final_results_only: bool = True) -> list[Data]:
"""
Build a list of data from the given ResultData.
"""Build a list of data from the given ResultData.
Args:
result_data (ResultData): The ResultData object containing the result data.
@ -76,8 +74,7 @@ def build_data_from_result_data(result_data: ResultData, get_final_results_only:
def format_flow_output_data(data: list[Data]) -> str:
"""
Format the flow output data into a string.
"""Format the flow output data into a string.
Args:
data (List[Data]): The list of data to format.

View file

@ -27,12 +27,8 @@ class LCToolComponent(Component):
@abstractmethod
def run_model(self) -> Data | list[Data]:
"""
Run model and return the output.
"""
"""Run model and return the output."""
@abstractmethod
def build_tool(self) -> Tool | Sequence[Tool]:
"""
Build the tool.
"""
"""Build the tool."""

View file

@ -33,6 +33,4 @@ class LCChatMemoryComponent(Component):
@abstractmethod
def build_message_history(self) -> BaseChatMessageHistory:
"""
Builds the chat message history memory.
"""
"""Builds the chat message history memory."""

View file

@ -64,11 +64,10 @@ class LCModelComponent(Component):
return result
def get_result(self, runnable: LLM, stream: bool, input_value: str):
"""
Retrieves the result from the output of a Runnable object.
"""Retrieves the result from the output of a Runnable object.
Args:
output (Runnable): The output object to retrieve the result from.
runnable (Runnable): The runnable to retrieve the result from.
stream (bool): Indicates whether to use streaming or invocation mode.
input_value (str): The input value to pass to the output object.
@ -90,8 +89,7 @@ class LCModelComponent(Component):
return result
def build_status_message(self, message: AIMessage):
"""
Builds a status message from an AIMessage object.
"""Builds a status message from an AIMessage object.
Args:
message (AIMessage): The AIMessage object to build the status message from.
@ -204,6 +202,4 @@ class LCModelComponent(Component):
@abstractmethod
def build_model(self) -> LanguageModel: # type: ignore[type-var]
"""
Implement this method to build the model.
"""
"""Implement this method to build the model."""

View file

@ -6,8 +6,7 @@ from langflow.schema import Data
def data_to_string(record: Data) -> str:
"""
Convert a record to a string.
"""Convert a record to a string.
Args:
record (Data): The record to convert.
@ -19,8 +18,7 @@ def data_to_string(record: Data) -> str:
def dict_values_to_string(d: dict) -> dict:
"""
Converts the values of a dictionary to strings.
"""Converts the values of a dictionary to strings.
Args:
d (dict): The dictionary whose values need to be converted.
@ -52,8 +50,7 @@ def dict_values_to_string(d: dict) -> dict:
def document_to_string(document: Document) -> str:
"""
Convert a document to a string.
"""Convert a document to a string.
Args:
document (Document): The document to convert.

View file

@ -25,6 +25,4 @@ class LCTextSplitterComponent(LCDocumentTransformerComponent):
@abstractmethod
def build_text_splitter(self) -> TextSplitter:
"""
Build the text splitter.
"""
"""Build the text splitter."""

View file

@ -2,8 +2,7 @@ from langflow.field_typing import Tool
def build_status_from_tool(tool: Tool):
"""
Builds a status string representation of a tool.
"""Builds a status string representation of a tool.
Args:
tool (Tool): The tool object to build the status for.

View file

@ -73,7 +73,6 @@ class FlowTool(BaseTool):
def validate_inputs(self, args_names: list[dict[str, str]], args: Any, kwargs: Any):
"""Validate the inputs."""
if len(args) > 0 and len(args) != len(args_names):
msg = "Number of positional arguments does not match the number of inputs. Pass keyword arguments instead."
raise ToolException(msg)

View file

@ -15,8 +15,7 @@ if TYPE_CHECKING:
def check_cached_vector_store(f):
"""
Decorator to check for cached vector stores, and returns them if they exist.
"""Decorator to check for cached vector stores, and returns them if they exist.
Note: caching only occurs during the execution of a component - they do not persist
across separate invocations of the component. This method exists so that components with
@ -42,9 +41,7 @@ class LCVectorStoreComponent(Component):
_cached_vector_store: VectorStore | None = None
def __init_subclass__(cls, **kwargs):
"""
Enforces the check cached decorator on all subclasses
"""
"""Enforces the check cached decorator on all subclasses."""
super().__init_subclass__(**kwargs)
if hasattr(cls, "build_vector_store"):
method = cls.build_vector_store
@ -98,13 +95,14 @@ class LCVectorStoreComponent(Component):
k=10,
**kwargs,
) -> list[Data]:
"""
Search for data in the vector store based on the input value and search type.
"""Search for data in the vector store based on the input value and search type.
Args:
input_value (Text): The input value to search for.
search_type (str): The type of search to perform.
vector_store (VectorStore): The vector store to search in.
k (int): The number of results to return.
**kwargs: Additional keyword arguments to pass to the vector store search method.
Returns:
List[Data]: A list of data matching the search criteria.
@ -112,7 +110,6 @@ class LCVectorStoreComponent(Component):
Raises:
ValueError: If invalid inputs are provided.
"""
docs: list[Document] = []
if input_value and isinstance(input_value, str) and hasattr(vector_store, "search"):
docs = vector_store.search(query=input_value, search_type=search_type.lower(), k=k, **kwargs)
@ -127,9 +124,7 @@ class LCVectorStoreComponent(Component):
return cast(VectorStore, self.build_vector_store())
def build_base_retriever(self) -> Retriever: # type: ignore[type-var]
"""
Builds the BaseRetriever object.
"""
"""Builds the BaseRetriever object."""
if self._cached_vector_store is not None:
vector_store = self._cached_vector_store
else:
@ -145,9 +140,7 @@ class LCVectorStoreComponent(Component):
raise ValueError(msg)
def search_documents(self) -> list[Data]:
"""
Search for documents in the vector store.
"""
"""Search for documents in the vector store."""
search_query: str = self.search_query
if not search_query:
self.status = ""
@ -170,16 +163,12 @@ class LCVectorStoreComponent(Component):
return search_results
def get_retriever_kwargs(self):
"""
Get the retriever kwargs. Implementations can override this method to provide custom retriever kwargs.
"""
"""Get the retriever kwargs. Implementations can override this method to provide custom retriever kwargs."""
return {}
@abstractmethod
@check_cached_vector_store
def build_vector_store(self) -> VectorStore:
"""
Builds the Vector Store object.
"""
"""Builds the Vector Store object."""
msg = "build_vector_store method must be implemented."
raise NotImplementedError(msg)

View file

@ -2,8 +2,7 @@ from langflow.schema import Data
def chroma_collection_to_data(collection_dict: dict):
"""
Converts a collection of chroma vectors into a list of data.
"""Converts a collection of chroma vectors into a list of data.
Args:
collection_dict (dict): A dictionary containing the collection of chroma vectors.

View file

@ -93,9 +93,7 @@ class GoogleDriveSearchComponent(Component):
self.generate_query_string()
def generate_file_url(self, file_id: str, mime_type: str) -> str:
"""
Generates the appropriate Google Drive URL for a file based on its MIME type.
"""
"""Generates the appropriate Google Drive URL for a file based on its MIME type."""
return {
"application/vnd.google-apps.document": f"https://docs.google.com/document/d/{file_id}/edit",
"application/vnd.google-apps.spreadsheet": f"https://docs.google.com/spreadsheets/d/{file_id}/edit",

View file

@ -37,8 +37,8 @@ class URLComponent(Component):
]
def ensure_url(self, string: str) -> str:
"""
Ensures the given string is a URL by adding 'http://' if it doesn't start with 'http://' or 'https://'.
"""Ensures the given string is a URL by adding 'http://' if it doesn't start with 'http://' or 'https://'.
Raises an error if the string is not a valid URL.
Parameters:

View file

@ -23,8 +23,7 @@ class ExtractKeyFromDataComponent(CustomComponent):
}
def build(self, data: Data, keys: list[str], silent_error: bool = True) -> Data:
"""
Extracts the keys from a data.
"""Extracts the keys from a data.
Args:
data (Data): The data from which to extract the keys.

View file

@ -6,8 +6,8 @@ from langflow.schema import Data
class MergeDataComponent(Component):
"""
MergeDataComponent is responsible for combining multiple Data objects into a unified list of Data objects.
"""MergeDataComponent is responsible for combining multiple Data objects into a unified list of Data objects.
It ensures that all keys across the input Data objects are present in each merged Data object.
Missing keys are filled with empty strings to maintain consistency.
"""
@ -36,8 +36,8 @@ class MergeDataComponent(Component):
]
def merge_data(self) -> list[Data]:
"""
Merges multiple Data objects into a single list of Data objects.
"""Merges multiple Data objects into a single list of Data objects.
Ensures that all keys from the input Data objects are present in each merged Data object.
Missing keys are filled with empty strings.

View file

@ -59,8 +59,8 @@ class GitLoaderComponent(Component):
@staticmethod
def is_binary(file_path: str) -> bool:
"""
Check if a file is binary by looking for null bytes.
"""Check if a file is binary by looking for null bytes.
This is necessary because when searches are performed using
the content_filter, binary files need to be ignored.
"""

View file

@ -48,8 +48,9 @@ class GoogleGenerativeAIEmbeddingsComponent(Component):
titles: list[str] | None = None,
output_dimensionality: int | None = 1536,
) -> list[list[float]]:
"""Embed a list of strings. Google Generative AI currently
sets a max batch size of 100 strings.
"""Embed a list of strings.
Google Generative AI currently sets a max batch size of 100 strings.
Args:
texts: List[str] The list of strings to embed.

View file

@ -6,8 +6,8 @@ from langflow.schema import Data
class MergeDataComponent(Component):
"""
MergeDataComponent is responsible for combining multiple Data objects into a unified list of Data objects.
"""MergeDataComponent is responsible for combining multiple Data objects into a unified list of Data objects.
It ensures that all keys across the input Data objects are present in each merged Data object.
Missing keys are filled with empty strings to maintain consistency.
"""
@ -36,8 +36,8 @@ class MergeDataComponent(Component):
]
def merge_data(self) -> list[Data]:
"""
Merges multiple Data objects into a single list of Data objects.
"""Merges multiple Data objects into a single list of Data objects.
Ensures that all keys from the input Data objects are present in each merged Data object.
Missing keys are filled with empty strings.

View file

@ -139,4 +139,4 @@ class SpiderTool(Component):
class SpiderToolError(Exception):
"""SpiderTool error"""
"""SpiderTool error."""

View file

@ -93,11 +93,10 @@ class AIMLModelComponent(LCModelComponent):
)
def _get_exception_message(self, e: Exception):
"""
Get a message from an OpenAI exception.
"""Get a message from an OpenAI exception.
Args:
exception (Exception): The exception to get the message from.
e (Exception): The exception to get the message from.
Returns:
str: The message from the exception.

View file

@ -81,8 +81,7 @@ class AnthropicModelComponent(LCModelComponent):
return output
def _get_exception_message(self, exception: Exception) -> str | None:
"""
Get a message from an Anthropic exception.
"""Get a message from an Anthropic exception.
Args:
exception (Exception): The exception to get the message from.

View file

@ -122,16 +122,14 @@ class OpenAIModelComponent(LCModelComponent):
return output
def _get_exception_message(self, e: Exception):
"""
Get a message from an OpenAI exception.
"""Get a message from an OpenAI exception.
Args:
exception (Exception): The exception to get the message from.
e (Exception): The exception to get the message from.
Returns:
str: The message from the exception.
"""
try:
from openai import BadRequestError
except ImportError:

View file

@ -41,9 +41,7 @@ class PromptComponent(Component):
return frontend_node
def post_code_processing(self, new_frontend_node: dict, current_frontend_node: dict):
"""
This function is called after the code validation is done.
"""
"""This function is called after the code validation is done."""
frontend_node = super().post_code_processing(new_frontend_node, current_frontend_node)
template = frontend_node["template"]["template"]["value"]
# Kept it duplicated for backwards compatibility

View file

@ -88,7 +88,7 @@ class CreateDataComponent(Component):
return return_data
def get_data(self):
"""Function to get the Data from the attributes"""
"""Function to get the Data from the attributes."""
data = {}
for value_dict in self._attributes.values():
if isinstance(value_dict, dict):
@ -100,7 +100,7 @@ class CreateDataComponent(Component):
return data
def validate_text_key(self):
"""This function validates that the Text Key is one of the keys in the Data"""
"""This function validates that the Text Key is one of the keys in the Data."""
data_keys = self.get_data().keys()
if self.text_key not in data_keys and self.text_key != "":
formatted_data_keys = ", ".join(data_keys)

View file

@ -25,8 +25,7 @@ class FlowToolComponent(LCToolComponent):
return [flow_data.data["name"] for flow_data in flow_datas]
def get_flow(self, flow_name: str) -> Data | None:
"""
Retrieves a flow by its name.
"""Retrieves a flow by its name.
Args:
flow_name (str): The name of the flow to retrieve.

View file

@ -48,8 +48,7 @@ class RunnableExecComponent(Component):
]
def get_output(self, result, input_key, output_key):
"""
Retrieves the output value from the given result dictionary based on the specified input and output keys.
"""Retrieves the output value from the given result dictionary based on the specified input and output keys.
Args:
result (dict): The result dictionary containing the output value.
@ -92,8 +91,7 @@ class RunnableExecComponent(Component):
return result_value, status
def get_input_dict(self, runnable, input_key, input_value):
"""
Returns a dictionary containing the input key-value pair for the given runnable.
"""Returns a dictionary containing the input key-value pair for the given runnable.
Args:
runnable: The runnable object.

View file

@ -95,7 +95,7 @@ class UpdateDataComponent(Component):
return self.old_data
def get_data(self):
"""Function to get the Data from the attributes"""
"""Function to get the Data from the attributes."""
data = {}
for value_dict in self._attributes.values():
if isinstance(value_dict, dict):
@ -107,7 +107,7 @@ class UpdateDataComponent(Component):
return data
def validate_text_key(self, data: Data):
"""This function validates that the Text Key is one of the keys in the Data"""
"""This function validates that the Text Key is one of the keys in the Data."""
data_keys = data.data.keys()
if self.text_key not in data_keys and self.text_key != "":
msg = f"Text Key: {self.text_key} not found in the Data keys: {','.join(data_keys)}"

View file

@ -11,9 +11,7 @@ from langflow.field_typing.constants import LanguageModel
class VectaraSelfQueryRetriverComponent(CustomComponent):
"""
A custom component for implementing Vectara Self Query Retriever using a vector store.
"""
"""A custom component for implementing Vectara Self Query Retriever using a vector store."""
display_name: str = "Vectara Self Query Retriever for Vectara Vector Store"
description: str = "Implementation of Vectara Self Query Retriever"

View file

@ -51,8 +51,7 @@ class ComposioAPIComponent(LCToolComponent):
]
def _check_for_authorization(self, app: str) -> str:
"""
Checks if the app is authorized.
"""Checks if the app is authorized.
Args:
app (str): The app name to check authorization for.
@ -71,8 +70,7 @@ class ComposioAPIComponent(LCToolComponent):
return f"{app} CONNECTED"
def _handle_authorization_failure(self, toolset: ComposioToolSet, entity: Any, app: str) -> str:
"""
Handles the authorization failure by attempting to process API key auth or initiate default connection.
"""Handles the authorization failure by attempting to process API key auth or initiate default connection.
Args:
toolset (ComposioToolSet): The toolset instance.
@ -92,8 +90,7 @@ class ComposioAPIComponent(LCToolComponent):
return "Error"
def _process_api_key_auth(self, entity: Any, app: str) -> str:
"""
Processes the API key authentication.
"""Processes the API key authentication.
Args:
entity (Any): The entity instance.

View file

@ -31,9 +31,7 @@ class GleanSearchAPIComponent(LCToolComponent):
]
class GleanAPIWrapper(BaseModel):
"""
Wrapper around Glean API.
"""
"""Wrapper around Glean API."""
glean_api_url: str
glean_access_token: str

View file

@ -209,9 +209,7 @@ class PythonCodeStructuredTool(LCToolComponent):
)
def post_code_processing(self, new_frontend_node: dict, current_frontend_node: dict):
"""
This function is called after the code validation is done.
"""
"""This function is called after the code validation is done."""
frontend_node = super().post_code_processing(new_frontend_node, current_frontend_node)
frontend_node["template"] = self.update_build_config(
frontend_node["template"], frontend_node["template"]["tool_code"]["value"], "tool_code"

View file

@ -11,9 +11,7 @@ from langflow.schema import Data
class ChromaVectorStoreComponent(LCVectorStoreComponent):
"""
Chroma Vector Store with search capabilities
"""
"""Chroma Vector Store with search capabilities."""
display_name: str = "Chroma DB"
description: str = "Chroma Vector Store with search capabilities"
@ -96,9 +94,7 @@ class ChromaVectorStoreComponent(LCVectorStoreComponent):
@check_cached_vector_store
def build_vector_store(self) -> Chroma:
"""
Builds the Chroma object.
"""
"""Builds the Chroma object."""
try:
from chromadb import Client
from langchain_chroma import Chroma
@ -133,9 +129,7 @@ class ChromaVectorStoreComponent(LCVectorStoreComponent):
return chroma
def _add_documents_to_vector_store(self, vector_store: "Chroma") -> None:
"""
Adds documents to the Vector Store.
"""
"""Adds documents to the Vector Store."""
if not self.ingest_data:
self.status = ""
return

View file

@ -19,9 +19,7 @@ from langflow.schema import Data
class ElasticsearchVectorStoreComponent(LCVectorStoreComponent):
"""
Elasticsearch Vector Store with with advanced, customizable search capabilities.
"""
"""Elasticsearch Vector Store with with advanced, customizable search capabilities."""
display_name: str = "Elasticsearch"
description: str = "Elasticsearch Vector Store with with advanced, customizable search capabilities."
@ -116,9 +114,7 @@ class ElasticsearchVectorStoreComponent(LCVectorStoreComponent):
@check_cached_vector_store
def build_vector_store(self) -> ElasticsearchStore:
"""
Builds the Elasticsearch Vector Store object.
"""
"""Builds the Elasticsearch Vector Store object."""
if self.cloud_id and self.elasticsearch_url:
msg = (
"Both 'cloud_id' and 'elasticsearch_url' provided. "
@ -152,9 +148,7 @@ class ElasticsearchVectorStoreComponent(LCVectorStoreComponent):
return elasticsearch
def _prepare_documents(self) -> list[Document]:
"""
Prepares documents from the input data to add to the vector store.
"""
"""Prepares documents from the input data to add to the vector store."""
documents = []
for data in self.ingest_data:
if isinstance(data, Data):
@ -166,9 +160,7 @@ class ElasticsearchVectorStoreComponent(LCVectorStoreComponent):
return documents
def _add_documents_to_vector_store(self, vector_store: "ElasticsearchStore") -> None:
"""
Adds documents to the Vector Store.
"""
"""Adds documents to the Vector Store."""
documents = self._prepare_documents()
if documents and self.embedding:
logger.debug(f"Adding {len(documents)} documents to the Vector Store.")
@ -177,10 +169,7 @@ class ElasticsearchVectorStoreComponent(LCVectorStoreComponent):
logger.debug("No documents to add to the Vector Store.")
def search(self, query: str | None = None) -> list[dict[str, Any]]:
"""
Search for similar documents in the vector store or retrieve all documents
if no query is provided.
"""
"""Search for similar documents in the vector store or retrieve all documents if no query is provided."""
vector_store = self.build_vector_store()
search_kwargs = {
"k": self.number_of_results,
@ -212,9 +201,7 @@ class ElasticsearchVectorStoreComponent(LCVectorStoreComponent):
return [{"page_content": doc.page_content, "metadata": doc.metadata, "score": score} for doc, score in results]
def get_all_documents(self, vector_store: ElasticsearchStore, **kwargs) -> list[tuple[Document, float]]:
"""
Retrieve all documents from the vector store.
"""
"""Retrieve all documents from the vector store."""
client = vector_store.client
index_name = self.index_name
@ -237,8 +224,8 @@ class ElasticsearchVectorStoreComponent(LCVectorStoreComponent):
return results
def search_documents(self) -> list[Data]:
"""
Search for documents in the vector store based on the search input.
"""Search for documents in the vector store based on the search input.
If no search input is provided, retrieve all documents.
"""
results = self.search(self.search_input)
@ -253,9 +240,7 @@ class ElasticsearchVectorStoreComponent(LCVectorStoreComponent):
return retrieved_data
def get_retriever_kwargs(self):
"""
Get the keyword arguments for the retriever.
"""
"""Get the keyword arguments for the retriever."""
return {
"search_type": self.search_type.lower(),
"search_kwargs": {

View file

@ -8,9 +8,7 @@ from langflow.schema import Data
class FaissVectorStoreComponent(LCVectorStoreComponent):
"""
FAISS Vector Store with search capabilities
"""
"""FAISS Vector Store with search capabilities."""
display_name: str = "FAISS"
description: str = "FAISS Vector Store with search capabilities"
@ -58,9 +56,7 @@ class FaissVectorStoreComponent(LCVectorStoreComponent):
@check_cached_vector_store
def build_vector_store(self) -> FAISS:
"""
Builds the FAISS object.
"""
"""Builds the FAISS object."""
if not self.persist_directory:
msg = "Folder path is required to save the FAISS index."
raise ValueError(msg)
@ -80,9 +76,7 @@ class FaissVectorStoreComponent(LCVectorStoreComponent):
return faiss
def search_documents(self) -> list[Data]:
"""
Search for documents in the FAISS vector store.
"""
"""Search for documents in the FAISS vector store."""
if not self.persist_directory:
msg = "Folder path is required to load the FAISS index."
raise ValueError(msg)

View file

@ -16,7 +16,7 @@ from langflow.schema import Data
class MilvusVectorStoreComponent(LCVectorStoreComponent):
"""Milvus vector store with search capabilities"""
"""Milvus vector store with search capabilities."""
display_name: str = "Milvus"
description: str = "Milvus vector store with search capabilities"

View file

@ -20,9 +20,7 @@ from langflow.schema import Data
class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
"""
OpenSearch Vector Store with advanced, customizable search capabilities.
"""
"""OpenSearch Vector Store with advanced, customizable search capabilities."""
display_name: str = "OpenSearch"
description: str = "OpenSearch Vector Store with advanced, customizable search capabilities."
@ -117,9 +115,7 @@ class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
@check_cached_vector_store
def build_vector_store(self) -> OpenSearchVectorSearch:
"""
Builds the OpenSearch Vector Store object.
"""
"""Builds the OpenSearch Vector Store object."""
try:
from langchain_community.vectorstores import OpenSearchVectorSearch
except ImportError as e:
@ -149,9 +145,7 @@ class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
return opensearch
def _add_documents_to_vector_store(self, vector_store: "OpenSearchVectorSearch") -> None:
"""
Adds documents to the Vector Store.
"""
"""Adds documents to the Vector Store."""
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):
@ -173,9 +167,7 @@ class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
logger.debug("No documents to add to the Vector Store.")
def search(self, query: str | None = None) -> list[dict[str, Any]]:
"""
Search for similar documents in the vector store or retrieve all documents if no query is provided.
"""
"""Search for similar documents in the vector store or retrieve all documents if no query is provided."""
try:
vector_store = self.build_vector_store()
@ -239,8 +231,8 @@ class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
raise ValueError(error_message)
def search_documents(self) -> list[Data]:
"""
Search for documents in the vector store based on the search input.
"""Search for documents in the vector store based on the search input.
If no search input is provided, retrieve all documents.
"""
try:

View file

@ -10,9 +10,7 @@ from langflow.schema import Data
class RedisVectorStoreComponent(LCVectorStoreComponent):
"""
A custom component for implementing a Vector Store using Redis.
"""
"""A custom component for implementing a Vector Store using Redis."""
display_name: str = "Redis"
description: str = "Implementation of Vector Store using Redis"

View file

@ -13,9 +13,7 @@ if TYPE_CHECKING:
class VectaraVectorStoreComponent(LCVectorStoreComponent):
"""
Vectara Vector Store with search capabilities
"""
"""Vectara Vector Store with search capabilities."""
display_name: str = "Vectara"
description: str = "Vectara Vector Store with search capabilities"
@ -53,9 +51,7 @@ class VectaraVectorStoreComponent(LCVectorStoreComponent):
@check_cached_vector_store
def build_vector_store(self) -> "Vectara":
"""
Builds the Vectara object.
"""
"""Builds the Vectara object."""
try:
from langchain_community.vectorstores import Vectara
except ImportError as e:
@ -72,9 +68,7 @@ class VectaraVectorStoreComponent(LCVectorStoreComponent):
return vectara
def _add_documents_to_vector_store(self, vector_store: "Vectara") -> None:
"""
Adds documents to the Vector Store.
"""
"""Adds documents to the Vector Store."""
if not self.ingest_data:
self.status = "No documents to add to Vectara"
return

View file

@ -58,14 +58,10 @@ def imports_key(*args, **kwargs):
class CodeParser:
"""
A parser for Python source code, extracting code details.
"""
"""A parser for Python source code, extracting code details."""
def __init__(self, code: str | type) -> None:
"""
Initializes the parser with the provided code.
"""
"""Initializes the parser with the provided code."""
self.cache: TTLCache = TTLCache(maxsize=1024, ttl=60)
if isinstance(code, type):
if not inspect.isclass(code):
@ -89,8 +85,8 @@ class CodeParser:
}
def get_tree(self):
"""
Parses the provided code to validate its syntax.
"""Parses the provided code to validate its syntax.
It tries to parse the code into an abstract syntax tree (AST).
"""
try:
@ -104,17 +100,12 @@ class CodeParser:
return tree
def parse_node(self, node: ast.stmt | ast.AST) -> None:
"""
Parses an AST node and updates the data
dictionary with the relevant information.
"""
"""Parses an AST node and updates the data dictionary with the relevant information."""
if handler := self.handlers.get(type(node)):
handler(node) # type: ignore[operator]
def parse_imports(self, node: ast.Import | ast.ImportFrom) -> None:
"""
Extracts "imports" from the code, including aliases.
"""
"""Extracts "imports" from the code, including aliases."""
if isinstance(node, ast.Import):
for alias in node.names:
if alias.asname:
@ -129,15 +120,11 @@ class CodeParser:
self.data["imports"].append((node.module, alias.name))
def parse_functions(self, node: ast.FunctionDef) -> None:
"""
Extracts "functions" from the code.
"""
"""Extracts "functions" from the code."""
self.data["functions"].append(self.parse_callable_details(node))
def parse_arg(self, arg, default):
"""
Parses an argument and its default value.
"""
"""Parses an argument and its default value."""
arg_dict = {"name": arg.arg, "default": default}
if arg.annotation:
arg_dict["type"] = ast.unparse(arg.annotation)
@ -145,7 +132,8 @@ class CodeParser:
# @cachedmethod(operator.attrgetter("cache"))
def construct_eval_env(self, return_type_str: str, imports) -> dict:
"""
"""Constructs an evaluation environment.
Constructs an evaluation environment with the necessary imports for the return type,
taking into account module aliases.
"""
@ -166,9 +154,7 @@ class CodeParser:
return eval_env
def parse_callable_details(self, node: ast.FunctionDef) -> dict[str, Any]:
"""
Extracts details from a single function or method node.
"""
"""Extracts details from a single function or method node."""
return_type = None
if node.returns:
return_type_str = ast.unparse(node.returns)
@ -190,9 +176,7 @@ class CodeParser:
return func.model_dump()
def parse_function_args(self, node: ast.FunctionDef) -> list[dict[str, Any]]:
"""
Parses the arguments of a function or method node.
"""
"""Parses the arguments of a function or method node."""
args = []
args += self.parse_positional_args(node)
@ -205,9 +189,7 @@ class CodeParser:
return args
def parse_positional_args(self, node: ast.FunctionDef) -> list[dict[str, Any]]:
"""
Parses the positional arguments of a function or method node.
"""
"""Parses the positional arguments of a function or method node."""
num_args = len(node.args.args)
num_defaults = len(node.args.defaults)
num_missing_defaults = num_args - num_defaults
@ -222,9 +204,7 @@ class CodeParser:
return list(starmap(self.parse_arg, zip(node.args.args, defaults, strict=True)))
def parse_varargs(self, node: ast.FunctionDef) -> list[dict[str, Any]]:
"""
Parses the *args argument of a function or method node.
"""
"""Parses the *args argument of a function or method node."""
args = []
if node.args.vararg:
@ -233,9 +213,7 @@ class CodeParser:
return args
def parse_keyword_args(self, node: ast.FunctionDef) -> list[dict[str, Any]]:
"""
Parses the keyword-only arguments of a function or method node.
"""
"""Parses the keyword-only arguments of a function or method node."""
kw_defaults = [None] * (len(node.args.kwonlyargs) - len(node.args.kw_defaults)) + [
ast.unparse(default) if default else None for default in node.args.kw_defaults
]
@ -243,9 +221,7 @@ class CodeParser:
return list(starmap(self.parse_arg, zip(node.args.kwonlyargs, kw_defaults, strict=True)))
def parse_kwargs(self, node: ast.FunctionDef) -> list[dict[str, Any]]:
"""
Parses the **kwargs argument of a function or method node.
"""
"""Parses the **kwargs argument of a function or method node."""
args = []
if node.args.kwarg:
@ -254,15 +230,11 @@ class CodeParser:
return args
def parse_function_body(self, node: ast.FunctionDef) -> list[str]:
"""
Parses the body of a function or method node.
"""
"""Parses the body of a function or method node."""
return [ast.unparse(line) for line in node.body]
def parse_return_statement(self, node: ast.FunctionDef) -> bool:
"""
Parses the return statement of a function or method node, including nested returns.
"""
"""Parses the return statement of a function or method node, including nested returns."""
def has_return(node):
if isinstance(node, ast.Return):
@ -284,20 +256,14 @@ class CodeParser:
return any(has_return(child) for child in node.body)
def parse_assign(self, stmt):
"""
Parses an Assign statement and returns a dictionary
with the target's name and value.
"""
"""Parses an Assign statement and returns a dictionary with the target's name and value."""
for target in stmt.targets:
if isinstance(target, ast.Name):
return {"name": target.id, "value": ast.unparse(stmt.value)}
return None
def parse_ann_assign(self, stmt):
"""
Parses an AnnAssign statement and returns a dictionary
with the target's name, value, and annotation.
"""
"""Parses an AnnAssign statement and returns a dictionary with the target's name, value, and annotation."""
if isinstance(stmt.target, ast.Name):
return {
"name": stmt.target.id,
@ -307,17 +273,15 @@ class CodeParser:
return None
def parse_function_def(self, stmt):
"""
Parses a FunctionDef statement and returns the parsed
method and a boolean indicating if it's an __init__ method.
"""Parse a FunctionDef statement.
Parse a FunctionDef statement and return the parsed method and a boolean indicating if it's an __init__ method.
"""
method = self.parse_callable_details(stmt)
return (method, True) if stmt.name == "__init__" else (method, False)
def get_base_classes(self):
"""
Returns the base classes of the custom component class.
"""
"""Returns the base classes of the custom component class."""
try:
bases = self.execute_and_inspect_classes(self.code)
except Exception:
@ -327,9 +291,7 @@ class CodeParser:
return bases
def parse_classes(self, node: ast.ClassDef) -> None:
"""
Extracts "classes" from the code, including inheritance and init methods.
"""
"""Extracts "classes" from the code, including inheritance and init methods."""
bases = self.get_base_classes()
nodes = []
for base in bases:
@ -373,9 +335,7 @@ class CodeParser:
class_details.methods.append(method)
def parse_global_vars(self, node: ast.Assign) -> None:
"""
Extracts global variables from the code.
"""
"""Extracts global variables from the code."""
global_var = {
"targets": [t.id if hasattr(t, "id") else ast.dump(t) for t in node.targets],
"value": ast.unparse(node.value),
@ -394,9 +354,7 @@ class CodeParser:
return bases
def parse_code(self) -> dict[str, Any]:
"""
Runs all parsing operations and returns the resulting data.
"""
"""Runs all parsing operations and returns the resulting data."""
tree = self.get_tree()
for node in ast.walk(tree):

View file

@ -69,9 +69,7 @@ class BaseComponent:
@staticmethod
def get_template_config(component):
"""
Gets the template configuration for the custom component itself.
"""
"""Gets the template configuration for the custom component itself."""
template_config = {}
for attribute, func in ATTR_FUNC_MAPPING.items():
@ -87,8 +85,7 @@ class BaseComponent:
return template_config
def build_template_config(self) -> dict:
"""
Builds the template configuration for the custom component.
"""Builds the template configuration for the custom component.
Returns:
A dictionary representing the template configuration.

View file

@ -170,8 +170,7 @@ class Component(CustomComponent):
raise ValueError(msg) from e
def set(self, **kwargs):
"""
Connects the component to other components or sets parameters and attributes.
"""Connects the component to other components or sets parameters and attributes.
Args:
**kwargs: Keyword arguments representing the connections, parameters, and attributes.
@ -187,20 +186,15 @@ class Component(CustomComponent):
return self
def list_inputs(self):
"""
Returns a list of input names.
"""
"""Returns a list of input names."""
return [_input.name for _input in self.inputs]
def list_outputs(self):
"""
Returns a list of output names.
"""
"""Returns a list of output names."""
return [_output.name for _output in self._outputs_map.values()]
async def run(self):
"""
Executes the component's logic and returns the result.
"""Executes the component's logic and returns the result.
Returns:
The result of executing the component's logic.
@ -208,8 +202,7 @@ class Component(CustomComponent):
return await self._run()
def set_vertex(self, vertex: Vertex):
"""
Sets the vertex for the component.
"""Sets the vertex for the component.
Args:
vertex (Vertex): The vertex to set.
@ -220,8 +213,7 @@ class Component(CustomComponent):
self._vertex = vertex
def get_input(self, name: str) -> Any:
"""
Retrieves the value of the input with the specified name.
"""Retrieves the value of the input with the specified name.
Args:
name (str): The name of the input.
@ -238,8 +230,7 @@ class Component(CustomComponent):
raise ValueError(msg)
def get_output(self, name: str) -> Any:
"""
Retrieves the output with the specified name.
"""Retrieves the output with the specified name.
Args:
name (str): The name of the output to retrieve.
@ -271,8 +262,7 @@ class Component(CustomComponent):
raise ValueError(msg)
def map_outputs(self, outputs: list[Output]):
"""
Maps the given list of outputs to the component.
"""Maps the given list of outputs to the component.
Args:
outputs (List[Output]): The list of outputs to be mapped.
@ -292,8 +282,7 @@ class Component(CustomComponent):
self._outputs_map[output.name] = deepcopy(output)
def map_inputs(self, inputs: list[InputTypes]):
"""
Maps the given inputs to the component.
"""Maps the given inputs to the component.
Args:
inputs (List[InputTypes]): A list of InputTypes objects representing the inputs.
@ -309,8 +298,7 @@ class Component(CustomComponent):
self._inputs[input_.name] = deepcopy(input_)
def validate(self, params: dict):
"""
Validates the component parameters.
"""Validates the component parameters.
Args:
params (dict): A dictionary containing the component parameters.
@ -377,10 +365,10 @@ class Component(CustomComponent):
return text
def _find_matching_output_method(self, input_name: str, value: Component):
"""
"""Find the output method from the given component and input name.
Find the output method from the given component (`value`) that matches the specified input (`input_name`)
in the current component.
This method searches through all outputs of the provided component to find outputs whose types match
the input types of the specified input in the current component. If exactly one matching output is found,
it returns the corresponding method. If multiple matching outputs are found, it raises an error indicating
@ -788,8 +776,7 @@ class Component(CustomComponent):
return self.repr_value
def build_inputs(self, user_id: str | UUID | None = None):
"""
Builds the inputs for the custom component.
"""Builds the inputs for the custom component.
Args:
user_id (Optional[Union[str, UUID]], optional): The user ID. Defaults to None.
@ -827,11 +814,11 @@ class Component(CustomComponent):
return "Langflow"
def log(self, message: LoggableType | list[LoggableType], name: str | None = None):
"""
Logs a message.
"""Logs a message.
Args:
message (LoggableType | list[LoggableType]): The message to log.
name (str, optional): The name of the log. Defaults to None.
"""
if name is None:
name = f"Log {len(self._logs) + 1}"

View file

@ -31,8 +31,7 @@ if TYPE_CHECKING:
class CustomComponent(BaseComponent):
"""
Represents a custom component in Langflow.
"""Represents a custom component in Langflow.
Attributes:
name (Optional[str]): This attribute helps the frontend apply styles to known components.
@ -87,8 +86,7 @@ class CustomComponent(BaseComponent):
_tree: dict | None = None
def __init__(self, **data):
"""
Initializes a new instance of the CustomComponent class.
"""Initializes a new instance of the CustomComponent class.
Args:
**data: Additional keyword arguments to initialize the custom component.
@ -196,8 +194,7 @@ class CustomComponent(BaseComponent):
return self.field_order or list(self.field_config.keys())
def custom_repr(self):
"""
Returns the custom representation of the custom component.
"""Returns the custom representation of the custom component.
Returns:
str: The custom representation of the custom component.
@ -213,8 +210,7 @@ class CustomComponent(BaseComponent):
return self.repr_value
def build_config(self):
"""
Builds the configuration for the custom component.
"""Builds the configuration for the custom component.
Returns:
dict: The configuration for the custom component.
@ -232,8 +228,7 @@ class CustomComponent(BaseComponent):
@property
def tree(self):
"""
Gets the code tree of the custom component.
"""Gets the code tree of the custom component.
Returns:
dict: The code tree of the custom component.
@ -241,17 +236,18 @@ class CustomComponent(BaseComponent):
return self.get_code_tree(self._code or "")
def to_data(self, data: Any, keys: list[str] | None = None, silent_errors: bool = False) -> list[Data]:
"""
Converts input data into a list of Data objects.
"""Converts input data into a list of Data objects.
Args:
data (Any): The input data to be converted. It can be a single item or a sequence of items.
If the input data is a Langchain Document, text_key and data_key are ignored.
If the input data is a Langchain Document, text_key and data_key are ignored.
keys (List[str], optional): The keys to access the text and data values in each item.
It should be a list of strings where the first element is the text key and the second element
is the data key.
Defaults to None, in which case the default keys "text" and "data" are used.
silent_errors (bool, optional): Whether to suppress errors when the specified keys are not found
in the data.
Returns:
List[Data]: A list of Data objects.
@ -303,8 +299,7 @@ class CustomComponent(BaseComponent):
return self._extract_return_type(return_type)
def create_references_from_data(self, data: list[Data], include_data: bool = False) -> str:
"""
Create references from a list of data.
"""Create references from a list of data.
Args:
data (List[dict]): A list of data, where each record is a dictionary.
@ -325,8 +320,7 @@ class CustomComponent(BaseComponent):
@property
def get_function_entrypoint_args(self) -> list:
"""
Gets the arguments of the function entrypoint for the custom component.
"""Gets the arguments of the function entrypoint for the custom component.
Returns:
list: The arguments of the function entrypoint.
@ -343,8 +337,7 @@ class CustomComponent(BaseComponent):
return args
def get_method(self, method_name: str):
"""
Gets the build method for the custom component.
"""Gets the build method for the custom component.
Returns:
dict: The build method for the custom component.
@ -366,8 +359,7 @@ class CustomComponent(BaseComponent):
@property
def get_function_entrypoint_return_type(self) -> list[Any]:
"""
Gets the return type of the function entrypoint for the custom component.
"""Gets the return type of the function entrypoint for the custom component.
Returns:
List[Any]: The return type of the function entrypoint.
@ -379,8 +371,7 @@ class CustomComponent(BaseComponent):
@property
def get_main_class_name(self):
"""
Gets the main class name of the custom component.
"""Gets the main class name of the custom component.
Returns:
str: The main class name of the custom component.
@ -403,8 +394,7 @@ class CustomComponent(BaseComponent):
@property
def template_config(self):
"""
Gets the template configuration for the custom component.
"""Gets the template configuration for the custom component.
Returns:
dict: The template configuration for the custom component.
@ -415,8 +405,7 @@ class CustomComponent(BaseComponent):
@property
def variables(self):
"""
Returns the variable for the current user with the specified name.
"""Returns the variable for the current user with the specified name.
Raises:
ValueError: If the user id is not set.
@ -438,8 +427,7 @@ class CustomComponent(BaseComponent):
return get_variable
def list_key_names(self):
"""
Lists the names of the variables for the current user.
"""Lists the names of the variables for the current user.
Raises:
ValueError: If the user id is not set.
@ -456,8 +444,7 @@ class CustomComponent(BaseComponent):
return variable_service.list_variables(user_id=self.user_id, session=session)
def index(self, value: int = 0):
"""
Returns a function that returns the value at the given index in the iterable.
"""Returns a function that returns the value at the given index in the iterable.
Args:
value (int): The index value.
@ -472,8 +459,7 @@ class CustomComponent(BaseComponent):
return get_index
def get_function(self):
"""
Gets the function associated with the custom component.
"""Gets the function associated with the custom component.
Returns:
Callable: The function associated with the custom component.
@ -515,8 +501,7 @@ class CustomComponent(BaseComponent):
raise ValueError(msg) from e
def build(self, *args: Any, **kwargs: Any) -> Any:
"""
Builds the custom component.
"""Builds the custom component.
Args:
*args: The positional arguments.
@ -528,9 +513,7 @@ class CustomComponent(BaseComponent):
raise NotImplementedError
def post_code_processing(self, new_frontend_node: dict, current_frontend_node: dict):
"""
This function is called after the code validation is done.
"""
"""This function is called after the code validation is done."""
return update_frontend_node_with_template_values(
frontend_node=new_frontend_node, raw_frontend_node=current_frontend_node
)

View file

@ -18,9 +18,7 @@ class StringCompressor:
self.input_string = input_string
def compress_string(self):
"""
Compress the initial string and return the compressed data.
"""
"""Compress the initial string and return the compressed data."""
# Convert string to bytes
byte_data = self.input_string.encode("utf-8")
# Compress the bytes
@ -29,9 +27,7 @@ class StringCompressor:
return self.compressed_data
def decompress_string(self):
"""
Decompress the compressed data and return the original string.
"""
"""Decompress the compressed data and return the original string."""
# Decompress the bytes
decompressed_data = zlib.decompress(self.compressed_data)
# Convert bytes back to string
@ -44,10 +40,7 @@ class DirectoryReader:
base_path = ""
def __init__(self, directory_path, compress_code_field=False):
"""
Initialize DirectoryReader with a directory path
and a flag indicating whether to compress the code.
"""
"""Initialize DirectoryReader with a directory path and a flag indicating whether to compress the code."""
self.directory_path = directory_path
self.compress_code_field = compress_code_field
@ -61,9 +54,7 @@ class DirectoryReader:
return not self.base_path or fullpath.is_relative_to(self.base_path)
def is_empty_file(self, file_content):
"""
Check if the file content is empty.
"""
"""Check if the file content is empty."""
return len(file_content.strip()) == 0
def filter_loaded_components(self, data: dict, with_errors: bool) -> dict:
@ -86,9 +77,7 @@ class DirectoryReader:
return {"menu": filtered}
def validate_code(self, file_content):
"""
Validate the Python code by trying to parse it with ast.parse.
"""
"""Validate the Python code by trying to parse it with ast.parse."""
try:
ast.parse(file_content)
except SyntaxError:
@ -96,15 +85,11 @@ class DirectoryReader:
return True
def validate_build(self, file_content):
"""
Check if the file content contains a function named 'build'.
"""
"""Check if the file content contains a function named 'build'."""
return "def build" in file_content
def read_file_content(self, file_path):
"""
Read and return the content of a file.
"""
"""Read and return the content of a file."""
_file_path = Path(file_path)
if not _file_path.is_file():
return None
@ -119,9 +104,7 @@ class DirectoryReader:
return f.read().decode("utf-8")
def get_files(self):
"""
Walk through the directory path and return a list of all .py files.
"""
"""Walk through the directory path and return a list of all .py files."""
if not (safe_path := self.get_safe_path()):
msg = f"The path needs to start with '{self.base_path}'."
raise CustomComponentPathValueError(msg)
@ -143,19 +126,14 @@ class DirectoryReader:
return file_list
def find_menu(self, response, menu_name):
"""
Find and return a menu by its name in the response.
"""
"""Find and return a menu by its name in the response."""
return next(
(menu for menu in response["menu"] if menu["name"] == menu_name),
None,
)
def _is_type_hint_imported(self, type_hint_name: str, code: str) -> bool:
"""
Check if a specific type hint is imported
from the typing module in the given code.
"""
"""Check if a specific type hint is imported from the typing module in the given code."""
module = ast.parse(code)
return any(
@ -166,10 +144,7 @@ class DirectoryReader:
)
def _is_type_hint_used_in_args(self, type_hint_name: str, code: str) -> bool:
"""
Check if a specific type hint is used in the
function definitions within the given code.
"""
"""Check if a specific type hint is used in the function definitions within the given code."""
try:
module = ast.parse(code)
@ -184,9 +159,7 @@ class DirectoryReader:
return False
def _is_type_hint_in_arg_annotation(self, annotation, type_hint_name: str) -> bool:
"""
Helper function to check if a type hint exists in an annotation.
"""
"""Helper function to check if a type hint exists in an annotation."""
return (
annotation is not None
and isinstance(annotation, ast.Subscript)
@ -195,9 +168,7 @@ class DirectoryReader:
)
def is_type_hint_used_but_not_imported(self, type_hint_name: str, code: str) -> bool:
"""
Check if a type hint is used but not imported in the given code.
"""
"""Check if a type hint is used but not imported in the given code."""
try:
return self._is_type_hint_used_in_args(type_hint_name, code) and not self._is_type_hint_imported(
type_hint_name, code
@ -208,10 +179,7 @@ class DirectoryReader:
return True
def process_file(self, file_path):
"""
Process a file by validating its content and
returning the result and content/error message.
"""
"""Process a file by validating its content and returning the result and content/error message."""
try:
file_content = self.read_file_content(file_path)
except Exception: # noqa: BLE001
@ -236,10 +204,7 @@ class DirectoryReader:
return True, file_content
def build_component_menu_list(self, file_paths):
"""
Build a list of menus with their components
from the .py files in the directory.
"""
"""Build a list of menus with their components from the .py files in the directory."""
response = {"menu": []}
logger.debug("-------------------- Building component menu list --------------------")
@ -369,9 +334,7 @@ class DirectoryReader:
@staticmethod
def get_output_types_from_code(code: str) -> list:
"""
Get the output types from the code.
"""
"""Get the output types from the code."""
custom_component = Component(_code=code)
types_list = custom_component.get_function_entrypoint_return_type

View file

@ -42,7 +42,7 @@ def build_valid_menu(valid_components):
def build_and_validate_all_files(reader: DirectoryReader, file_list):
"""Build and validate all files"""
"""Build and validate all files."""
data = reader.build_component_menu_list(file_list)
valid_components = reader.filter_loaded_components(data=data, with_errors=False)
@ -52,7 +52,7 @@ def build_and_validate_all_files(reader: DirectoryReader, file_list):
async def abuild_and_validate_all_files(reader: DirectoryReader, file_list):
"""Build and validate all files"""
"""Build and validate all files."""
data = await reader.abuild_component_menu_list(file_list)
valid_components = reader.filter_loaded_components(data=data, with_errors=False)
@ -62,14 +62,14 @@ async def abuild_and_validate_all_files(reader: DirectoryReader, file_list):
def load_files_from_path(path: str):
"""Load all files from a given path"""
"""Load all files from a given path."""
reader = DirectoryReader(path, False)
return reader.get_files()
def build_custom_component_list_from_path(path: str):
"""Build a list of custom components for the langchain from a given path"""
"""Build a list of custom components for the langchain from a given path."""
file_list = load_files_from_path(path)
reader = DirectoryReader(path, False)
@ -82,7 +82,7 @@ def build_custom_component_list_from_path(path: str):
async def abuild_custom_component_list_from_path(path: str):
"""Build a list of custom components for the langchain from a given path"""
"""Build a list of custom components for the langchain from a given path."""
file_list = load_files_from_path(path)
reader = DirectoryReader(path, False)

View file

@ -7,6 +7,6 @@ if TYPE_CHECKING:
def eval_custom_component_code(code: str) -> type["CustomComponent"]:
"""Evaluate custom component code"""
"""Evaluate custom component code."""
class_name = validate.extract_class_name(code)
return validate.create_class(code, class_name)

View file

@ -4,9 +4,7 @@ from pydantic import BaseModel, Field
class ClassCodeDetails(BaseModel):
"""
A dataclass for storing details about a class.
"""
"""A dataclass for storing details about a class."""
name: str
doc: str | None = None
@ -17,9 +15,7 @@ class ClassCodeDetails(BaseModel):
class CallableCodeDetails(BaseModel):
"""
A dataclass for storing details about a callable.
"""
"""A dataclass for storing details about a callable."""
name: str
doc: str | None = None
@ -30,9 +26,7 @@ class CallableCodeDetails(BaseModel):
class MissingDefault:
"""
A class to represent a missing default value.
"""
"""A class to represent a missing default value."""
def __repr__(self):
return "MISSING"

View file

@ -33,7 +33,7 @@ class UpdateBuildConfigError(Exception):
def add_output_types(frontend_node: CustomComponentFrontendNode, return_types: list[str]):
"""Add output types to the frontend node"""
"""Add output types to the frontend node."""
for return_type in return_types:
if return_type is None:
raise HTTPException(
@ -70,7 +70,7 @@ def reorder_fields(frontend_node: CustomComponentFrontendNode, field_order: list
def add_base_classes(frontend_node: CustomComponentFrontendNode, return_types: list[str]):
"""Add base classes to the frontend node"""
"""Add base classes to the frontend node."""
for return_type_instance in return_types:
if return_type_instance is None:
raise HTTPException(
@ -90,8 +90,7 @@ def add_base_classes(frontend_node: CustomComponentFrontendNode, return_types: l
def extract_type_from_optional(field_type):
"""
Extract the type from a string formatted as "Optional[<type>]".
"""Extract the type from a string formatted as "Optional[<type>]".
Parameters:
field_type (str): The string from which to extract the type.
@ -106,7 +105,7 @@ def extract_type_from_optional(field_type):
def get_field_properties(extra_field):
"""Get the properties of an extra field"""
"""Get the properties of an extra field."""
field_name = extra_field["name"]
field_type = extra_field.get("type", "str")
field_value = extra_field.get("default", "")
@ -197,7 +196,7 @@ def add_new_custom_field(
def add_extra_fields(frontend_node, field_config, function_args):
"""Add extra fields to the frontend node"""
"""Add extra fields to the frontend node."""
if not function_args:
return
_field_config = field_config.copy()
@ -240,7 +239,7 @@ def add_extra_fields(frontend_node, field_config, function_args):
def get_field_dict(field: Input | dict):
"""Get the field dictionary from a Input or a dict"""
"""Get the field dictionary from a Input or a dict."""
if isinstance(field, Input):
return dotdict(field.model_dump(by_alias=True, exclude_none=True))
return field
@ -298,8 +297,7 @@ def run_build_config(
custom_component: CustomComponent,
user_id: str | UUID | None = None,
) -> tuple[dict, CustomComponent]:
"""Build the field configuration for a custom component"""
"""Build the field configuration for a custom component."""
if custom_component._code is None:
error = "Code is None"
elif not isinstance(custom_component._code, str):
@ -392,7 +390,7 @@ def build_custom_component_template(
custom_component: CustomComponent,
user_id: str | UUID | None = None,
) -> tuple[dict[str, Any], CustomComponent | Component]:
"""Build a custom component template"""
"""Build a custom component template."""
try:
has_template_config = hasattr(custom_component, "template_config")
except Exception as exc:
@ -516,7 +514,7 @@ def update_field_dict(
update_field_value: Any | None = None,
call: bool = False,
):
"""Update the field dictionary by calling options() or value() if they are callable"""
"""Update the field dictionary by calling options() or value() if they are callable."""
if (
("real_time_refresh" in field_dict or "refresh_button" in field_dict)
and any(
@ -573,7 +571,7 @@ def build_component(component):
def get_function(code):
"""Get the function"""
"""Get the function."""
function_name = validate.extract_function_name(code)
return validate.create_function(code, function_name)

View file

@ -218,8 +218,8 @@ class CycleEdge(Edge):
target._has_cycle_edges = True
async def honor(self, source: Vertex, target: Vertex) -> None:
"""
Fulfills the contract by setting the result of the source vertex to the target vertex's parameter.
"""Fulfills the contract by setting the result of the source vertex to the target vertex's parameter.
If the edge is runnable, the source vertex is run with the message text and the target vertex's
root_field param is set to the
result. If the edge is not runnable, the target vertex's parameter is set to the result.

View file

@ -1,5 +1,4 @@
"""
This code is adapted from the DVC project.
"""This code is adapted from the DVC project.
Original source:
https://github.com/iterative/dvc/blob/c5bac1c8cfdb2c0f54d52ac61ff754e6f583822a/dvc/dagascii.py

View file

@ -65,13 +65,16 @@ class Graph:
user_id: str | None = None,
log_config: LogConfig | None = None,
) -> None:
"""
Initializes a new instance of the Graph class.
"""Initializes a new instance of the Graph class.
Args:
nodes (List[Dict]): A list of dictionaries representing the vertices of the graph.
edges (List[Dict[str, str]]): A list of dictionaries representing the edges of the graph.
flow_id (Optional[str], optional): The ID of the flow. Defaults to None.
start: The start component.
end: The end component.
flow_id: The ID of the flow. Defaults to None.
flow_name: The flow name.
description: The graph description.
user_id: The user ID.
log_config: The log configuration.
"""
if log_config:
configure(**log_config)
@ -394,8 +397,7 @@ class Graph:
self.define_vertices_lists()
def get_state(self, name: str) -> Data | None:
"""
Returns the state of the graph with the given name.
"""Returns the state of the graph with the given name.
Args:
name (str): The name of the state.
@ -406,8 +408,7 @@ class Graph:
return self.state_manager.get_state(name, run_id=self._run_id)
def update_state(self, name: str, record: str | Data, caller: str | None = None) -> None:
"""
Updates the state of the graph with the given name.
"""Updates the state of the graph with the given name.
Args:
name (str): The name of the state.
@ -424,8 +425,7 @@ class Graph:
self.state_manager.update_state(name, record, run_id=self._run_id)
def activate_state_vertices(self, name: str, caller: str):
"""
Activates the state vertices in the graph with the given name and caller.
"""Activates the state vertices in the graph with the given name and caller.
Args:
name (str): The name of the state.
@ -474,14 +474,11 @@ class Graph:
)
def reset_activated_vertices(self):
"""
Resets the activated vertices in the graph.
"""
"""Resets the activated vertices in the graph."""
self.activated_vertices = []
def append_state(self, name: str, record: str | Data, caller: str | None = None) -> None:
"""
Appends the state of the graph with the given name.
"""Appends the state of the graph with the given name.
Args:
name (str): The name of the state.
@ -494,8 +491,7 @@ class Graph:
self.state_manager.append_state(name, record, run_id=self._run_id)
def validate_stream(self):
"""
Validates the stream configuration of the graph.
"""Validates the stream configuration of the graph.
If there are two vertices in the same graph (connected by edges)
that have `stream=True` or `streaming=True`, raises a `ValueError`.
@ -523,8 +519,7 @@ class Graph:
@property
def is_cyclic(self):
"""
Check if the graph has any cycles.
"""Check if the graph has any cycles.
Returns:
bool: True if the graph has any cycles, False otherwise.
@ -540,8 +535,7 @@ class Graph:
@property
def run_id(self):
"""
The ID of the current run.
"""The ID of the current run.
Returns:
str: The run ID.
@ -555,8 +549,7 @@ class Graph:
return self._run_id
def set_run_id(self, run_id: uuid.UUID | None = None):
"""
Sets the ID of the current run.
"""Sets the ID of the current run.
Args:
run_id (str): The run ID.
@ -600,8 +593,7 @@ class Graph:
@property
def sorted_vertices_layers(self) -> list[list[str]]:
"""
The sorted layers of vertices in the graph.
"""The sorted layers of vertices in the graph.
Returns:
List[List[str]]: The sorted layers of vertices.
@ -611,9 +603,7 @@ class Graph:
return self._sorted_vertices_layers
def define_vertices_lists(self):
"""
Defines the lists of vertices that are inputs, outputs, and have session_id.
"""
"""Defines the lists of vertices that are inputs, outputs, and have session_id."""
attributes = ["is_input", "is_output", "has_session_id", "is_state"]
for vertex in self.vertices:
for attribute in attributes:
@ -645,20 +635,20 @@ class Graph:
session_id: str,
fallback_to_env_vars: bool,
) -> list[ResultData | None]:
"""
Runs the graph with the given inputs.
"""Runs the graph with the given inputs.
Args:
inputs (Dict[str, str]): The input values for the graph.
input_components (list[str]): The components to run for the inputs.
input_type: (Optional[InputType]): The input type.
outputs (list[str]): The outputs to retrieve from the graph.
stream (bool): Whether to stream the results or not.
session_id (str): The session ID for the graph.
fallback_to_env_vars (bool): Whether to fallback to environment variables.
Returns:
List[Optional["ResultData"]]: The outputs of the graph.
"""
if input_components and not isinstance(input_components, list):
msg = f"Invalid components value: {input_components}. Expected list"
raise ValueError(msg)
@ -722,8 +712,7 @@ class Graph:
stream: bool = False,
fallback_to_env_vars: bool = False,
) -> list[RunOutputs]:
"""
Run the graph with the given inputs and return the outputs.
"""Run the graph with the given inputs and return the outputs.
Args:
inputs (Dict[str, str]): A dictionary of input values.
@ -732,6 +721,7 @@ class Graph:
outputs (Optional[list[str]]): A list of output components.
session_id (Optional[str]): The session ID.
stream (bool): Whether to stream the outputs.
fallback_to_env_vars (bool): Whether to fallback to environment variables.
Returns:
List[RunOutputs]: A list of RunOutputs objects representing the outputs.
@ -773,15 +763,16 @@ class Graph:
stream: bool = False,
fallback_to_env_vars: bool = False,
) -> list[RunOutputs]:
"""
Runs the graph with the given inputs.
"""Runs the graph with the given inputs.
Args:
inputs (list[Dict[str, str]]): The input values for the graph.
inputs_components (Optional[list[list[str]]], optional): Components to run for the inputs. Defaults to None.
types (Optional[list[Optional[InputType]]], optional): The types of the inputs. Defaults to None.
outputs (Optional[list[str]], optional): The outputs to retrieve from the graph. Defaults to None.
session_id (Optional[str], optional): The session ID for the graph. Defaults to None.
stream (bool, optional): Whether to stream the results or not. Defaults to False.
fallback_to_env_vars (bool, optional): Whether to fallback to environment variables. Defaults to False.
Returns:
List[RunOutputs]: The outputs of the graph.
@ -821,8 +812,7 @@ class Graph:
return vertex_outputs
def next_vertex_to_build(self):
"""
Returns the next vertex to be built.
"""Returns the next vertex to be built.
Yields:
str: The ID of the next vertex to be built.
@ -831,8 +821,7 @@ class Graph:
@property
def metadata(self):
"""
The metadata of the graph.
"""The metadata of the graph.
Returns:
dict: The metadata of the graph.
@ -847,9 +836,7 @@ class Graph:
}
def build_graph_maps(self, edges: list[CycleEdge] | None = None, vertices: list[Vertex] | None = None):
"""
Builds the adjacency maps for the graph.
"""
"""Builds the adjacency maps for the graph."""
if edges is None:
edges = self.edges
@ -862,9 +849,7 @@ class Graph:
self.parent_child_map = self.build_parent_child_map(vertices)
def reset_inactivated_vertices(self):
"""
Resets the inactivated vertices in the graph.
"""
"""Resets the inactivated vertices in the graph."""
for vertex_id in self.inactivated_vertices.copy():
self.mark_vertex(vertex_id, "ACTIVE")
self.inactivated_vertices = []
@ -1016,11 +1001,13 @@ class Graph:
flow_name: str | None = None,
user_id: str | None = None,
) -> Graph:
"""
Creates a graph from a payload.
"""Creates a graph from a payload.
Args:
payload (Dict): The payload to create the graph from.`
payload: The payload to create the graph from.
flow_id: The ID of the flow.
flow_name: The flow name.
user_id: The user ID.
Returns:
Graph: The created graph.
@ -1125,8 +1112,7 @@ class Graph:
return self
def update_vertex_from_another(self, vertex: Vertex, other_vertex: Vertex) -> None:
"""
Updates a vertex from another vertex.
"""Updates a vertex from another vertex.
Args:
vertex (Vertex): The vertex to be updated.
@ -1329,15 +1315,17 @@ class Graph:
fallback_to_env_vars: bool = False,
event_manager: EventManager | None = None,
) -> VertexBuildResult:
"""
Builds a vertex in the graph.
"""Builds a vertex in the graph.
Args:
lock (asyncio.Lock): A lock to synchronize access to the graph.
set_cache_coro (Coroutine): A coroutine to set the cache.
vertex_id (str): The ID of the vertex to build.
inputs (Optional[Dict[str, str]]): Optional dictionary of inputs for the vertex. Defaults to None.
get_cache (GetCache): A coroutine to get the cache.
set_cache (SetCache): A coroutine to set the cache.
inputs_dict (Optional[Dict[str, str]]): Optional dictionary of inputs for the vertex. Defaults to None.
files: (Optional[List[str]]): Optional list of files. Defaults to None.
user_id (Optional[str]): Optional user ID. Defaults to None.
fallback_to_env_vars (bool): Whether to fallback to environment variables. Defaults to False.
event_manager (Optional[EventManager]): Optional event manager. Defaults to None.
Returns:
Tuple: A tuple containing the next runnable vertices, top level vertices, result dictionary,
@ -1448,7 +1436,6 @@ class Graph:
async def process(self, fallback_to_env_vars: bool, start_component_id: str | None = None) -> Graph:
"""Processes the graph with vertices in each layer run in parallel."""
first_layer = self.sort_vertices(start_component_id=start_component_id)
vertex_task_run_count: dict[str, int] = {}
to_process = deque(first_layer)
@ -1554,8 +1541,7 @@ class Graph:
return list(set(results))
def topological_sort(self) -> list[Vertex]:
"""
Performs a topological sort of the vertices in the graph.
"""Performs a topological sort of the vertices in the graph.
Returns:
List[Vertex]: A list of vertices in topological order.
@ -2019,8 +2005,7 @@ class Graph:
return self.run_manager.is_vertex_runnable(vertex_id, is_active)
def build_run_map(self):
"""
Builds the run map for the graph.
"""Builds the run map for the graph.
This method is responsible for building the run map for the graph,
which maps each node in the graph to its corresponding run function.
@ -2031,8 +2016,8 @@ class Graph:
self.run_manager.build_run_map(predecessor_map=self.predecessor_map, vertices_to_run=self.vertices_to_run)
def find_runnable_predecessors_for_successors(self, vertex_id: str) -> list[str]:
"""
For each successor of the current vertex, find runnable predecessors if any.
"""For each successor of the current vertex, find runnable predecessors if any.
This checks the direct predecessors of each successor to identify any that are
immediately runnable, expanding the search to ensure progress can be made.
"""
@ -2069,8 +2054,7 @@ class Graph:
self.run_manager.remove_vertex_from_runnables(vertex_id)
def get_top_level_vertices(self, vertices_ids):
"""
Retrieves the top-level vertices from the given graph based on the provided vertex IDs.
"""Retrieves the top-level vertices from the given graph based on the provided vertex IDs.
Args:
vertices_ids (list): A list of vertex IDs.

View file

@ -9,8 +9,7 @@ def camel_to_snake(camel_str: str) -> str:
def create_state_model_from_graph(graph: BaseModel) -> type[BaseModel]:
"""
Create a Pydantic state model from a graph representation.
"""Create a Pydantic state model from a graph representation.
This function generates a Pydantic model that represents the state of an entire graph.
It creates getter methods for each vertex in the graph, allowing access to the state

View file

@ -7,8 +7,7 @@ PRIORITY_LIST_OF_INPUTS = ["webhook", "chat"]
def find_start_component_id(vertices):
"""
Finds the component ID from a list of vertices based on a priority list of input types.
"""Finds the component ID from a list of vertices based on a priority list of input types.
Args:
vertices (list): A list of vertex IDs.
@ -24,24 +23,18 @@ def find_start_component_id(vertices):
def find_last_node(nodes, edges):
"""
This function receives a flow and returns the last node.
"""
"""This function receives a flow and returns the last node."""
return next((n for n in nodes if all(e["source"] != n["id"] for e in edges)), None)
def add_parent_node_id(nodes, parent_node_id):
"""
This function receives a list of nodes and adds a parent_node_id to each node.
"""
"""This function receives a list of nodes and adds a parent_node_id to each node."""
for node in nodes:
node["parent_node_id"] = parent_node_id
def add_frozen(nodes, frozen):
"""
This function receives a list of nodes and adds a frozen to each node.
"""
"""This function receives a list of nodes and adds a frozen to each node."""
for node in nodes:
node["data"]["node"]["frozen"] = frozen
@ -108,8 +101,7 @@ def process_flow(flow_object):
def update_template(template, g_nodes):
"""
Updates the template of a node in a graph with the given template.
"""Updates the template of a node in a graph with the given template.
Args:
template (dict): The new template to update the node with.
@ -140,8 +132,7 @@ def update_template(template, g_nodes):
def update_target_handle(new_edge, g_nodes, group_node_id):
"""
Updates the target handle of a given edge if it is a proxy node.
"""Updates the target handle of a given edge if it is a proxy node.
Args:
new_edge (dict): The edge to update.
@ -160,8 +151,7 @@ def update_target_handle(new_edge, g_nodes, group_node_id):
def set_new_target_handle(proxy_id, new_edge, target_handle, node):
"""
Sets a new target handle for a given edge.
"""Sets a new target handle for a given edge.
Args:
proxy_id (str): The ID of the proxy.
@ -195,12 +185,12 @@ def set_new_target_handle(proxy_id, new_edge, target_handle, node):
def update_source_handle(new_edge, g_nodes, g_edges):
"""
Updates the source handle of a given edge to the last node in the flow data.
"""Updates the source handle of a given edge to the last node in the flow data.
Args:
new_edge (dict): The edge to update.
flow_data (dict): The flow data containing the nodes and edges.
g_nodes: The graph nodes.
g_edges: The graph edges.
Returns:
dict: The updated edge with the new source handle.
@ -214,13 +204,15 @@ def update_source_handle(new_edge, g_nodes, g_edges):
def get_updated_edges(base_flow, g_nodes, g_edges, group_node_id):
"""
"""Get updated edges.
Given a base flow, a list of graph nodes and a group node id, returns a list of updated edges.
An updated edge is an edge that has its target or source handle updated based on the group node id.
Args:
base_flow (dict): The base flow containing a list of edges.
g_nodes (list): A list of graph nodes.
g_edges (list): A list of graph edges.
group_node_id (str): The id of the group node.
Returns:
@ -323,8 +315,7 @@ def sort_up_to_vertex(
def has_cycle(vertex_ids: list[str], edges: list[tuple[str, str]]) -> bool:
"""
Determines whether a directed graph represented by a list of vertices and edges contains a cycle.
"""Determines whether a directed graph represented by a list of vertices and edges contains a cycle.
Args:
vertex_ids (list[str]): A list of vertex IDs.
@ -360,8 +351,7 @@ def has_cycle(vertex_ids: list[str], edges: list[tuple[str, str]]) -> bool:
def find_cycle_edge(entry_point: str, edges: list[tuple[str, str]]) -> tuple[str, str]:
"""
Find the edge that causes a cycle in a directed graph starting from a given entry point.
"""Find the edge that causes a cycle in a directed graph starting from a given entry point.
Args:
entry_point (str): The vertex ID from which to start the search.
@ -398,8 +388,7 @@ def find_cycle_edge(entry_point: str, edges: list[tuple[str, str]]) -> tuple[str
def find_all_cycle_edges(entry_point: str, edges: list[tuple[str, str]]) -> list[tuple[str, str]]:
"""
Find all edges that cause cycles in a directed graph starting from a given entry point.
"""Find all edges that cause cycles in a directed graph starting from a given entry point.
Args:
entry_point (str): The vertex ID from which to start the search.

View file

@ -6,8 +6,7 @@ from pydantic.fields import FieldInfo
def __validate_method(method: Callable) -> None:
"""
Validates a method by checking if it has the required attributes.
"""Validates a method by checking if it has the required attributes.
This function ensures that the given method belongs to a class with the necessary
structure for output handling. It checks for the presence of a __self__ attribute
@ -38,8 +37,7 @@ def __validate_method(method: Callable) -> None:
def build_output_getter(method: Callable, validate: bool = True) -> Callable:
"""
Builds an output getter function for a given method in a graph component.
"""Builds an output getter function for a given method in a graph component.
This function creates a new callable that, when invoked, retrieves the output
of the specified method using the get_output_by_method of the method's class.
@ -91,8 +89,7 @@ def build_output_getter(method: Callable, validate: bool = True) -> Callable:
def build_output_setter(method: Callable, validate: bool = True) -> Callable:
"""
Build an output setter function for a given method in a graph component.
"""Build an output setter function for a given method in a graph component.
This function creates a new callable that, when invoked, sets the output
of the specified method using the get_output_by_method of the method's class.
@ -140,8 +137,7 @@ def build_output_setter(method: Callable, validate: bool = True) -> Callable:
def create_state_model(model_name: str = "State", validate: bool = True, **kwargs) -> type:
"""
Create a dynamic Pydantic state model based on the provided keyword arguments.
"""Create a dynamic Pydantic state model based on the provided keyword arguments.
This function generates a Pydantic model class with fields corresponding to the
provided keyword arguments. It can handle various types of field definitions,

View file

@ -69,9 +69,11 @@ def flatten_list(list_of_lists: list[list | Any]) -> list:
def serialize_field(value):
"""Unified serialization function for handling both BaseModel and Document types,
including handling lists of these types."""
"""Serialize field.
Unified serialization function for handling both BaseModel and Document types,
including handling lists of these types.
"""
if isinstance(value, list | tuple):
return [serialize_field(v) for v in value]
if isinstance(value, Document):
@ -121,9 +123,7 @@ def post_process_raw(raw, artifact_type: str):
def _vertex_to_primitive_dict(target: Vertex) -> dict:
"""
Cleans the parameters of the target vertex.
"""
"""Cleans the parameters of the target vertex."""
# Removes all keys that the values aren't python types like str, int, bool, etc.
params = {
key: value for key, value in target.params.items() if isinstance(value, str | int | bool | float | list | dict)

View file

@ -438,11 +438,12 @@ class Vertex:
self._raw_params = params.copy()
def update_raw_params(self, new_params: Mapping[str, str | list[str]], overwrite: bool = False):
"""
Update the raw parameters of the vertex with the given new parameters.
"""Update the raw parameters of the vertex with the given new parameters.
Args:
new_params (Dict[str, Any]): The new parameters to update.
overwrite (bool, optional): Whether to overwrite the existing parameters.
Defaults to False.
Raises:
ValueError: If any key in new_params is not found in self._raw_params.
@ -461,9 +462,7 @@ class Vertex:
self.updated_raw_params = True
def has_cycle_edges(self):
"""
Checks if the vertex has any cycle edges.
"""
"""Checks if the vertex has any cycle edges."""
return self._has_cycle_edges
async def instantiate_component(self, user_id=None):
@ -479,9 +478,7 @@ class Vertex:
user_id=None,
event_manager: EventManager | None = None,
):
"""
Initiate the build process.
"""
"""Initiate the build process."""
logger.debug(f"Building {self.display_name}")
await self._build_each_vertex_in_params_dict(user_id)
@ -509,8 +506,7 @@ class Vertex:
self._built = True
def extract_messages_from_artifacts(self, artifacts: dict[str, Any]) -> list[dict]:
"""
Extracts messages from the artifacts.
"""Extracts messages from the artifacts.
Args:
artifacts (Dict[str, Any]): The artifacts to extract messages from.
@ -567,9 +563,7 @@ class Vertex:
self.set_result(result_dict)
async def _build_each_vertex_in_params_dict(self, user_id=None):
"""
Iterates over each vertex in the params dictionary and builds it.
"""
"""Iterates over each vertex in the params dictionary and builds it."""
for key, value in self._raw_params.items():
if self._is_vertex(value):
if value == self:
@ -594,9 +588,7 @@ class Vertex:
key,
vertices_dict: dict[str, Vertex],
):
"""
Iterates over a dictionary of vertices, builds each and updates the params dictionary.
"""
"""Iterates over a dictionary of vertices, builds each and updates the params dictionary."""
for sub_key, value in vertices_dict.items():
if not self._is_vertex(value):
self.params[key][sub_key] = value
@ -605,20 +597,15 @@ class Vertex:
self.params[key][sub_key] = result
def _is_vertex(self, value):
"""
Checks if the provided value is an instance of Vertex.
"""
"""Checks if the provided value is an instance of Vertex."""
return isinstance(value, Vertex)
def _is_list_of_vertices(self, value):
"""
Checks if the provided value is a list of Vertex instances.
"""
"""Checks if the provided value is a list of Vertex instances."""
return all(self._is_vertex(vertex) for vertex in value)
async def get_result(self, requester: Vertex, target_handle_name: str | None = None) -> Any:
"""
Retrieves the result of the vertex.
"""Retrieves the result of the vertex.
This is a read-only method so it raises an error if the vertex has not been built yet.
@ -636,8 +623,7 @@ class Vertex:
task.add_done_callback(self.log_transaction_tasks.discard)
async def _get_result(self, requester: Vertex, target_handle_name: str | None = None) -> Any:
"""
Retrieves the result of the built component.
"""Retrieves the result of the built component.
If the component has not been built yet, a ValueError is raised.
@ -657,10 +643,7 @@ class Vertex:
return result
async def _build_vertex_and_update_params(self, key, vertex: Vertex):
"""
Builds a given vertex and updates the params dictionary accordingly.
"""
"""Builds a given vertex and updates the params dictionary accordingly."""
result = await vertex.get_result(self, target_handle_name=key)
self._handle_func(key, result)
if isinstance(result, list):
@ -672,9 +655,7 @@ class Vertex:
key,
vertices: list[Vertex],
):
"""
Iterates over a list of vertices, builds each and updates the params dictionary.
"""
"""Iterates over a list of vertices, builds each and updates the params dictionary."""
self.params[key] = []
for vertex in vertices:
result = await vertex.get_result(self, target_handle_name=key)
@ -700,9 +681,7 @@ class Vertex:
raise ValueError(msg) from e
def _handle_func(self, key, result):
"""
Handles 'func' key by checking if the result is a function and setting it as coroutine.
"""
"""Handles 'func' key by checking if the result is a function and setting it as coroutine."""
if key == "func":
if not isinstance(result, types.FunctionType):
if hasattr(result, "run"):
@ -715,9 +694,7 @@ class Vertex:
self.params["coroutine"] = sync_to_async(result)
def _extend_params_list_with_result(self, key, result):
"""
Extends a list in the params dictionary with the given result if it exists.
"""
"""Extends a list in the params dictionary with the given result if it exists."""
if isinstance(self.params[key], list):
self.params[key].extend(result)
@ -741,9 +718,7 @@ class Vertex:
raise ComponentBuildException(msg, tb) from exc
def _update_built_object_and_artifacts(self, result: Any | tuple[Any, dict] | tuple[Component, Any, dict]):
"""
Updates the built object and its artifacts.
"""
"""Updates the built object and its artifacts."""
if isinstance(result, tuple):
if len(result) == 2: # noqa: PLR2004
self._built_object, self.artifacts = result
@ -759,9 +734,7 @@ class Vertex:
self._built_object = result
def _validate_built_object(self):
"""
Checks if the built object is None and raises a ValueError if so.
"""
"""Checks if the built object is None and raises a ValueError if so."""
if isinstance(self._built_object, UnbuiltObject):
msg = f"{self.display_name}: {self._built_object_repr()}"
raise TypeError(msg)

View file

@ -58,9 +58,7 @@ class ComponentVertex(Vertex):
return None
def _update_built_object_and_artifacts(self, result):
"""
Updates the built object and its artifacts.
"""
"""Updates the built object and its artifacts."""
if isinstance(result, tuple):
if len(result) == 2: # noqa: PLR2004
self._built_object, self.artifacts = result
@ -77,8 +75,7 @@ class ComponentVertex(Vertex):
self.add_result(key, value)
def get_edge_with_target(self, target_id: str) -> Generator[CycleEdge, None, None]:
"""
Get the edge with the target id.
"""Get the edge with the target id.
Args:
target_id: The target id of the edge.
@ -91,8 +88,7 @@ class ComponentVertex(Vertex):
yield edge
async def _get_result(self, requester: Vertex, target_handle_name: str | None = None) -> Any:
"""
Retrieves the result of the built component.
"""Retrieves the result of the built component.
If the component has not been built yet, a ValueError is raised.
@ -151,8 +147,7 @@ class ComponentVertex(Vertex):
return result
def extract_messages_from_artifacts(self, artifacts: dict[str, Any]) -> list[dict]:
"""
Extracts messages from the artifacts.
"""Extracts messages from the artifacts.
Args:
artifacts (Dict[str, Any]): The artifacts to extract messages from.
@ -236,8 +231,7 @@ class InterfaceVertex(ComponentVertex):
return super()._built_object_repr()
def _process_chat_component(self):
"""
Process the chat component and return the message.
"""Process the chat component and return the message.
This method processes the chat component by extracting the necessary parameters
such as sender, sender_name, and message from the `params` dictionary. It then
@ -324,8 +318,7 @@ class InterfaceVertex(ComponentVertex):
return message
def _process_data_component(self):
"""
Process the record component of the vertex.
"""Process the record component of the vertex.
If the built object is an instance of `Data`, it calls the `model_dump` method
and assigns the result to the `artifacts` attribute.

View file

@ -7,9 +7,7 @@ if TYPE_CHECKING:
def build_clean_params(target: Vertex) -> dict:
"""
Cleans the parameters of the target vertex.
"""
"""Cleans the parameters of the target vertex."""
# Removes all keys that the values aren't python types like str, int, bool, etc.
params = {
key: value for key, value in target.params.items() if isinstance(value, str | int | bool | float | list | dict)

View file

@ -5,8 +5,7 @@ from langflow.schema.message import Message
def docs_to_data(documents: list[Document]) -> list[Data]:
"""
Converts a list of Documents to a list of Data.
"""Converts a list of Documents to a list of Data.
Args:
documents (list[Document]): The list of Documents to convert.
@ -18,11 +17,12 @@ def docs_to_data(documents: list[Document]) -> list[Data]:
def data_to_text(template: str, data: Data | list[Data], sep: str = "\n") -> str:
"""
Converts a list of Data to a list of texts.
"""Converts a list of Data to a list of texts.
Args:
template (str): The template to use for the conversion.
data (list[Data]): The list of Data to convert.
sep (str): The separator used to join the data.
Returns:
list[str]: The converted list of texts.
@ -41,10 +41,10 @@ def data_to_text(template: str, data: Data | list[Data], sep: str = "\n") -> str
def messages_to_text(template: str, messages: Message | list[Message]) -> str:
"""
Converts a list of Messages to a list of texts.
"""Converts a list of Messages to a list of texts.
Args:
template (str): The template to use for the conversion.
messages (list[Message]): The list of Messages to convert.
Returns:

View file

@ -125,12 +125,12 @@ async def run_flow(
def generate_function_for_flow(
inputs: list[Vertex], flow_id: str, user_id: str | UUID | None
) -> Callable[..., Awaitable[Any]]:
"""
Generate a dynamic flow function based on the given inputs and flow ID.
"""Generate a dynamic flow function based on the given inputs and flow ID.
Args:
inputs (List[Vertex]): The list of input vertices for the flow.
flow_id (str): The ID of the flow.
user_id (str | UUID | None): The user ID associated with the flow.
Returns:
Coroutine: The dynamic flow function.
@ -201,12 +201,12 @@ async def flow_function({func_args}):
def build_function_and_schema(
flow_data: Data, graph: Graph, user_id: str | UUID | None
) -> tuple[Callable[..., Awaitable[Any]], type[BaseModel]]:
"""
Builds a dynamic function and schema for a given flow.
"""Builds a dynamic function and schema for a given flow.
Args:
flow_data (Data): The flow record containing information about the flow.
graph (Graph): The graph representing the flow.
user_id (str): The user ID associated with the flow.
Returns:
Tuple[Callable, BaseModel]: A tuple containing the dynamic function and the schema.
@ -219,8 +219,7 @@ def build_function_and_schema(
def get_flow_inputs(graph: Graph) -> list[Vertex]:
"""
Retrieves the flow inputs from the given graph.
"""Retrieves the flow inputs from the given graph.
Args:
graph (Graph): The graph object representing the flow.
@ -232,8 +231,7 @@ def get_flow_inputs(graph: Graph) -> list[Vertex]:
def build_schema_from_inputs(name: str, inputs: list[Vertex]) -> type[BaseModel]:
"""
Builds a schema from the given inputs.
"""Builds a schema from the given inputs.
Args:
name (str): The name of the schema.
@ -253,8 +251,7 @@ def build_schema_from_inputs(name: str, inputs: list[Vertex]) -> type[BaseModel]
def get_arg_names(inputs: list[Vertex]) -> list[dict[str, str]]:
"""
Returns a list of dictionaries containing the component name and its corresponding argument name.
"""Returns a list of dictionaries containing the component name and its corresponding argument name.
Args:
inputs (List[Vertex]): A list of Vertex objects representing the inputs.

View file

@ -518,8 +518,7 @@ def _is_valid_uuid(val):
def load_flows_from_directory():
"""
On langflow startup, this loads all flows from the directory specified in the settings.
"""On langflow startup, this loads all flows from the directory specified in the settings.
All flows are uploaded into the default folder for the superuser.
Note that this feature currently only works if AUTO_LOGIN is enabled in the settings.

View file

@ -49,8 +49,7 @@ class TableInput(BaseInputMixin, MetadataTraceMixin, TableMixin, ListableInputMi
class HandleInput(BaseInputMixin, ListableInputMixin, MetadataTraceMixin):
"""
Represents an Input that has a Handle to a specific type (e.g. BaseLanguageModel, BaseRetriever, etc.)
"""Represents an Input that has a Handle to a specific type (e.g. BaseLanguageModel, BaseRetriever, etc.).
This class inherits from the `BaseInputMixin` and `ListableInputMixin` classes.
@ -64,8 +63,7 @@ class HandleInput(BaseInputMixin, ListableInputMixin, MetadataTraceMixin):
class DataInput(HandleInput, InputTraceMixin, ListableInputMixin):
"""
Represents an Input that has a Handle that receives a Data object.
"""Represents an Input that has a Handle that receives a Data object.
Attributes:
input_types (list[str]): A list of input types supported by this data input.
@ -90,8 +88,7 @@ class StrInput(BaseInputMixin, ListableInputMixin, DatabaseLoadMixin, MetadataTr
@staticmethod
def _validate_value(v: Any, _info):
"""
Validates the given value and returns the processed value.
"""Validates the given value and returns the processed value.
Args:
v (Any): The value to be validated.
@ -121,8 +118,7 @@ class StrInput(BaseInputMixin, ListableInputMixin, DatabaseLoadMixin, MetadataTr
@field_validator("value")
@classmethod
def validate_value(cls, v: Any, _info):
"""
Validates the given value and returns the processed value.
"""Validates the given value and returns the processed value.
Args:
v (Any): The value to be validated.
@ -155,8 +151,7 @@ class MessageInput(StrInput, InputTraceMixin):
class MessageTextInput(StrInput, MetadataTraceMixin, InputTraceMixin):
"""
Represents a text input component for the Langflow system.
"""Represents a text input component for the Langflow system.
This component is used to handle text inputs in the Langflow system.
It provides methods for validating and processing text values.
@ -170,8 +165,7 @@ class MessageTextInput(StrInput, MetadataTraceMixin, InputTraceMixin):
@staticmethod
def _validate_value(v: Any, _info):
"""
Validates the given value and returns the processed value.
"""Validates the given value and returns the processed value.
Args:
v (Any): The value to be validated.
@ -211,8 +205,7 @@ class MessageTextInput(StrInput, MetadataTraceMixin, InputTraceMixin):
class MultilineInput(MessageTextInput, MultilineMixin, InputTraceMixin):
"""
Represents a multiline input field.
"""Represents a multiline input field.
Attributes:
field_type (SerializableFieldTypes): The type of the field. Defaults to FieldTypes.TEXT.
@ -224,8 +217,7 @@ class MultilineInput(MessageTextInput, MultilineMixin, InputTraceMixin):
class MultilineSecretInput(MessageTextInput, MultilineMixin, InputTraceMixin):
"""
Represents a multiline input field.
"""Represents a multiline input field.
Attributes:
field_type (SerializableFieldTypes): The type of the field. Defaults to FieldTypes.TEXT.
@ -238,8 +230,7 @@ class MultilineSecretInput(MessageTextInput, MultilineMixin, InputTraceMixin):
class SecretStrInput(BaseInputMixin, DatabaseLoadMixin):
"""
Represents a field with password field type.
"""Represents a field with password field type.
This class inherits from `BaseInputMixin` and `DatabaseLoadMixin`.
@ -257,8 +248,7 @@ class SecretStrInput(BaseInputMixin, DatabaseLoadMixin):
@field_validator("value")
@classmethod
def validate_value(cls, v: Any, _info):
"""
Validates the given value and returns the processed value.
"""Validates the given value and returns the processed value.
Args:
v (Any): The value to be validated.
@ -298,8 +288,7 @@ class SecretStrInput(BaseInputMixin, DatabaseLoadMixin):
class IntInput(BaseInputMixin, ListableInputMixin, RangeMixin, MetadataTraceMixin):
"""
Represents an integer field.
"""Represents an integer field.
This class represents an integer input and provides functionality for handling integer values.
It inherits from the `BaseInputMixin`, `ListableInputMixin`, and `RangeMixin` classes.
@ -313,8 +302,7 @@ class IntInput(BaseInputMixin, ListableInputMixin, RangeMixin, MetadataTraceMixi
@field_validator("value")
@classmethod
def validate_value(cls, v: Any, _info):
"""
Validates the given value and returns the processed value.
"""Validates the given value and returns the processed value.
Args:
v (Any): The value to be validated.
@ -326,7 +314,6 @@ class IntInput(BaseInputMixin, ListableInputMixin, RangeMixin, MetadataTraceMixi
Raises:
ValueError: If the value is not of a valid type or if the input is missing a required key.
"""
if v and not isinstance(v, int | float):
msg = f"Invalid value type {type(v)} for input {_info.data.get('name')}."
raise ValueError(msg)
@ -336,8 +323,7 @@ class IntInput(BaseInputMixin, ListableInputMixin, RangeMixin, MetadataTraceMixi
class FloatInput(BaseInputMixin, ListableInputMixin, RangeMixin, MetadataTraceMixin):
"""
Represents a float field.
"""Represents a float field.
This class represents a float input and provides functionality for handling float values.
It inherits from the `BaseInputMixin`, `ListableInputMixin`, and `RangeMixin` classes.
@ -351,8 +337,7 @@ class FloatInput(BaseInputMixin, ListableInputMixin, RangeMixin, MetadataTraceMi
@field_validator("value")
@classmethod
def validate_value(cls, v: Any, _info):
"""
Validates the given value and returns the processed value.
"""Validates the given value and returns the processed value.
Args:
v (Any): The value to be validated.
@ -373,8 +358,7 @@ class FloatInput(BaseInputMixin, ListableInputMixin, RangeMixin, MetadataTraceMi
class BoolInput(BaseInputMixin, ListableInputMixin, MetadataTraceMixin):
"""
Represents a boolean field.
"""Represents a boolean field.
This class represents a boolean input and provides functionality for handling boolean values.
It inherits from the `BaseInputMixin` and `ListableInputMixin` classes.
@ -389,8 +373,7 @@ class BoolInput(BaseInputMixin, ListableInputMixin, MetadataTraceMixin):
class NestedDictInput(BaseInputMixin, ListableInputMixin, MetadataTraceMixin, InputTraceMixin):
"""
Represents a nested dictionary field.
"""Represents a nested dictionary field.
This class represents a nested dictionary input and provides functionality for handling dictionary values.
It inherits from the `BaseInputMixin` and `ListableInputMixin` classes.
@ -405,8 +388,7 @@ class NestedDictInput(BaseInputMixin, ListableInputMixin, MetadataTraceMixin, In
class DictInput(BaseInputMixin, ListableInputMixin, InputTraceMixin):
"""
Represents a dictionary field.
"""Represents a dictionary field.
This class represents a dictionary input and provides functionality for handling dictionary values.
It inherits from the `BaseInputMixin` and `ListableInputMixin` classes.
@ -421,8 +403,7 @@ class DictInput(BaseInputMixin, ListableInputMixin, InputTraceMixin):
class DropdownInput(BaseInputMixin, DropDownMixin, MetadataTraceMixin):
"""
Represents a dropdown input field.
"""Represents a dropdown input field.
This class represents a dropdown input field and provides functionality for handling dropdown values.
It inherits from the `BaseInputMixin` and `DropDownMixin` classes.
@ -439,8 +420,7 @@ class DropdownInput(BaseInputMixin, DropDownMixin, MetadataTraceMixin):
class MultiselectInput(BaseInputMixin, ListableInputMixin, DropDownMixin, MetadataTraceMixin):
"""
Represents a multiselect input field.
"""Represents a multiselect input field.
This class represents a multiselect input field and provides functionality for handling multiselect values.
It inherits from the `BaseInputMixin`, `ListableInputMixin` and `DropDownMixin` classes.
@ -471,8 +451,7 @@ class MultiselectInput(BaseInputMixin, ListableInputMixin, DropDownMixin, Metada
class FileInput(BaseInputMixin, ListableInputMixin, FileMixin, MetadataTraceMixin):
"""
Represents a file field.
"""Represents a file field.
This class represents a file input and provides functionality for handling file values.
It inherits from the `BaseInputMixin`, `ListableInputMixin`, and `FileMixin` classes.

View file

@ -5,7 +5,7 @@ from typing import Any
def import_module(module_path: str) -> Any:
"""Import module from module path"""
"""Import module from module path."""
if "from" not in module_path:
# Import the module using the module path
return importlib.import_module(module_path)
@ -19,7 +19,7 @@ def import_module(module_path: str) -> Any:
def import_class(class_path: str) -> Any:
"""Import class from class path"""
"""Import class from class path."""
module_path, class_name = class_path.rsplit(".", 1)
module = import_module(module_path)
return getattr(module, class_name)

View file

@ -25,8 +25,7 @@ async def instantiate_class(
user_id=None,
event_manager: EventManager | None = None,
) -> Any:
"""Instantiate class from module type and key, and params"""
"""Instantiate class from module type and key, and params."""
vertex_type = vertex.vertex_type
base_type = vertex.base_type
logger.debug(f"Instantiating {vertex_type} of type {base_type}")
@ -77,7 +76,7 @@ def get_params(vertex_params):
def convert_params_to_sets(params):
"""Convert certain params to sets"""
"""Convert certain params to sets."""
if "allowed_special" in params:
params["allowed_special"] = set(params["allowed_special"])
if "disallowed_special" in params:

View file

@ -2,7 +2,8 @@ from loguru import logger
def get_memory_key(langchain_object):
"""
"""Get the memory key from the LangChain object's memory attribute.
Given a LangChain object, this function retrieves the current memory key from the object's memory attribute.
It then checks if the key exists in a dictionary of known memory keys and returns the corresponding key,
or None if the current key is not recognized.
@ -19,7 +20,8 @@ def get_memory_key(langchain_object):
def update_memory_keys(langchain_object, possible_new_mem_key):
"""
"""Update the memory keys in the LangChain object's memory attribute.
Given a LangChain object and a possible new memory key, this function updates the input and output keys in the
object's memory attribute to exclude the current memory key and the possible new key. It then sets the memory key
to the possible new key.

View file

@ -20,8 +20,7 @@ def load_flow_from_json(
cache: str | None = None,
disable_logs: bool | None = True,
) -> Graph:
"""
Load a flow graph from a JSON file or a JSON object.
"""Load a flow graph from a JSON file or a JSON object.
Args:
flow (Union[Path, str, dict]): The flow to load. It can be a file path (str or Path object)
@ -84,8 +83,7 @@ def run_flow_from_json(
disable_logs: bool | None = True,
fallback_to_env_vars: bool = False,
) -> list[RunOutputs]:
"""
Run a flow from a JSON file or dictionary.
"""Run a flow from a JSON file or dictionary.
Args:
flow (Union[Path, str, dict]): The path to the JSON file or the JSON dictionary representing the flow.

View file

@ -10,8 +10,7 @@ class UploadError(Exception):
def upload(file_path: str, host: str, flow_id: str):
"""
Upload a file to Langflow and return the file path.
"""Upload a file to Langflow and return the file path.
Args:
file_path (str): The path to the file to be uploaded.
@ -39,8 +38,7 @@ def upload(file_path: str, host: str, flow_id: str):
def upload_file(file_path: str, host: str, flow_id: str, components: list[str], tweaks: dict | None = None):
"""
Upload a file to Langflow and return the file path.
"""Upload a file to Langflow and return the file path.
Args:
file_path (str): The path to the file to be uploaded.

View file

@ -23,10 +23,10 @@ class SizedLogBuffer:
self,
max_readers: int = 20, # max number of concurrent readers for the buffer
):
"""
a buffer for storing log messages for the log retrieval API
the buffer can be overwritten by an env variable LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE
because the logger is initialized before the settings_service are loaded
"""A buffer for storing log messages for the log retrieval API.
The buffer can be overwritten by an env variable LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE
because the logger is initialized before the settings_service are loaded.
"""
self.max: int = 0
env_buffer_size = os.getenv("LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE", "0")
@ -217,9 +217,9 @@ def setup_gunicorn_logger():
class InterceptHandler(logging.Handler):
"""
Default handler from examples in loguru documentaion.
See https://loguru.readthedocs.io/en/stable/overview.html#entirely-compatible-with-standard-logging
"""Default handler from examples in loguru documentation.
See https://loguru.readthedocs.io/en/stable/overview.html#entirely-compatible-with-standard-logging.
"""
def emit(self, record):

View file

@ -239,11 +239,11 @@ def setup_sentry(app: FastAPI):
def setup_static_files(app: FastAPI, static_files_dir: Path):
"""
Setup the static files directory.
"""Setup the static files directory.
Args:
app (FastAPI): FastAPI app.
path (str): Path to the static files directory.
static_files_dir (str): Path to the static files directory.
"""
app.mount(
"/",

View file

@ -21,14 +21,15 @@ def get_messages(
flow_id: UUID | None = None,
limit: int | None = None,
) -> list[Message]:
"""
Retrieves messages from the monitor service based on the provided filters.
"""Retrieves messages from the monitor service based on the provided filters.
Args:
sender (Optional[str]): The sender of the messages (e.g., "Machine" or "User")
sender_name (Optional[str]): The name of the sender.
session_id (Optional[str]): The session ID associated with the messages.
order_by (Optional[str]): The field to order the messages by. Defaults to "timestamp".
order (Optional[str]): The order in which to retrieve the messages. Defaults to "DESC".
flow_id (Optional[UUID]): The flow ID associated with the messages.
limit (Optional[int]): The maximum number of messages to retrieve.
Returns:
@ -54,10 +55,7 @@ def get_messages(
def add_messages(messages: Message | list[Message], flow_id: str | None = None):
"""
Add a message to the monitor service.
"""
"""Add a message to the monitor service."""
if not isinstance(messages, list):
messages = [messages]
@ -89,8 +87,7 @@ def add_messagetables(messages: list[MessageTable], session: Session):
def delete_messages(session_id: str):
"""
Delete messages from the monitor service based on the provided session ID.
"""Delete messages from the monitor service based on the provided session ID.
Args:
session_id (str): The session ID associated with the messages to delete.
@ -107,8 +104,7 @@ def store_message(
message: Message,
flow_id: str | None = None,
) -> list[Message]:
"""
Stores a message in the memory.
"""Stores a message in the memory.
Args:
message (Message): The message to store.

View file

@ -12,7 +12,7 @@ if TYPE_CHECKING:
def setup_callbacks(sync, trace_id, **kwargs):
"""Setup callbacks for langchain object"""
"""Setup callbacks for langchain object."""
callbacks = []
plugin_service = get_plugins_service()
plugin_callbacks = plugin_service.get_callbacks(_id=trace_id)
@ -37,9 +37,7 @@ def get_langfuse_callback(trace_id):
def flush_langfuse_callback_if_present(callbacks: list[BaseCallbackHandler | CallbackHandler]):
"""
If langfuse callback is present, run callback.langfuse.flush()
"""
"""If langfuse callback is present, run callback.langfuse.flush()."""
for callback in callbacks:
if hasattr(callback, "langfuse") and hasattr(callback.langfuse, "flush"):
callback.langfuse.flush()

View file

@ -29,7 +29,7 @@ async def run_graph_internal(
inputs: list[InputValueRequest] | None = None,
outputs: list[str] | None = None,
) -> tuple[list[RunOutputs], str]:
"""Run the graph and generate the result"""
"""Run the graph and generate the result."""
inputs = inputs or []
session_id_str = flow_id if session_id is None else session_id
components = []
@ -66,8 +66,7 @@ def run_graph(
fallback_to_env_vars: bool = False,
output_component: str | None = None,
) -> list[RunOutputs]:
"""
Runs the given Langflow Graph with the specified input and returns the outputs.
"""Runs the given Langflow Graph with the specified input and returns the outputs.
Args:
graph (Graph): The graph to be executed.
@ -75,6 +74,8 @@ def run_graph(
input_type (str): The type of the input value.
output_type (str): The type of the desired output.
session_id (str | None, optional): The session ID to be used for the flow. Defaults to None.
fallback_to_env_vars (bool, optional): Whether to fallback to environment variables.
Defaults to False.
output_component (Optional[str], optional): The specific output component to retrieve. Defaults to None.
Returns:
@ -157,8 +158,7 @@ def apply_tweaks_on_vertex(vertex: Vertex, node_tweaks: dict[str, Any]) -> None:
def process_tweaks(
graph_data: dict[str, Any], tweaks: Tweaks | dict[str, dict[str, Any]], stream: bool = False
) -> dict[str, Any]:
"""
This function is used to tweak the graph data using the node id and the tweaks dict.
"""This function is used to tweak the graph data using the node id and the tweaks dict.
:param graph_data: The dictionary containing the graph data. It must contain a 'data' key with
'nodes' as its child or directly contain 'nodes' key. Each node should have an 'id' and 'data'.

View file

@ -15,8 +15,7 @@ if TYPE_CHECKING:
class Data(BaseModel):
"""
Represents a record with text and optional data.
"""Represents a record with text and optional data.
Attributes:
data (dict, optional): Additional data associated with the record.
@ -45,8 +44,7 @@ class Data(BaseModel):
return {k: v.to_json() if hasattr(v, "to_json") else v for k, v in self.data.items()}
def get_text(self):
"""
Retrieves the text value from the data dictionary.
"""Retrieves the text value from the data dictionary.
If the text key is present in the data dictionary, the corresponding value is returned.
Otherwise, the default value is returned.
@ -58,8 +56,7 @@ class Data(BaseModel):
@classmethod
def from_document(cls, document: Document) -> "Data":
"""
Converts a Document to a Data.
"""Converts a Document to a Data.
Args:
document (Document): The Document to convert.
@ -73,8 +70,7 @@ class Data(BaseModel):
@classmethod
def from_lc_message(cls, message: BaseMessage) -> "Data":
"""
Converts a BaseMessage to a Data.
"""Converts a BaseMessage to a Data.
Args:
message (BaseMessage): The BaseMessage to convert.
@ -87,7 +83,8 @@ class Data(BaseModel):
return cls(data=data, text_key="text")
def __add__(self, other: "Data") -> "Data":
"""
"""Combines the data of two data by attempting to add values for overlapping keys.
Combines the data of two data by attempting to add values for overlapping keys
for all types that support the addition operation. Falls back to the value from 'other'
record when addition is not supported.
@ -108,8 +105,7 @@ class Data(BaseModel):
return Data(data=combined_data)
def to_lc_document(self) -> Document:
"""
Converts the Data to a Document.
"""Converts the Data to a Document.
Returns:
Document: The converted Document.
@ -121,8 +117,7 @@ class Data(BaseModel):
def to_lc_message(
self,
) -> BaseMessage:
"""
Converts the Data to a BaseMessage.
"""Converts the Data to a BaseMessage.
Returns:
BaseMessage: The converted BaseMessage.
@ -158,9 +153,7 @@ class Data(BaseModel):
return AIMessage(content=text)
def __getattr__(self, key):
"""
Allows attribute-like access to the data dictionary.
"""
"""Allows attribute-like access to the data dictionary."""
try:
if key.startswith("__"):
return self.__getattribute__(key)
@ -173,8 +166,9 @@ class Data(BaseModel):
raise AttributeError(msg) from e
def __setattr__(self, key, value):
"""
Allows attribute-like setting of values in the data dictionary,
"""Set attribute-like values in the data dictionary.
Allows attribute-like setting of values in the data dictionary.
while still allowing direct assignment to class attributes.
"""
if key in {"data", "text_key"} or key.startswith("_"):
@ -186,18 +180,14 @@ class Data(BaseModel):
self.data[key] = value
def __delattr__(self, key):
"""
Allows attribute-like deletion from the data dictionary.
"""
"""Allows attribute-like deletion from the data dictionary."""
if key in {"data", "text_key"} or key.startswith("_"):
super().__delattr__(key)
else:
del self.data[key]
def __deepcopy__(self, memo):
"""
Custom deepcopy implementation to handle copying of the Data object.
"""
"""Custom deepcopy implementation to handle copying of the Data object."""
# Create a new Data object with a deep copy of the data dictionary
return Data(data=copy.deepcopy(self.data, memo), text_key=self.text_key, default_value=self.default_value)

View file

@ -1,6 +1,6 @@
class dotdict(dict):
"""
dotdict allows accessing dictionary elements using dot notation (e.g., dict.key instead of dict['key']).
"""dotdict allows accessing dictionary elements using dot notation (e.g., dict.key instead of dict['key']).
It automatically converts nested dictionaries into dotdict instances, enabling dot notation on them as well.
Note:
@ -11,8 +11,7 @@ class dotdict(dict):
"""
def __getattr__(self, attr):
"""
Override dot access to behave like dictionary lookup. Automatically convert nested dicts to dotdicts.
"""Override dot access to behave like dictionary lookup. Automatically convert nested dicts to dotdicts.
Args:
attr (str): Attribute to access.
@ -35,8 +34,7 @@ class dotdict(dict):
return value
def __setattr__(self, key, value):
"""
Override attribute setting to work as dictionary item assignment.
"""Override attribute setting to work as dictionary item assignment.
Args:
key (str): The key under which to store the value.
@ -47,8 +45,7 @@ class dotdict(dict):
self[key] = value
def __delattr__(self, key):
"""
Override attribute deletion to work as dictionary item deletion.
"""Override attribute deletion to work as dictionary item deletion.
Args:
key (str): The key of the item to delete from the dictionary.
@ -63,8 +60,7 @@ class dotdict(dict):
raise AttributeError(msg) from e
def __missing__(self, key):
"""
Handle missing keys by returning an empty dotdict. This allows chaining access without raising KeyError.
"""Handle missing keys by returning an empty dotdict. This allows chaining access without raising KeyError.
Args:
key: The missing key.

View file

@ -94,8 +94,7 @@ class Message(Data):
def to_lc_message(
self,
) -> BaseMessage:
"""
Converts the Data to a BaseMessage.
"""Converts the Data to a BaseMessage.
Returns:
BaseMessage: The converted BaseMessage.
@ -139,16 +138,14 @@ class Message(Data):
@classmethod
def from_data(cls, data: Data) -> Message:
"""
Converts a BaseMessage to a Data.
"""Converts Data to a Message.
Args:
record (BaseMessage): The BaseMessage to convert.
data: The Data to convert.
Returns:
Data: The converted Data.
The converted Message.
"""
return cls(
text=data.text,
sender=data.sender,

View file

@ -18,7 +18,6 @@ class LangflowUvicornWorker(UvicornWorker):
- https://github.com/encode/uvicorn/issues/1116
- https://github.com/benoitc/gunicorn/issues/2604
"""
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGINT, self.handle_exit, signal.SIGINT, None)

View file

@ -7,7 +7,6 @@ class Service(ABC):
def get_schema(self):
"""Build a dictionary listing all methods, their parameters, types, return types and documentation."""
schema = {}
ignore = ["teardown", "set_ready"]
for method in dir(self):

View file

@ -10,19 +10,17 @@ AsyncLockType = TypeVar("AsyncLockType", bound=asyncio.Lock)
class CacheService(Service, Generic[LockType]):
"""
Abstract base class for a cache.
"""
"""Abstract base class for a cache."""
name = "cache_service"
@abc.abstractmethod
def get(self, key, lock: LockType | None = None):
"""
Retrieve an item from the cache.
"""Retrieve an item from the cache.
Args:
key: The key of the item to retrieve.
lock: A lock to use for the operation.
Returns:
The value associated with the key, or None if the key is not found.
@ -30,43 +28,40 @@ class CacheService(Service, Generic[LockType]):
@abc.abstractmethod
def set(self, key, value, lock: LockType | None = None):
"""
Add an item to the cache.
"""Add an item to the cache.
Args:
key: The key of the item.
value: The value to cache.
lock: A lock to use for the operation.
"""
@abc.abstractmethod
def upsert(self, key, value, lock: LockType | None = None):
"""
Add an item to the cache if it doesn't exist, or update it if it does.
"""Add an item to the cache if it doesn't exist, or update it if it does.
Args:
key: The key of the item.
value: The value to cache.
lock: A lock to use for the operation.
"""
@abc.abstractmethod
def delete(self, key, lock: LockType | None = None):
"""
Remove an item from the cache.
"""Remove an item from the cache.
Args:
key: The key of the item to remove.
lock: A lock to use for the operation.
"""
@abc.abstractmethod
def clear(self, lock: LockType | None = None):
"""
Clear all items from the cache.
"""
"""Clear all items from the cache."""
@abc.abstractmethod
def __contains__(self, key):
"""
Check if the key is in the cache.
"""Check if the key is in the cache.
Args:
key: The key of the item to check.
@ -77,8 +72,7 @@ class CacheService(Service, Generic[LockType]):
@abc.abstractmethod
def __getitem__(self, key):
"""
Retrieve an item from the cache using the square bracket notation.
"""Retrieve an item from the cache using the square bracket notation.
Args:
key: The key of the item to retrieve.
@ -86,8 +80,7 @@ class CacheService(Service, Generic[LockType]):
@abc.abstractmethod
def __setitem__(self, key, value):
"""
Add an item to the cache using the square bracket notation.
"""Add an item to the cache using the square bracket notation.
Args:
key: The key of the item.
@ -96,8 +89,7 @@ class CacheService(Service, Generic[LockType]):
@abc.abstractmethod
def __delitem__(self, key):
"""
Remove an item from the cache using the square bracket notation.
"""Remove an item from the cache using the square bracket notation.
Args:
key: The key of the item to remove.
@ -105,19 +97,17 @@ class CacheService(Service, Generic[LockType]):
class AsyncBaseCacheService(Service, Generic[AsyncLockType]):
"""
Abstract base class for a async cache.
"""
"""Abstract base class for a async cache."""
name = "cache_service"
@abc.abstractmethod
async def get(self, key, lock: AsyncLockType | None = None):
"""
Retrieve an item from the cache.
"""Retrieve an item from the cache.
Args:
key: The key of the item to retrieve.
lock: A lock to use for the operation.
Returns:
The value associated with the key, or None if the key is not found.
@ -125,43 +115,40 @@ class AsyncBaseCacheService(Service, Generic[AsyncLockType]):
@abc.abstractmethod
async def set(self, key, value, lock: AsyncLockType | None = None):
"""
Add an item to the cache.
"""Add an item to the cache.
Args:
key: The key of the item.
value: The value to cache.
lock: A lock to use for the operation.
"""
@abc.abstractmethod
async def upsert(self, key, value, lock: AsyncLockType | None = None):
"""
Add an item to the cache if it doesn't exist, or update it if it does.
"""Add an item to the cache if it doesn't exist, or update it if it does.
Args:
key: The key of the item.
value: The value to cache.
lock: A lock to use for the operation.
"""
@abc.abstractmethod
async def delete(self, key, lock: AsyncLockType | None = None):
"""
Remove an item from the cache.
"""Remove an item from the cache.
Args:
key: The key of the item to remove.
lock: A lock to use for the operation.
"""
@abc.abstractmethod
async def clear(self, lock: AsyncLockType | None = None):
"""
Clear all items from the cache.
"""
"""Clear all items from the cache."""
@abc.abstractmethod
def __contains__(self, key):
"""
Check if the key is in the cache.
"""Check if the key is in the cache.
Args:
key: The key of the item to check.

View file

@ -12,8 +12,7 @@ from langflow.services.cache.utils import CACHE_MISS
class ThreadingInMemoryCache(CacheService, Generic[LockType]):
"""
A simple in-memory cache using an OrderedDict.
"""A simple in-memory cache using an OrderedDict.
This cache supports setting a maximum size and expiration time for cached items.
When the cache is full, it uses a Least Recently Used (LRU) eviction policy.
@ -24,7 +23,6 @@ class ThreadingInMemoryCache(CacheService, Generic[LockType]):
expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour.
Example:
cache = InMemoryCache(max_size=3, expiration_time=5)
# setting cache values
@ -38,8 +36,7 @@ class ThreadingInMemoryCache(CacheService, Generic[LockType]):
"""
def __init__(self, max_size=None, expiration_time=60 * 60):
"""
Initialize a new InMemoryCache instance.
"""Initialize a new InMemoryCache instance.
Args:
max_size (int, optional): Maximum number of items to store in the cache.
@ -51,11 +48,11 @@ class ThreadingInMemoryCache(CacheService, Generic[LockType]):
self.expiration_time = expiration_time
def get(self, key, lock: Union[threading.Lock, None] = None): # noqa: UP007
"""
Retrieve an item from the cache.
"""Retrieve an item from the cache.
Args:
key: The key of the item to retrieve.
lock: A lock to use for the operation.
Returns:
The value associated with the key, or None if the key is not found or the item has expired.
@ -64,9 +61,7 @@ class ThreadingInMemoryCache(CacheService, Generic[LockType]):
return self._get_without_lock(key)
def _get_without_lock(self, key):
"""
Retrieve an item from the cache without acquiring the lock.
"""
"""Retrieve an item from the cache without acquiring the lock."""
if item := self._cache.get(key):
if self.expiration_time is None or time.time() - item["time"] < self.expiration_time:
# Move the key to the end to make it recently used
@ -77,14 +72,14 @@ class ThreadingInMemoryCache(CacheService, Generic[LockType]):
return None
def set(self, key, value, lock: Union[threading.Lock, None] = None): # noqa: UP007
"""
Add an item to the cache.
"""Add an item to the cache.
If the cache is full, the least recently used item is evicted.
Args:
key: The key of the item.
value: The value to cache.
lock: A lock to use for the operation.
"""
with lock or self._lock:
if key in self._cache:
@ -98,13 +93,14 @@ class ThreadingInMemoryCache(CacheService, Generic[LockType]):
self._cache[key] = {"value": value, "time": time.time()}
def upsert(self, key, value, lock: Union[threading.Lock, None] = None): # noqa: UP007
"""
Inserts or updates a value in the cache.
"""Inserts or updates a value in the cache.
If the existing value and the new value are both dictionaries, they are merged.
Args:
key: The key of the item.
value: The value to insert or update.
lock: A lock to use for the operation.
"""
with lock or self._lock:
existing_value = self._get_without_lock(key)
@ -115,13 +111,14 @@ class ThreadingInMemoryCache(CacheService, Generic[LockType]):
self.set(key, value)
def get_or_set(self, key, value, lock: Union[threading.Lock, None] = None): # noqa: UP007
"""
Retrieve an item from the cache. If the item does not exist,
set it with the provided value.
"""Retrieve an item from the cache.
If the item does not exist, set it with the provided value.
Args:
key: The key of the item.
value: The value to cache if the item doesn't exist.
lock: A lock to use for the operation.
Returns:
The cached value associated with the key.
@ -133,19 +130,11 @@ class ThreadingInMemoryCache(CacheService, Generic[LockType]):
return value
def delete(self, key, lock: Union[threading.Lock, None] = None): # noqa: UP007
"""
Remove an item from the cache.
Args:
key: The key of the item to remove.
"""
with lock or self._lock:
self._cache.pop(key, None)
def clear(self, lock: Union[threading.Lock, None] = None): # noqa: UP007
"""
Clear all items from the cache.
"""
"""Clear all items from the cache."""
with lock or self._lock:
self._cache.clear()
@ -175,8 +164,7 @@ class ThreadingInMemoryCache(CacheService, Generic[LockType]):
class RedisCache(AsyncBaseCacheService, Generic[LockType]):
"""
A Redis-based cache implementation.
"""A Redis-based cache implementation.
This cache supports setting an expiration time for cached items.
@ -184,7 +172,6 @@ class RedisCache(AsyncBaseCacheService, Generic[LockType]):
expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour.
Example:
cache = RedisCache(expiration_time=5)
# setting cache values
@ -198,15 +185,15 @@ class RedisCache(AsyncBaseCacheService, Generic[LockType]):
"""
def __init__(self, host="localhost", port=6379, db=0, url=None, expiration_time=60 * 60):
"""
Initialize a new RedisCache instance.
"""Initialize a new RedisCache instance.
Args:
host (str, optional): Redis host.
port (int, optional): Redis port.
db (int, optional): Redis DB.
url (str, optional): Redis URL.
expiration_time (int, optional): Time in seconds after which a
ached item expires. Default is 1 hour.
cached item expires. Default is 1 hour.
"""
try:
import redis
@ -228,9 +215,7 @@ class RedisCache(AsyncBaseCacheService, Generic[LockType]):
# check connection
def is_connected(self):
"""
Check if the Redis client is connected.
"""
"""Check if the Redis client is connected."""
import redis
try:
@ -241,28 +226,12 @@ class RedisCache(AsyncBaseCacheService, Generic[LockType]):
return True
async def get(self, key, lock=None):
"""
Retrieve an item from the cache.
Args:
key: The key of the item to retrieve.
Returns:
The value associated with the key, or None if the key is not found.
"""
if key is None:
return None
value = self._client.get(str(key))
return pickle.loads(value) if value else None
async def set(self, key, value, lock=None):
"""
Add an item to the cache.
Args:
key: The key of the item.
value: The value to cache.
"""
try:
if pickled := pickle.dumps(value):
result = self._client.setex(str(key), self.expiration_time, pickled)
@ -274,13 +243,14 @@ class RedisCache(AsyncBaseCacheService, Generic[LockType]):
raise TypeError(msg) from exc
async def upsert(self, key, value, lock=None):
"""
Inserts or updates a value in the cache.
"""Inserts or updates a value in the cache.
If the existing value and the new value are both dictionaries, they are merged.
Args:
key: The key of the item.
value: The value to insert or update.
lock: A lock to use for the operation.
"""
if key is None:
return
@ -292,18 +262,10 @@ class RedisCache(AsyncBaseCacheService, Generic[LockType]):
await self.set(key, value)
async def delete(self, key, lock=None):
"""
Remove an item from the cache.
Args:
key: The key of the item to remove.
"""
self._client.delete(key)
async def clear(self, lock=None):
"""
Clear all items from the cache.
"""
"""Clear all items from the cache."""
self._client.flushdb()
def __contains__(self, key):

View file

@ -78,12 +78,12 @@ def filter_json(json_data):
@create_cache_folder
def save_binary_file(content: str, file_name: str, accepted_types: list[str]) -> str:
"""
Save a binary file to the specified folder.
"""Save a binary file to the specified folder.
Args:
content: The content of the file as a bytes object.
file_name: The name of the file, including its extension.
accepted_types: A list of accepted file types.
Returns:
The path to the saved file.
@ -112,8 +112,7 @@ def save_binary_file(content: str, file_name: str, accepted_types: list[str]) ->
@create_cache_folder
def save_uploaded_file(file: UploadFile, folder_name):
"""
Save an uploaded file to the specified folder with a hash of its content as the file name.
"""Save an uploaded file to the specified folder with a hash of its content as the file name.
Args:
file: The uploaded file object.

View file

@ -65,8 +65,7 @@ class CacheService(Subject, Service):
@contextmanager
def set_client_id(self, client_id: str):
"""
Context manager to set the current client_id and associated cache.
"""Context manager to set the current client_id and associated cache.
Args:
client_id (str): The client identifier.
@ -81,13 +80,13 @@ class CacheService(Subject, Service):
self.current_cache = self._cache.get(self.current_client_id, {})
def add(self, name: str, obj: Any, obj_type: str, extension: str | None = None):
"""
Add an object to the current client's cache.
"""Add an object to the current client's cache.
Args:
name (str): The cache key.
obj (Any): The object to cache.
obj_type (str): The type of the object.
extension: The file extension of the object.
"""
object_extensions = {
"image": "png",
@ -102,8 +101,7 @@ class CacheService(Subject, Service):
self.notify()
def add_pandas(self, name: str, obj: Any):
"""
Add a pandas DataFrame or Series to the current client's cache.
"""Add a pandas DataFrame or Series to the current client's cache.
Args:
name (str): The cache key.
@ -116,12 +114,12 @@ class CacheService(Subject, Service):
raise TypeError(msg)
def add_image(self, name: str, obj: Any, extension: str = "png"):
"""
Add a PIL Image to the current client's cache.
"""Add a PIL Image to the current client's cache.
Args:
name (str): The cache key.
obj (Any): The PIL Image object.
extension: The file extension of the image.
"""
if isinstance(obj, Image.Image):
self.add(name, obj, "image", extension=extension)
@ -130,8 +128,7 @@ class CacheService(Subject, Service):
raise TypeError(msg)
def get(self, name: str):
"""
Get an object from the current client's cache.
"""Get an object from the current client's cache.
Args:
name (str): The cache key.
@ -142,8 +139,7 @@ class CacheService(Subject, Service):
return self.current_cache[name]
def get_last(self):
"""
Get the last added item in the current client's cache.
"""Get the last added item in the current client's cache.
Returns:
The last added item in the cache.

View file

@ -9,9 +9,7 @@ from langflow.services.deps import get_cache_service
class ChatService(Service):
"""
Service class for managing chat-related operations.
"""
"""Service class for managing chat-related operations."""
name = "chat_service"
@ -21,8 +19,7 @@ class ChatService(Service):
self.cache_service = get_cache_service()
def _get_lock(self, key: str):
"""
Retrieves the lock associated with the given key.
"""Retrieves the lock associated with the given key.
Args:
key (str): The key to retrieve the lock for.
@ -37,8 +34,7 @@ class ChatService(Service):
async def _perform_cache_operation(
self, operation: str, key: str, data: Any = None, lock: asyncio.Lock | None = None
):
"""
Perform a cache operation based on the given operation type.
"""Perform a cache operation based on the given operation type.
Args:
operation (str): The type of cache operation to perform. Possible values are "upsert", "get", or "delete".
@ -76,8 +72,7 @@ class ChatService(Service):
return None
async def set_cache(self, key: str, data: Any, lock: asyncio.Lock | None = None) -> bool:
"""
Set the cache for a client.
"""Set the cache for a client.
Args:
key (str): The cache key.
@ -95,8 +90,7 @@ class ChatService(Service):
return key in self.cache_service
async def get_cache(self, key: str, lock: asyncio.Lock | None = None) -> Any:
"""
Get the cache for a client.
"""Get the cache for a client.
Args:
key (str): The cache key.
@ -108,8 +102,7 @@ class ChatService(Service):
return await self._perform_cache_operation("get", key, lock=lock or self._get_lock(key))
async def clear_cache(self, key: str, lock: asyncio.Lock | None = None):
"""
Clear the cache for a client.
"""Clear the cache for a client.
Args:
key (str): The cache key.

View file

@ -9,7 +9,6 @@ from .model import Flow
def get_flow_by_id(session: Session = Depends(get_session), flow_id: str | None = None) -> Flow | None:
"""Get flow by id."""
if flow_id is None:
msg = "Flow id is required."
raise ValueError(msg)
@ -19,7 +18,6 @@ def get_flow_by_id(session: Session = Depends(get_session), flow_id: str | None
def get_webhook_component_in_flow(flow_data: dict):
"""Get webhook component in flow data."""
for node in flow_data.get("nodes", []):
if "Webhook" in node.get("id"):
return node

View file

@ -29,11 +29,12 @@ if TYPE_CHECKING:
def get_service(service_type: ServiceType, default=None):
"""
Retrieves the service instance for the given service type.
"""Retrieves the service instance for the given service type.
Args:
service_type (ServiceType): The type of service to retrieve.
default (ServiceFactory, optional): The default ServiceFactory to use if the service is not found.
Defaults to None.
Returns:
Any: The service instance.
@ -49,8 +50,7 @@ def get_service(service_type: ServiceType, default=None):
def get_telemetry_service() -> TelemetryService:
"""
Retrieves the TelemetryService instance from the service manager.
"""Retrieves the TelemetryService instance from the service manager.
Returns:
TelemetryService: The TelemetryService instance.
@ -61,8 +61,7 @@ def get_telemetry_service() -> TelemetryService:
def get_tracing_service() -> TracingService:
"""
Retrieves the TracingService instance from the service manager.
"""Retrieves the TracingService instance from the service manager.
Returns:
TracingService: The TracingService instance.
@ -73,8 +72,7 @@ def get_tracing_service() -> TracingService:
def get_state_service() -> StateService:
"""
Retrieves the StateService instance from the service manager.
"""Retrieves the StateService instance from the service manager.
Returns:
The StateService instance.
@ -85,8 +83,7 @@ def get_state_service() -> StateService:
def get_socket_service() -> SocketIOService:
"""
Get the SocketIOService instance from the service manager.
"""Get the SocketIOService instance from the service manager.
Returns:
SocketIOService: The SocketIOService instance.
@ -95,8 +92,7 @@ def get_socket_service() -> SocketIOService:
def get_storage_service() -> StorageService:
"""
Retrieves the storage service instance.
"""Retrieves the storage service instance.
Returns:
The storage service instance.
@ -107,8 +103,7 @@ def get_storage_service() -> StorageService:
def get_variable_service() -> VariableService:
"""
Retrieves the VariableService instance from the service manager.
"""Retrieves the VariableService instance from the service manager.
Returns:
The VariableService instance.
@ -120,8 +115,7 @@ def get_variable_service() -> VariableService:
def get_plugins_service() -> PluginService:
"""
Get the PluginService instance from the service manager.
"""Get the PluginService instance from the service manager.
Returns:
PluginService: The PluginService instance.
@ -130,8 +124,7 @@ def get_plugins_service() -> PluginService:
def get_settings_service() -> SettingsService:
"""
Retrieves the SettingsService instance.
"""Retrieves the SettingsService instance.
If the service is not yet initialized, it will be initialized before returning.
@ -147,8 +140,7 @@ def get_settings_service() -> SettingsService:
def get_db_service() -> DatabaseService:
"""
Retrieves the DatabaseService instance from the service manager.
"""Retrieves the DatabaseService instance from the service manager.
Returns:
The DatabaseService instance.
@ -160,8 +152,7 @@ def get_db_service() -> DatabaseService:
def get_session() -> Generator[Session, None, None]:
"""
Retrieves a session from the database service.
"""Retrieves a session from the database service.
Yields:
Session: A session object.
@ -173,8 +164,7 @@ def get_session() -> Generator[Session, None, None]:
@contextmanager
def session_scope() -> Generator[Session, None, None]:
"""
Context manager for managing a session scope.
"""Context manager for managing a session scope.
This context manager is used to manage a session scope for database operations.
It ensures that the session is properly committed if no exceptions occur,
@ -199,8 +189,7 @@ def session_scope() -> Generator[Session, None, None]:
def get_cache_service() -> CacheService:
"""
Retrieves the cache service from the service manager.
"""Retrieves the cache service from the service manager.
Returns:
The cache service instance.
@ -211,8 +200,7 @@ def get_cache_service() -> CacheService:
def get_shared_component_cache_service() -> CacheService:
"""
Retrieves the cache service from the service manager.
"""Retrieves the cache service from the service manager.
Returns:
The cache service instance.
@ -223,8 +211,7 @@ def get_shared_component_cache_service() -> CacheService:
def get_session_service() -> SessionService:
"""
Retrieves the session service from the service manager.
"""Retrieves the session service from the service manager.
Returns:
The session service instance.
@ -235,8 +222,7 @@ def get_session_service() -> SessionService:
def get_task_service() -> TaskService:
"""
Retrieves the TaskService instance from the service manager.
"""Retrieves the TaskService instance from the service manager.
Returns:
The TaskService instance.
@ -248,8 +234,7 @@ def get_task_service() -> TaskService:
def get_chat_service() -> ChatService:
"""
Get the chat service instance.
"""Get the chat service instance.
Returns:
ChatService: The chat service instance.
@ -258,8 +243,7 @@ def get_chat_service() -> ChatService:
def get_store_service() -> StoreService:
"""
Retrieves the StoreService instance from the service manager.
"""Retrieves the StoreService instance from the service manager.
Returns:
StoreService: The StoreService instance.

View file

@ -20,9 +20,7 @@ class NoFactoryRegisteredError(Exception):
class ServiceManager:
"""
Manages the creation of different services.
"""
"""Manages the creation of different services."""
def __init__(self):
self.services: dict[str, Service] = {}
@ -41,18 +39,12 @@ class ServiceManager:
self,
service_factory: ServiceFactory,
):
"""
Registers a new factory with dependencies.
"""
"""Registers a new factory with dependencies."""
service_name = service_factory.service_class.name
self.factories[service_name] = service_factory
def get(self, service_name: ServiceType, default: ServiceFactory | None = None) -> Service:
"""
Get (or create) a service by its name.
"""
"""Get (or create) a service by its name."""
with self.keyed_lock.lock(service_name):
if service_name not in self.services:
self._create_service(service_name, default)
@ -60,9 +52,7 @@ class ServiceManager:
return self.services[service_name]
def _create_service(self, service_name: ServiceType, default: ServiceFactory | None = None):
"""
Create a new service given its name, handling dependencies.
"""
"""Create a new service given its name, handling dependencies."""
logger.debug(f"Create service {service_name}")
self._validate_service_creation(service_name, default)
@ -83,26 +73,20 @@ class ServiceManager:
self.services[service_name].set_ready()
def _validate_service_creation(self, service_name: ServiceType, default: ServiceFactory | None = None):
"""
Validate whether the service can be created.
"""
"""Validate whether the service can be created."""
if service_name not in self.factories and default is None:
msg = f"No factory registered for the service class '{service_name.name}'"
raise NoFactoryRegisteredError(msg)
def update(self, service_name: ServiceType):
"""
Update a service by its name.
"""
"""Update a service by its name."""
if service_name in self.services:
logger.debug(f"Update service {service_name}")
self.services.pop(service_name, None)
self.get(service_name)
async def teardown(self):
"""
Teardown all the services.
"""
"""Teardown all the services."""
for service in self.services.values():
if service is None:
continue
@ -148,18 +132,14 @@ service_manager = ServiceManager()
def initialize_settings_service():
"""
Initialize the settings manager.
"""
"""Initialize the settings manager."""
from langflow.services.settings import factory as settings_factory
service_manager.register_factory(settings_factory.SettingsServiceFactory())
def initialize_session_service():
"""
Initialize the session manager.
"""
"""Initialize the session manager."""
from langflow.services.cache import factory as cache_factory
from langflow.services.session import factory as session_service_factory

View file

@ -2,10 +2,7 @@ from enum import Enum
class ServiceType(str, Enum):
"""
Enum for the different types of services that can be
registered with the service manager.
"""
"""Enum for the different types of services that can be registered with the service manager."""
AUTH_SERVICE = "auth_service"
CACHE_SERVICE = "cache_service"

View file

@ -19,8 +19,7 @@ BASE_COMPONENTS_PATH = str(Path(__file__).parent.parent.parent / "components")
def is_list_of_any(field: FieldInfo) -> bool:
"""
Check if the given field is a list or an optional list of any type.
"""Check if the given field is a list or an optional list of any type.
Args:
field (FieldInfo): The field to be checked.

View file

@ -2,8 +2,6 @@ from langflow.services.cache import ThreadingInMemoryCache
class SharedComponentCacheService(ThreadingInMemoryCache):
"""
A caching service shared across components.
"""
"""A caching service shared across components."""
name = "shared_component_cache_service"

View file

@ -66,15 +66,11 @@ class SocketIOService(Service):
)
def get_cache(self, sid: str) -> Any:
"""
Get the cache for a client.
"""
"""Get the cache for a client."""
return self.cache_service.get(sid)
def set_cache(self, sid: str, build_result: Any) -> bool:
"""
Set the cache for a client.
"""
"""Set the cache for a client."""
# client_id is the flow id but that already exists in the cache
# so we need to change it to something else

View file

@ -20,8 +20,7 @@ class LocalStorageService(StorageService):
return str(self.data_dir / flow_id / file_name)
async def save_file(self, flow_id: str, file_name: str, data: bytes):
"""
Save a file in the local storage.
"""Save a file in the local storage.
:param flow_id: The identifier for the flow.
:param file_name: The name of the file to be saved.
@ -46,8 +45,7 @@ class LocalStorageService(StorageService):
raise
async def get_file(self, flow_id: str, file_name: str) -> bytes:
"""
Retrieve a file from the local storage.
"""Retrieve a file from the local storage.
:param flow_id: The identifier for the flow.
:param file_name: The name of the file to be retrieved.
@ -69,8 +67,7 @@ class LocalStorageService(StorageService):
return content
async def list_files(self, flow_id: str):
"""
List all files in a specified flow.
"""List all files in a specified flow.
:param flow_id: The identifier for the flow.
:return: A list of file names.
@ -87,8 +84,7 @@ class LocalStorageService(StorageService):
return files
async def delete_file(self, flow_id: str, file_name: str):
"""
Delete a file from the local storage.
"""Delete a file from the local storage.
:param flow_id: The identifier for the flow.
:param file_name: The name of the file to be deleted.

View file

@ -16,8 +16,7 @@ class S3StorageService(StorageService):
self.set_ready()
async def save_file(self, folder: str, file_name: str, data):
"""
Save a file to the S3 bucket.
"""Save a file to the S3 bucket.
:param folder: The folder in the bucket to save the file.
:param file_name: The name of the file to be saved.
@ -35,8 +34,7 @@ class S3StorageService(StorageService):
raise
async def get_file(self, folder: str, file_name: str):
"""
Retrieve a file from the S3 bucket.
"""Retrieve a file from the S3 bucket.
:param folder: The folder in the bucket where the file is stored.
:param file_name: The name of the file to be retrieved.
@ -52,8 +50,7 @@ class S3StorageService(StorageService):
raise
async def list_files(self, folder: str):
"""
List all files in a specified folder of the S3 bucket.
"""List all files in a specified folder of the S3 bucket.
:param folder: The folder in the bucket to list files from.
:return: A list of file names.
@ -70,8 +67,7 @@ class S3StorageService(StorageService):
return files
async def delete_file(self, folder: str, file_name: str):
"""
Delete a file from the S3 bucket.
"""Delete a file from the S3 bucket.
:param folder: The folder in the bucket where the file is stored.
:param file_name: The name of the file to be deleted.

Some files were not shown because too many files have changed in this diff Show more