ref: Some ruff fixes from preview (#5420)

* Some ruff fixes from preview

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Christophe Bornet 2024-12-28 22:25:35 +01:00 committed by GitHub
commit e91bcc2520
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
79 changed files with 402 additions and 374 deletions

View file

@ -17,8 +17,10 @@ from blockbuster import blockbuster_ctx
from dotenv import load_dotenv
from fastapi.testclient import TestClient
from httpx import ASGITransport, AsyncClient
from langflow.components.inputs import ChatInput
from langflow.graph import Graph
from langflow.initial_setup.constants import STARTER_FOLDER_NAME
from langflow.main import create_app
from langflow.services.auth.utils import get_password_hash
from langflow.services.database.models.api_key.model import ApiKey
from langflow.services.database.models.flow.model import Flow, FlowCreate
@ -155,8 +157,6 @@ def caplog(caplog: pytest.LogCaptureFixture):
@pytest.fixture
async def async_client() -> AsyncGenerator:
from langflow.main import create_app
app = create_app()
async with AsyncClient(app=app, base_url="http://testserver", http2=True) as client:
yield client
@ -228,8 +228,6 @@ def distributed_client_fixture(
# def get_session_override():
# return session
from langflow.main import create_app
app = create_app()
# app.dependency_overrides[get_session] = get_session_override
@ -357,8 +355,6 @@ async def client_fixture(
monkeypatch.setenv("LANGFLOW_LOAD_FLOWS_PATH", load_flows_dir)
monkeypatch.setenv("LANGFLOW_AUTO_LOGIN", "true")
from langflow.main import create_app
app = create_app()
db_service = get_db_service()
db_service.database_url = f"sqlite:///{db_path}"
@ -482,8 +478,6 @@ async def flow(
json_flow: str,
active_user,
):
from langflow.services.database.models.flow.model import FlowCreate
loaded_json = json.loads(json_flow)
flow_data = FlowCreate(name="test_flow", data=loaded_json.get("data"), user_id=active_user.id)
@ -577,8 +571,6 @@ async def added_webhook_test(client, json_webhook_test, logged_in_headers):
@pytest.fixture
async def flow_component(client: AsyncClient, logged_in_headers):
from langflow.components.inputs import ChatInput
chat_input = ChatInput()
graph = Graph(start=chat_input, end=chat_input)
graph_dict = graph.dump(name="Chat Input Component")

View file

@ -1,12 +1,17 @@
import pytest
from langflow.components.astra_assistants import (
AssistantsCreateAssistant,
AssistantsCreateThread,
AssistantsGetAssistantName,
AssistantsListAssistants,
AssistantsRun,
)
from tests.integration.utils import run_single_component
@pytest.mark.api_key_required
async def test_list_assistants():
from langflow.components.astra_assistants import AssistantsListAssistants
results = await run_single_component(
AssistantsListAssistants,
inputs={},
@ -16,8 +21,6 @@ async def test_list_assistants():
@pytest.mark.api_key_required
async def test_create_assistants():
from langflow.components.astra_assistants import AssistantsCreateAssistant
results = await run_single_component(
AssistantsCreateAssistant,
inputs={
@ -36,8 +39,6 @@ async def test_create_assistants():
@pytest.mark.api_key_required
async def test_create_thread():
from langflow.components.astra_assistants import AssistantsCreateThread
results = await run_single_component(
AssistantsCreateThread,
inputs={},
@ -48,8 +49,6 @@ async def test_create_thread():
async def get_assistant_name(assistant_id):
from langflow.components.astra_assistants import AssistantsGetAssistantName
results = await run_single_component(
AssistantsGetAssistantName,
inputs={
@ -60,8 +59,6 @@ async def get_assistant_name(assistant_id):
async def run_assistant(assistant_id, thread_id):
from langflow.components.astra_assistants import AssistantsRun
results = await run_single_component(
AssistantsRun,
inputs={

View file

@ -2,6 +2,7 @@ import os
import pytest
from astrapy.db import AstraDB
from langchain_astradb import AstraDBVectorStore, CollectionVectorServiceOptions
from langchain_core.documents import Document
from langflow.components.embeddings import OpenAIEmbeddingsComponent
from langflow.components.vectorstores import AstraDBVectorStoreComponent
@ -37,8 +38,6 @@ def astradb_client():
@pytest.mark.api_key_required
async def test_base(astradb_client: AstraDB):
from langflow.components.embeddings import OpenAIEmbeddingsComponent
application_token = get_astradb_application_token()
api_endpoint = get_astradb_api_endpoint()
@ -88,8 +87,6 @@ async def test_astra_embeds_and_search():
@pytest.mark.api_key_required
def test_astra_vectorize():
from langchain_astradb import AstraDBVectorStore, CollectionVectorServiceOptions
application_token = get_astradb_application_token()
api_endpoint = get_astradb_api_endpoint()
@ -132,8 +129,6 @@ def test_astra_vectorize():
@pytest.mark.api_key_required
def test_astra_vectorize_with_provider_api_key():
"""Tests vectorize using an openai api key."""
from langchain_astradb import AstraDBVectorStore, CollectionVectorServiceOptions
application_token = get_astradb_application_token()
api_endpoint = get_astradb_api_endpoint()
@ -189,8 +184,6 @@ def test_astra_vectorize_with_provider_api_key():
@pytest.mark.api_key_required
def test_astra_vectorize_passes_authentication():
"""Tests vectorize using the authentication parameter."""
from langchain_astradb import AstraDBVectorStore, CollectionVectorServiceOptions
store = None
try:
application_token = get_astradb_application_token()

View file

@ -10,7 +10,7 @@ from tests.integration.utils import ComponentInputHandle, run_single_component
@pytest.mark.api_key_required
async def test_csv_output_parser_openai():
format_instructions = ComponentInputHandle(
format_instructions_ = ComponentInputHandle(
clazz=OutputParserComponent,
inputs={},
output_name="format_instructions",
@ -24,7 +24,7 @@ async def test_csv_output_parser_openai():
clazz=PromptComponent,
inputs={
"template": "List the first five positive integers.\n\n{format_instructions}",
"format_instructions": format_instructions,
"format_instructions": format_instructions_,
},
output_name="prompt",
)

View file

@ -28,7 +28,7 @@ async def test_initialize_services():
@pytest.mark.benchmark
async def test_setup_llm_caching():
def test_setup_llm_caching():
"""Benchmark LLM caching setup."""
from langflow.interface.utils import setup_llm_caching

View file

@ -11,6 +11,7 @@ import anyio
import pytest
from asgi_lifespan import LifespanManager
from httpx import ASGITransport, AsyncClient
from langflow.main import create_app
from langflow.services.deps import get_storage_service
from langflow.services.storage.service import StorageService
from sqlmodel import Session
@ -60,8 +61,6 @@ async def files_client_fixture(
monkeypatch.setenv("LANGFLOW_LOAD_FLOWS_PATH", load_flows_dir)
monkeypatch.setenv("LANGFLOW_AUTO_LOGIN", "true")
from langflow.main import create_app
app = create_app()
return app, db_path
@ -81,14 +80,14 @@ async def files_client_fixture(
@pytest.fixture
async def max_file_size_upload_fixture(monkeypatch):
def max_file_size_upload_fixture(monkeypatch):
monkeypatch.setenv("LANGFLOW_MAX_FILE_SIZE_UPLOAD", "1")
yield
monkeypatch.undo()
@pytest.fixture
async def max_file_size_upload_10mb_fixture(monkeypatch):
def max_file_size_upload_10mb_fixture(monkeypatch):
monkeypatch.setenv("LANGFLOW_MAX_FILE_SIZE_UPLOAD", "10")
yield
monkeypatch.undo()

View file

@ -11,7 +11,7 @@ from langflow.schema.data import Data
from pydantic import BaseModel
async def test_component_tool():
def test_component_tool():
calculator_component = CalculatorToolComponent()
component_toolkit = ComponentToolkit(component=calculator_component)
component_tool = component_toolkit.get_tools()[0]

View file

@ -118,7 +118,7 @@ class AllInputsComponent(Component):
return data
async def test_component_inputs_toolkit():
def test_component_inputs_toolkit():
component = AllInputsComponent()
component_toolkit = ComponentToolkit(component=component)
component_tool = component_toolkit.get_tools()[0]

View file

@ -1,8 +1,11 @@
import re
from unittest.mock import MagicMock, patch
import pytest
from langchain_core.language_models import BaseLanguageModel
from langflow.components.helpers.structured_output import StructuredOutputComponent
from langflow.helpers.base_model import build_model_from_schema
from langflow.inputs.inputs import TableInput
from langflow.schema.data import Data
from pydantic import BaseModel
from typing_extensions import override
@ -12,8 +15,6 @@ class TestStructuredOutputComponent:
# Ensure that the structured output is successfully generated with the correct BaseModel instance returned by
# the mock function
def test_successful_structured_output_generation_with_patch_with_config(self):
from unittest.mock import patch
class MockLanguageModel(BaseLanguageModel):
@override
def with_structured_output(self, *args, **kwargs):
@ -87,15 +88,11 @@ class TestStructuredOutputComponent:
multiple=False,
)
with pytest.raises(TypeError, match="Language model does not support structured output."):
with pytest.raises(TypeError, match=re.escape("Language model does not support structured output.")):
component.build_structured_output()
# Correctly builds the output model from the provided schema
def test_correctly_builds_output_model(self):
# Import internal organization modules, packages, and libraries
from langflow.helpers.base_model import build_model_from_schema
from langflow.inputs.inputs import TableInput
# Setup
component = StructuredOutputComponent()
schema = [
@ -134,10 +131,6 @@ class TestStructuredOutputComponent:
# Properly handles multiple outputs when 'multiple' is set to True
def test_handles_multiple_outputs(self):
# Import internal organization modules, packages, and libraries
from langflow.helpers.base_model import build_model_from_schema
from langflow.inputs.inputs import TableInput
# Setup
component = StructuredOutputComponent()
schema = [
@ -261,5 +254,5 @@ class TestStructuredOutputComponent:
multiple=False,
)
with pytest.raises(TypeError, match="Language model does not support structured output."):
with pytest.raises(TypeError, match=re.escape("Language model does not support structured output.")):
component.build_structured_output()

View file

@ -44,7 +44,7 @@ def test_operations(sample_dataframe, operation, expected_columns, expected_valu
component.new_column_name = "Z"
elif operation == "Select Columns":
component.columns_to_select = ["A", "C"]
elif operation in ("Head", "Tail"):
elif operation in {"Head", "Tail"}:
component.num_rows = 1
elif operation == "Replace Value":
component.column_name = "C"

View file

@ -1,3 +1,5 @@
import re
import pytest
from langflow.components.processing import CreateDataComponent
from langflow.schema import Data
@ -48,7 +50,7 @@ def test_update_build_config_exceed_limit(create_data_component):
"value": False,
},
}
with pytest.raises(ValueError, match="Number of fields cannot exceed 15."):
with pytest.raises(ValueError, match=re.escape("Number of fields cannot exceed 15.")):
create_data_component.update_build_config(build_config, 16, "number_of_fields")

View file

@ -1,3 +1,5 @@
import re
import pytest
from langflow.components.processing import UpdateDataComponent
from langflow.schema import Data
@ -48,7 +50,7 @@ def test_update_build_config_exceed_limit(update_data_component):
"value": False,
},
}
with pytest.raises(ValueError, match="Number of fields cannot exceed 15."):
with pytest.raises(ValueError, match=re.escape("Number of fields cannot exceed 15.")):
update_data_component.update_build_config(build_config, 16, "number_of_fields")

View file

@ -14,11 +14,6 @@ from langflow.schema.properties import Properties, Source
from langflow.template.field.base import Output
async def create_event_queue():
"""Create a queue for testing events."""
return asyncio.Queue()
def blocking_cb(manager, event_type, data):
time.sleep(0.01)
manager.send_event(event_type=event_type, data=data)
@ -43,7 +38,7 @@ class ComponentForTesting(Component):
async def test_component_message_sending():
"""Test component's message sending functionality."""
# Create event queue and manager
queue = await create_event_queue()
queue = asyncio.Queue()
event_manager = EventManager(queue)
event_manager.register_event("on_message", "message", callback=blocking_cb)
@ -75,7 +70,7 @@ async def test_component_message_sending():
async def test_component_tool_output():
"""Test component's tool output functionality."""
# Create event queue and manager
queue = await create_event_queue()
queue = asyncio.Queue()
event_manager = EventManager(queue)
# Create component
@ -110,7 +105,7 @@ async def test_component_tool_output():
async def test_component_error_handling():
"""Test component's error handling."""
# Create event queue and manager
queue = await create_event_queue()
queue = asyncio.Queue()
event_manager = EventManager(queue)
# Create component
@ -141,7 +136,7 @@ async def test_component_error_handling():
async def test_component_build_results():
"""Test component's build_results functionality."""
# Create event queue and manager
queue = await create_event_queue()
queue = asyncio.Queue()
event_manager = EventManager(queue)
# Create component
@ -173,7 +168,7 @@ async def test_component_build_results():
async def test_component_logging():
"""Test component's logging functionality."""
# Create event queue and manager
queue = await create_event_queue()
queue = asyncio.Queue()
event_manager = EventManager(queue)
# Create component
@ -207,7 +202,7 @@ async def test_component_logging():
@pytest.mark.usefixtures("client")
async def test_component_streaming_message():
"""Test component's streaming message functionality."""
queue = await create_event_queue()
queue = asyncio.Queue()
event_manager = EventManager(queue)
event_manager.register_event("on_token", "token", blocking_cb)

View file

@ -40,7 +40,7 @@ class TestEventManager:
def test_accessing_non_registered_event_callback_with_recommended_fix(self):
queue = asyncio.Queue()
manager = EventManager(queue)
result = manager.__getattr__("non_registered_event")
result = manager.non_registered_event
assert result == manager.noop
# Accessing a registered event callback via __getattr__
@ -130,8 +130,6 @@ class TestEventManager:
assert len(manager.events) == 1000
# Verifying the uniqueness of event IDs for each event triggered using await with asyncio decorator
import pytest
async def test_event_id_uniqueness_with_await(self):
queue = asyncio.Queue()
manager = EventManager(queue)

View file

@ -1,11 +1,10 @@
from unittest.mock import Mock, patch
from langflow.exceptions.api import APIException, ExceptionBody
from langflow.services.database.models.flow.model import Flow
def test_api_exception():
from langflow.exceptions.api import APIException, ExceptionBody
mock_exception = Exception("Test exception")
mock_flow = Mock(spec=Flow)
mock_outdated_components = ["component1", "component2"]
@ -45,8 +44,6 @@ def test_api_exception():
def test_api_exception_no_flow():
from langflow.exceptions.api import APIException, ExceptionBody
# Mock data
mock_exception = Exception("Test exception")

View file

@ -1,3 +1,5 @@
import re
import pytest
from langflow.components.inputs import ChatInput
from langflow.components.models import OpenAIModelComponent
@ -25,5 +27,7 @@ Answer:
chat_output = ChatOutput()
chat_output.set(input_value=openai_component.text_response)
with pytest.raises(ValueError, match="Component OpenAI field 'input_values' might not be a valid input."):
with pytest.raises(
ValueError, match=re.escape("Component OpenAI field 'input_values' might not be a valid input.")
):
Graph(start=chat_input, end=chat_output)

View file

@ -20,7 +20,7 @@ async def test_graph_not_prepared():
await graph.astep()
async def test_graph(caplog: pytest.LogCaptureFixture):
def test_graph(caplog: pytest.LogCaptureFixture):
chat_input = ChatInput()
chat_output = ChatOutput()
graph = Graph()

View file

@ -2,14 +2,14 @@ from typing import TYPE_CHECKING, Literal
import pytest
from langflow.components.inputs import ChatInput
from langflow.inputs.inputs import DropdownInput, FileInput, IntInput, NestedDictInput, StrInput
from langflow.io.schema import create_input_schema
if TYPE_CHECKING:
from pydantic.fields import FieldInfo
def test_create_input_schema():
from langflow.io.schema import create_input_schema
schema = create_input_schema(ChatInput.inputs)
assert schema.__name__ == "InputSchema"
@ -17,9 +17,6 @@ def test_create_input_schema():
class TestCreateInputSchema:
# Single input type is converted to list and processed correctly
def test_single_input_type_conversion(self):
from langflow.inputs.inputs import StrInput
from langflow.io.schema import create_input_schema
input_instance = StrInput(name="test_field")
schema = create_input_schema([input_instance])
assert schema.__name__ == "InputSchema"
@ -27,9 +24,6 @@ class TestCreateInputSchema:
# Multiple input types are processed and included in the schema
def test_multiple_input_types(self):
from langflow.inputs.inputs import IntInput, StrInput
from langflow.io.schema import create_input_schema
inputs = [StrInput(name="str_field"), IntInput(name="int_field")]
schema = create_input_schema(inputs)
assert schema.__name__ == "InputSchema"
@ -38,9 +32,6 @@ class TestCreateInputSchema:
# Fields are correctly created with appropriate types and attributes
def test_fields_creation_with_correct_types_and_attributes(self):
from langflow.inputs.inputs import StrInput
from langflow.io.schema import create_input_schema
input_instance = StrInput(name="test_field", info="Test Info", required=True)
schema = create_input_schema([input_instance])
field_info = schema.model_fields["test_field"]
@ -49,18 +40,12 @@ class TestCreateInputSchema:
# Schema model is created and returned successfully
def test_schema_model_creation(self):
from langflow.inputs.inputs import StrInput
from langflow.io.schema import create_input_schema
input_instance = StrInput(name="test_field")
schema = create_input_schema([input_instance])
assert schema.__name__ == "InputSchema"
# Default values are correctly assigned to fields
def test_default_values_assignment(self):
from langflow.inputs.inputs import StrInput
from langflow.io.schema import create_input_schema
input_instance = StrInput(name="test_field", value="default_value")
schema = create_input_schema([input_instance])
field_info = schema.model_fields["test_field"]
@ -68,16 +53,11 @@ class TestCreateInputSchema:
# Empty list of inputs is handled without errors
def test_empty_list_of_inputs(self):
from langflow.io.schema import create_input_schema
schema = create_input_schema([])
assert schema.__name__ == "InputSchema"
# Input with missing optional attributes (e.g., display_name, info) is processed correctly
def test_missing_optional_attributes(self):
from langflow.inputs.inputs import StrInput
from langflow.io.schema import create_input_schema
input_instance = StrInput(name="test_field")
schema = create_input_schema([input_instance])
field_info = schema.model_fields["test_field"]
@ -86,9 +66,6 @@ class TestCreateInputSchema:
# Input with is_list attribute set to True is processed correctly
def test_is_list_attribute_processing(self):
from langflow.inputs.inputs import StrInput
from langflow.io.schema import create_input_schema
input_instance = StrInput(name="test_field", is_list=True)
schema = create_input_schema([input_instance])
field_info: FieldInfo = schema.model_fields["test_field"]
@ -96,9 +73,6 @@ class TestCreateInputSchema:
# Input with options attribute is processed correctly
def test_options_attribute_processing(self):
from langflow.inputs.inputs import DropdownInput
from langflow.io.schema import create_input_schema
input_instance = DropdownInput(name="test_field", options=["option1", "option2"])
schema = create_input_schema([input_instance])
field_info = schema.model_fields["test_field"]
@ -106,9 +80,6 @@ class TestCreateInputSchema:
# Non-standard field types are handled correctly
def test_non_standard_field_types_handling(self):
from langflow.inputs.inputs import FileInput
from langflow.io.schema import create_input_schema
input_instance = FileInput(name="file_field")
schema = create_input_schema([input_instance])
field_info = schema.model_fields["file_field"]
@ -116,9 +87,6 @@ class TestCreateInputSchema:
# Inputs with mixed required and optional fields are processed correctly
def test_mixed_required_optional_fields_processing(self):
from langflow.inputs.inputs import IntInput, StrInput
from langflow.io.schema import create_input_schema
inputs = [
StrInput(name="required_field", required=True),
IntInput(name="optional_field", required=False),
@ -132,9 +100,6 @@ class TestCreateInputSchema:
# Inputs with complex nested structures are handled correctly
def test_complex_nested_structures_handling(self):
from langflow.inputs.inputs import NestedDictInput
from langflow.io.schema import create_input_schema
nested_input = NestedDictInput(name="nested_field", value={"key": "value"})
schema = create_input_schema([nested_input])
@ -145,9 +110,6 @@ class TestCreateInputSchema:
# Creating a schema from a single input type
def test_single_input_type_replica(self):
from langflow.inputs.inputs import StrInput
from langflow.io.schema import create_input_schema
input_instance = StrInput(name="test_field")
schema = create_input_schema([input_instance])
assert schema.__name__ == "InputSchema"
@ -155,18 +117,12 @@ class TestCreateInputSchema:
# Creating a schema from a list of input types
def test_passing_input_type_directly(self):
from langflow.inputs.inputs import IntInput, StrInput
from langflow.io.schema import create_input_schema
inputs = StrInput(name="str_field"), IntInput(name="int_field")
with pytest.raises(TypeError):
create_input_schema(inputs)
# Handling input types with options correctly
def test_options_handling(self):
from langflow.inputs.inputs import DropdownInput
from langflow.io.schema import create_input_schema
input_instance = DropdownInput(name="test_field", options=["option1", "option2"])
schema = create_input_schema([input_instance])
field_info = schema.model_fields["test_field"]
@ -174,9 +130,6 @@ class TestCreateInputSchema:
# Handling input types with is_list attribute correctly
def test_is_list_handling(self):
from langflow.inputs.inputs import StrInput
from langflow.io.schema import create_input_schema
input_instance = StrInput(name="test_field", is_list=True)
schema = create_input_schema([input_instance])
field_info = schema.model_fields["test_field"]
@ -184,9 +137,6 @@ class TestCreateInputSchema:
# Converting FieldTypes to corresponding Python types
def test_field_types_conversion(self):
from langflow.inputs.inputs import IntInput
from langflow.io.schema import create_input_schema
input_instance = IntInput(name="int_field")
schema = create_input_schema([input_instance])
field_info = schema.model_fields["int_field"]
@ -194,9 +144,6 @@ class TestCreateInputSchema:
# Setting default values for non-required fields
def test_default_values_for_non_required_fields(self):
from langflow.inputs.inputs import StrInput
from langflow.io.schema import create_input_schema
input_instance = StrInput(name="test_field", value="default_value")
schema = create_input_schema([input_instance])
field_info = schema.model_fields["test_field"]
@ -204,9 +151,6 @@ class TestCreateInputSchema:
# Handling input types with missing attributes
def test_missing_attributes_handling(self):
from langflow.inputs.inputs import StrInput
from langflow.io.schema import create_input_schema
input_instance = StrInput(name="test_field")
schema = create_input_schema([input_instance])
field_info = schema.model_fields["test_field"]
@ -217,9 +161,6 @@ class TestCreateInputSchema:
# Handling input types with None as default value
def test_none_default_value_handling(self):
from langflow.inputs.inputs import StrInput
from langflow.io.schema import create_input_schema
input_instance = StrInput(name="test_field", value=None)
schema = create_input_schema([input_instance])
field_info = schema.model_fields["test_field"]
@ -227,9 +168,6 @@ class TestCreateInputSchema:
# Handling input types with special characters in names
def test_special_characters_in_names_handling(self):
from langflow.inputs.inputs import StrInput
from langflow.io.schema import create_input_schema
input_instance = StrInput(name="test@field#name")
schema = create_input_schema([input_instance])
assert "test@field#name" in schema.model_fields

View file

@ -1,3 +1,5 @@
import base64
import pytest
from langchain_core.messages import AIMessage, HumanMessage
from langflow.schema.data import Data
@ -9,8 +11,6 @@ def sample_image(tmp_path):
"""Create a sample image file for testing."""
image_path = tmp_path / "test_image.png"
# Create a small black 1x1 pixel PNG file
import base64
image_content = base64.b64decode(
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAACklEQVR4nGMAAQAABQABDQottAAAAABJRU5ErkJggg=="
)

View file

@ -1,3 +1,4 @@
import base64
import shutil
from datetime import datetime, timezone
from pathlib import Path
@ -29,8 +30,6 @@ def sample_image(langflow_cache_dir):
# Create the image in the flow directory
image_path = flow_dir / "test_image.png"
# Create a small black 1x1 pixel PNG file
import base64
image_content = base64.b64decode(
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAACklEQVR4nGMAAQAABQABDQottAAAAABJRU5ErkJggg=="
)

View file

@ -1,3 +1,5 @@
import copy
import pytest
from langchain_core.documents import Document
from langflow.schema import Data
@ -62,8 +64,6 @@ def test_custom_attribute_get_set_del():
def test_deep_copy():
import copy
record1 = Data(data={"text": "Hello", "number": 10})
record2 = copy.deepcopy(record1)
assert record2.text == "Hello"

View file

@ -13,6 +13,7 @@ from langflow.services.database.models.flow import Flow, FlowCreate, FlowUpdate
from langflow.services.database.models.folder.model import FolderCreate
from langflow.services.database.utils import session_getter
from langflow.services.deps import get_db_service
from sqlalchemy import text
@pytest.fixture(scope="module")
@ -619,8 +620,6 @@ async def test_sqlite_pragmas():
db_service = get_db_service()
async with db_service.with_session() as session:
from sqlalchemy import text
assert (await session.exec(text("PRAGMA journal_mode;"))).scalar() == "wal"
assert (await session.exec(text("PRAGMA synchronous;"))).scalar() == 1

View file

@ -64,7 +64,7 @@ class TestInput:
# Empty lists and edge cases
assert set(post_process_type(list)) == {list}
assert set(post_process_type(Union[int, None])) == {int, NoneType} # noqa: UP007
assert set(post_process_type(Union[None, list[None]])) == {None, NoneType} # noqa: UP007
assert set(post_process_type(Union[list[None], None])) == {None, NoneType} # noqa: UP007
# Handling complex nested structures
assert set(post_process_type(Union[SequenceABC[int | str], list[float]])) == {int, str, float} # noqa: UP007

View file

@ -1,3 +1,4 @@
import re
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
@ -48,7 +49,7 @@ def test_increment_counter_empty_label(opentelemetry_instance):
def test_increment_counter_missing_mandatory_label(opentelemetry_instance):
with pytest.raises(ValueError, match="Missing required labels: {'flow_id'}"):
with pytest.raises(ValueError, match=re.escape("Missing required labels: {'flow_id'}")):
opentelemetry_instance.increment_counter(metric_name="num_files_uploaded", value=5, labels={"service": "one"})

View file

@ -16,51 +16,57 @@ def sample_image(tmp_path):
return image_path
class TestImageUtils:
def test_convert_image_to_base64_success(self, sample_image):
"""Test successful conversion of image to base64."""
base64_str = convert_image_to_base64(sample_image)
assert isinstance(base64_str, str)
# Verify it's valid base64
assert base64.b64decode(base64_str)
def test_convert_image_to_base64_success(sample_image):
"""Test successful conversion of image to base64."""
base64_str = convert_image_to_base64(sample_image)
assert isinstance(base64_str, str)
# Verify it's valid base64
assert base64.b64decode(base64_str)
def test_convert_image_to_base64_empty_path(self):
"""Test conversion with empty path."""
with pytest.raises(ValueError, match="Image path cannot be empty"):
convert_image_to_base64("")
def test_convert_image_to_base64_nonexistent_file(self):
"""Test conversion with non-existent file."""
with pytest.raises(FileNotFoundError, match="Image file not found"):
convert_image_to_base64("nonexistent.png")
def test_convert_image_to_base64_empty_path():
"""Test conversion with empty path."""
with pytest.raises(ValueError, match="Image path cannot be empty"):
convert_image_to_base64("")
def test_convert_image_to_base64_directory(self, tmp_path):
"""Test conversion with directory path instead of file."""
with pytest.raises(ValueError, match="Path is not a file"):
convert_image_to_base64(tmp_path)
def test_create_data_url_success(self, sample_image):
"""Test successful creation of data URL."""
data_url = create_data_url(sample_image)
assert data_url.startswith("data:image/png;base64,")
# Verify the base64 part is valid
base64_part = data_url.split(",")[1]
assert base64.b64decode(base64_part)
def test_convert_image_to_base64_nonexistent_file():
"""Test conversion with non-existent file."""
with pytest.raises(FileNotFoundError, match="Image file not found"):
convert_image_to_base64("nonexistent.png")
def test_create_data_url_with_custom_mime(self, sample_image):
"""Test creation of data URL with custom MIME type."""
custom_mime = "image/custom"
data_url = create_data_url(sample_image, mime_type=custom_mime)
assert data_url.startswith(f"data:{custom_mime};base64,")
def test_create_data_url_invalid_file(self):
"""Test creation of data URL with invalid file."""
with pytest.raises(FileNotFoundError):
create_data_url("nonexistent.jpg")
def test_convert_image_to_base64_directory(tmp_path):
"""Test conversion with directory path instead of file."""
with pytest.raises(ValueError, match="Path is not a file"):
convert_image_to_base64(tmp_path)
def test_create_data_url_unrecognized_extension(self, tmp_path):
"""Test creation of data URL with unrecognized file extension."""
invalid_file = tmp_path / "test.unknown"
invalid_file.touch()
with pytest.raises(ValueError, match="Could not determine MIME type"):
create_data_url(invalid_file)
def test_create_data_url_success(sample_image):
"""Test successful creation of data URL."""
data_url = create_data_url(sample_image)
assert data_url.startswith("data:image/png;base64,")
# Verify the base64 part is valid
base64_part = data_url.split(",")[1]
assert base64.b64decode(base64_part)
def test_create_data_url_with_custom_mime(sample_image):
"""Test creation of data URL with custom MIME type."""
custom_mime = "image/custom"
data_url = create_data_url(sample_image, mime_type=custom_mime)
assert data_url.startswith(f"data:{custom_mime};base64,")
def test_create_data_url_invalid_file():
"""Test creation of data URL with invalid file."""
with pytest.raises(FileNotFoundError):
create_data_url("nonexistent.jpg")
def test_create_data_url_unrecognized_extension(tmp_path):
"""Test creation of data URL with unrecognized file extension."""
invalid_file = tmp_path / "test.unknown"
invalid_file.touch()
with pytest.raises(ValueError, match="Could not determine MIME type"):
create_data_url(invalid_file)

View file

@ -1,6 +1,7 @@
import math
import pytest
from langflow.utils.constants import MAX_TEXT_LENGTH
from langflow.utils.util_strings import truncate_long_strings
@ -48,8 +49,6 @@ def test_truncate_long_strings_negative_max_length():
# Test for None max_length (should use default MAX_TEXT_LENGTH)
def test_truncate_long_strings_none_max_length():
from langflow.utils.constants import MAX_TEXT_LENGTH
long_string = "a" * (MAX_TEXT_LENGTH + 10)
result = truncate_long_strings(long_string, None)
assert len(result) == MAX_TEXT_LENGTH + 3 # +3 for "..."