From c0b25fa65167db6fc25813569a573c19a37dbf86 Mon Sep 17 00:00:00 2001 From: Christophe Bornet Date: Sun, 8 Dec 2024 20:13:08 +0100 Subject: [PATCH] ref: Remove some useless asyncio.to_thread (#5149) Remove some useless asyncio.to_thread --- pyproject.toml | 2 +- src/backend/base/langflow/base/io/chat.py | 46 ------------------- .../directory_reader/directory_reader.py | 5 +- .../base/langflow/services/tracing/service.py | 8 ++-- src/backend/tests/conftest.py | 36 +++++++++------ .../tests/unit/api/v1/test_endpoints.py | 12 ++--- .../inputs/test_input_components.py | 6 +-- .../tests/unit/graph/graph/test_base.py | 11 ++--- src/backend/tests/unit/test_process.py | 4 +- uv.lock | 8 ++-- 10 files changed, 44 insertions(+), 94 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d14550662..813cf9cb3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -178,7 +178,7 @@ dev-dependencies = [ "asgi-lifespan>=2.1.0", "pytest-github-actions-annotate-failures>=0.2.0", "pytest-codspeed>=3.0.0", - "blockbuster>=1.3.1,<1.4", + "blockbuster>=1.3.2,<1.4", "types-aiofiles>=24.1.0.20240626", ] diff --git a/src/backend/base/langflow/base/io/chat.py b/src/backend/base/langflow/base/io/chat.py index 17e3e4dc2..c0448e18d 100644 --- a/src/backend/base/langflow/base/io/chat.py +++ b/src/backend/base/langflow/base/io/chat.py @@ -1,56 +1,10 @@ -import asyncio -from typing import cast - from langflow.custom import Component -from langflow.memory import astore_message -from langflow.schema import Data -from langflow.schema.message import Message class ChatComponent(Component): display_name = "Chat Component" description = "Use as base for chat components." - async def build_with_data( - self, - *, - sender: str | None = "User", - sender_name: str | None = "User", - input_value: str | Data | Message | None = None, - files: list[str] | None = None, - session_id: str | None = None, - return_message: bool = False, - ) -> str | Message: - message = await asyncio.to_thread(self._create_message, input_value, sender, sender_name, files, session_id) - message_text = message.text if not return_message else message - - self.status = message_text - if session_id and isinstance(message, Message) and isinstance(message.text, str): - flow_id = self.graph.flow_id if hasattr(self, "graph") else None - messages = await astore_message(message, flow_id=flow_id) - self.status = messages - self._send_messages_events(messages) - - return cast("str | Message", message_text) - - def _create_message(self, input_value, sender, sender_name, files, session_id) -> Message: - if isinstance(input_value, Data): - return Message.from_data(input_value) - return Message( - text=input_value, - sender=sender, - sender_name=sender_name, - files=files, - session_id=session_id, - category="message", - ) - - def _send_messages_events(self, messages) -> None: - if hasattr(self, "_event_manager") and self._event_manager: - for stored_message in messages: - id_ = stored_message.id - self._send_message_event(message=stored_message, id_=id_) - def get_properties_from_source_component(self): if hasattr(self, "_vertex") and hasattr(self._vertex, "incoming_edges") and self._vertex.incoming_edges: source_id = self._vertex.incoming_edges[0].source_id diff --git a/src/backend/base/langflow/custom/directory_reader/directory_reader.py b/src/backend/base/langflow/custom/directory_reader/directory_reader.py index 50adc63da..458a2e2aa 100644 --- a/src/backend/base/langflow/custom/directory_reader/directory_reader.py +++ b/src/backend/base/langflow/custom/directory_reader/directory_reader.py @@ -296,9 +296,6 @@ class DirectoryReader: file_content = str(StringCompressor(file_content).compress_string()) return True, file_content - async def get_output_types_from_code_async(self, code: str): - return await asyncio.to_thread(self.get_output_types_from_code, code) - async def abuild_component_menu_list(self, file_paths): response = {"menu": []} logger.debug("-------------------- Async Building component menu list --------------------") @@ -328,7 +325,7 @@ class DirectoryReader: if validation_result: try: - output_types = await self.get_output_types_from_code_async(result_content) + output_types = await asyncio.to_thread(self.get_output_types_from_code, result_content) except Exception: # noqa: BLE001 logger.exception("Error while getting output types from code") output_types = [component_name_camelcase] diff --git a/src/backend/base/langflow/services/tracing/service.py b/src/backend/base/langflow/services/tracing/service.py index 90119558a..09587b625 100644 --- a/src/backend/base/langflow/services/tracing/service.py +++ b/src/backend/base/langflow/services/tracing/service.py @@ -114,10 +114,10 @@ class TracingService(Service): async def initialize_tracers(self) -> None: try: await self.start() - await asyncio.to_thread(self._initialize_langsmith_tracer) - await asyncio.to_thread(self._initialize_langwatch_tracer) - await asyncio.to_thread(self._initialize_langfuse_tracer) - await asyncio.to_thread(self._initialize_arize_phoenix_tracer) + self._initialize_langsmith_tracer() + self._initialize_langwatch_tracer() + self._initialize_langfuse_tracer() + self._initialize_arize_phoenix_tracer() except Exception as e: # noqa: BLE001 logger.debug(f"Error initializing tracers: {e}") diff --git a/src/backend/tests/conftest.py b/src/backend/tests/conftest.py index e47e4e856..70d73b888 100644 --- a/src/backend/tests/conftest.py +++ b/src/backend/tests/conftest.py @@ -46,20 +46,28 @@ load_dotenv() @pytest.fixture(autouse=True) -def blockbuster(): - with blockbuster_ctx() as bb: - for func in [ - "io.BufferedReader.read", - "io.BufferedWriter.write", - "io.TextIOWrapper.read", - "io.TextIOWrapper.write", - ]: - bb.functions[func].can_block_functions.append(("settings/service.py", {"initialize"})) - for func in bb.functions: - if func.startswith("sqlite3."): - bb.functions[func].deactivate() - bb.functions["threading.Lock.acquire"].deactivate() - yield bb +def blockbuster(request): + if "benchmark" in request.keywords: + yield + else: + with blockbuster_ctx() as bb: + for func in [ + "io.BufferedReader.read", + "io.BufferedWriter.write", + "io.TextIOWrapper.read", + "io.TextIOWrapper.write", + ]: + bb.functions[func].can_block_functions.append(("settings/service.py", {"initialize"})) + for func in [ + "io.BufferedReader.read", + "io.TextIOWrapper.read", + ]: + bb.functions[func].can_block_functions.append(("importlib_metadata/__init__.py", {"metadata"})) + for func in bb.functions: + if func.startswith("sqlite3."): + bb.functions[func].deactivate() + bb.functions["threading.Lock.acquire"].deactivate() + yield bb def pytest_configure(config): diff --git a/src/backend/tests/unit/api/v1/test_endpoints.py b/src/backend/tests/unit/api/v1/test_endpoints.py index 5003bc838..a8ca4f444 100644 --- a/src/backend/tests/unit/api/v1/test_endpoints.py +++ b/src/backend/tests/unit/api/v1/test_endpoints.py @@ -1,18 +1,11 @@ -import asyncio -from pathlib import Path from typing import Any +from aiofile import async_open from fastapi import status from httpx import AsyncClient from langflow.api.v1.schemas import UpdateCustomComponentRequest -async def get_dynamic_output_component_code(): - return await asyncio.to_thread( - Path("src/backend/tests/data/dynamic_output_component.py").read_text, encoding="utf-8" - ) - - async def test_get_version(client: AsyncClient): response = await client.get("api/v1/version") result = response.json() @@ -37,7 +30,8 @@ async def test_get_config(client: AsyncClient): async def test_update_component_outputs(client: AsyncClient, logged_in_headers: dict): - code = await get_dynamic_output_component_code() + async with async_open("src/backend/tests/data/dynamic_output_component.py", encoding="utf-8") as f: + code = await f.read() frontend_node: dict[str, Any] = {"outputs": []} request = UpdateCustomComponentRequest( code=code, diff --git a/src/backend/tests/unit/components/inputs/test_input_components.py b/src/backend/tests/unit/components/inputs/test_input_components.py index 552b9e78b..56b34fee6 100644 --- a/src/backend/tests/unit/components/inputs/test_input_components.py +++ b/src/backend/tests/unit/components/inputs/test_input_components.py @@ -1,6 +1,5 @@ -import asyncio - import pytest +from aiofile import async_open from langflow.components.inputs import ChatInput, TextInputComponent from langflow.schema.message import Message from langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_USER, MESSAGE_SENDER_USER @@ -93,7 +92,8 @@ class TestChatInput(ComponentTestBaseWithClient): """Test message response with file attachments.""" # Create a temporary test file test_file = tmp_path / "test.txt" - await asyncio.to_thread(test_file.write_text, "Test content") + async with async_open(test_file, "w") as f: + await f.write("Test content") kwargs = { "input_value": "Message with file", diff --git a/src/backend/tests/unit/graph/graph/test_base.py b/src/backend/tests/unit/graph/graph/test_base.py index c32ef7f47..3e03f3f30 100644 --- a/src/backend/tests/unit/graph/graph/test_base.py +++ b/src/backend/tests/unit/graph/graph/test_base.py @@ -1,4 +1,3 @@ -import asyncio import logging from collections import deque @@ -14,7 +13,7 @@ from langflow.graph.graph.constants import Finish async def test_graph_not_prepared(): chat_input = ChatInput() chat_output = ChatOutput() - graph = await asyncio.to_thread(Graph) + graph = Graph() graph.add_component(chat_input) graph.add_component(chat_output) with pytest.raises(ValueError, match="Graph not prepared"): @@ -24,7 +23,7 @@ async def test_graph_not_prepared(): async def test_graph(caplog: pytest.LogCaptureFixture): chat_input = ChatInput() chat_output = ChatOutput() - graph = await asyncio.to_thread(Graph) + graph = Graph() graph.add_component(chat_input) graph.add_component(chat_output) caplog.clear() @@ -36,7 +35,7 @@ async def test_graph(caplog: pytest.LogCaptureFixture): async def test_graph_with_edge(): chat_input = ChatInput() chat_output = ChatOutput() - graph = await asyncio.to_thread(Graph) + graph = Graph() input_id = graph.add_component(chat_input) output_id = graph.add_component(chat_output) graph.add_component_edge(input_id, (chat_input.outputs[0].name, chat_input.inputs[0].name), output_id) @@ -58,7 +57,7 @@ async def test_graph_functional(): chat_input.set(should_store_message=False) chat_output = ChatOutput(input_value="test", _id="chat_output") chat_output.set(sender_name=chat_input.message_response) - graph = await asyncio.to_thread(Graph, chat_input, chat_output) + graph = Graph(chat_input, chat_output) assert graph._run_queue == deque(["chat_input"]) await graph.astep() assert graph._run_queue == deque(["chat_output"]) @@ -73,7 +72,7 @@ async def test_graph_functional_async_start(): chat_input = ChatInput(_id="chat_input") chat_output = ChatOutput(input_value="test", _id="chat_output") chat_output.set(sender_name=chat_input.message_response) - graph = await asyncio.to_thread(Graph, chat_input, chat_output) + graph = Graph(chat_input, chat_output) # Now iterate through the graph # and check that the graph is running # correctly diff --git a/src/backend/tests/unit/test_process.py b/src/backend/tests/unit/test_process.py index 37930d9c2..65680cd38 100644 --- a/src/backend/tests/unit/test_process.py +++ b/src/backend/tests/unit/test_process.py @@ -1,5 +1,3 @@ -import asyncio - from langflow.processing.process import process_tweaks from langflow.services.deps import get_session_service @@ -265,7 +263,7 @@ def test_tweak_not_in_template(): async def test_load_langchain_object_with_cached_session(basic_graph_data): # Provide a non-existent session_id - session_service = await asyncio.to_thread(get_session_service) + session_service = get_session_service() session_id1 = "non-existent-session-id" graph1, artifacts1 = await session_service.load_session(session_id1, basic_graph_data) # Use the new session_id to get the langchain_object again diff --git a/uv.lock b/uv.lock index 8cf1ede8e..97eca4fd5 100644 --- a/uv.lock +++ b/uv.lock @@ -478,14 +478,14 @@ wheels = [ [[package]] name = "blockbuster" -version = "1.3.1" +version = "1.3.2" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "forbiddenfruit" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/6a/8c/e7e61396951b9ab1c39c35fa3a9737270a62da41d0b3ff6a15207747e5eb/blockbuster-1.3.1.tar.gz", hash = "sha256:59984286e1cd274d406129f93a42bd62b98fe53f484d544e6e743a5d6d7e42d5", size = 9409 } +sdist = { url = "https://files.pythonhosted.org/packages/e4/0b/e61888f0510374e2fc3c86f8edff5f56c28b896c080af7732ed3e6b2f205/blockbuster-1.3.2.tar.gz", hash = "sha256:196a5f8fef3f22c718fd8e547dafc75a1936b87ff2b5a1f73ea233b00cff0d6a", size = 9463 } wheels = [ - { url = "https://files.pythonhosted.org/packages/a2/de/28b5c37969037ad7dcf08c18dba255e1ba3a42539cf04cae3ef627f706ae/blockbuster-1.3.1-py3-none-any.whl", hash = "sha256:883292a02c1ae9dbd76d637ea8b604a1cc75aebb0544d566aed90491e056963e", size = 8111 }, + { url = "https://files.pythonhosted.org/packages/9a/c1/745a22df92431b878be1949015164c228a523f97d978ea7f2c7e277e2fc8/blockbuster-1.3.2-py3-none-any.whl", hash = "sha256:57fd9fcc51933cb421400f2b2c708c6fee22269a8691a10a9dbfb671638cfdc7", size = 8153 }, ] [[package]] @@ -3781,7 +3781,7 @@ requires-dist = [ [package.metadata.requires-dev] dev = [ { name = "asgi-lifespan", specifier = ">=2.1.0" }, - { name = "blockbuster", specifier = ">=1.3.1,<1.4" }, + { name = "blockbuster", specifier = ">=1.3.2,<1.4" }, { name = "dictdiffer", specifier = ">=0.9.0" }, { name = "httpx", specifier = ">=0.27.0" }, { name = "ipykernel", specifier = ">=6.29.0" },