ref: Remove some useless asyncio.to_thread (#5149)
Remove some useless asyncio.to_thread
This commit is contained in:
parent
0b39554795
commit
c0b25fa651
10 changed files with 44 additions and 94 deletions
|
|
@ -178,7 +178,7 @@ dev-dependencies = [
|
|||
"asgi-lifespan>=2.1.0",
|
||||
"pytest-github-actions-annotate-failures>=0.2.0",
|
||||
"pytest-codspeed>=3.0.0",
|
||||
"blockbuster>=1.3.1,<1.4",
|
||||
"blockbuster>=1.3.2,<1.4",
|
||||
"types-aiofiles>=24.1.0.20240626",
|
||||
]
|
||||
|
||||
|
|
|
|||
|
|
@ -1,56 +1,10 @@
|
|||
import asyncio
|
||||
from typing import cast
|
||||
|
||||
from langflow.custom import Component
|
||||
from langflow.memory import astore_message
|
||||
from langflow.schema import Data
|
||||
from langflow.schema.message import Message
|
||||
|
||||
|
||||
class ChatComponent(Component):
|
||||
display_name = "Chat Component"
|
||||
description = "Use as base for chat components."
|
||||
|
||||
async def build_with_data(
|
||||
self,
|
||||
*,
|
||||
sender: str | None = "User",
|
||||
sender_name: str | None = "User",
|
||||
input_value: str | Data | Message | None = None,
|
||||
files: list[str] | None = None,
|
||||
session_id: str | None = None,
|
||||
return_message: bool = False,
|
||||
) -> str | Message:
|
||||
message = await asyncio.to_thread(self._create_message, input_value, sender, sender_name, files, session_id)
|
||||
message_text = message.text if not return_message else message
|
||||
|
||||
self.status = message_text
|
||||
if session_id and isinstance(message, Message) and isinstance(message.text, str):
|
||||
flow_id = self.graph.flow_id if hasattr(self, "graph") else None
|
||||
messages = await astore_message(message, flow_id=flow_id)
|
||||
self.status = messages
|
||||
self._send_messages_events(messages)
|
||||
|
||||
return cast("str | Message", message_text)
|
||||
|
||||
def _create_message(self, input_value, sender, sender_name, files, session_id) -> Message:
|
||||
if isinstance(input_value, Data):
|
||||
return Message.from_data(input_value)
|
||||
return Message(
|
||||
text=input_value,
|
||||
sender=sender,
|
||||
sender_name=sender_name,
|
||||
files=files,
|
||||
session_id=session_id,
|
||||
category="message",
|
||||
)
|
||||
|
||||
def _send_messages_events(self, messages) -> None:
|
||||
if hasattr(self, "_event_manager") and self._event_manager:
|
||||
for stored_message in messages:
|
||||
id_ = stored_message.id
|
||||
self._send_message_event(message=stored_message, id_=id_)
|
||||
|
||||
def get_properties_from_source_component(self):
|
||||
if hasattr(self, "_vertex") and hasattr(self._vertex, "incoming_edges") and self._vertex.incoming_edges:
|
||||
source_id = self._vertex.incoming_edges[0].source_id
|
||||
|
|
|
|||
|
|
@ -296,9 +296,6 @@ class DirectoryReader:
|
|||
file_content = str(StringCompressor(file_content).compress_string())
|
||||
return True, file_content
|
||||
|
||||
async def get_output_types_from_code_async(self, code: str):
|
||||
return await asyncio.to_thread(self.get_output_types_from_code, code)
|
||||
|
||||
async def abuild_component_menu_list(self, file_paths):
|
||||
response = {"menu": []}
|
||||
logger.debug("-------------------- Async Building component menu list --------------------")
|
||||
|
|
@ -328,7 +325,7 @@ class DirectoryReader:
|
|||
|
||||
if validation_result:
|
||||
try:
|
||||
output_types = await self.get_output_types_from_code_async(result_content)
|
||||
output_types = await asyncio.to_thread(self.get_output_types_from_code, result_content)
|
||||
except Exception: # noqa: BLE001
|
||||
logger.exception("Error while getting output types from code")
|
||||
output_types = [component_name_camelcase]
|
||||
|
|
|
|||
|
|
@ -114,10 +114,10 @@ class TracingService(Service):
|
|||
async def initialize_tracers(self) -> None:
|
||||
try:
|
||||
await self.start()
|
||||
await asyncio.to_thread(self._initialize_langsmith_tracer)
|
||||
await asyncio.to_thread(self._initialize_langwatch_tracer)
|
||||
await asyncio.to_thread(self._initialize_langfuse_tracer)
|
||||
await asyncio.to_thread(self._initialize_arize_phoenix_tracer)
|
||||
self._initialize_langsmith_tracer()
|
||||
self._initialize_langwatch_tracer()
|
||||
self._initialize_langfuse_tracer()
|
||||
self._initialize_arize_phoenix_tracer()
|
||||
except Exception as e: # noqa: BLE001
|
||||
logger.debug(f"Error initializing tracers: {e}")
|
||||
|
||||
|
|
|
|||
|
|
@ -46,20 +46,28 @@ load_dotenv()
|
|||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def blockbuster():
|
||||
with blockbuster_ctx() as bb:
|
||||
for func in [
|
||||
"io.BufferedReader.read",
|
||||
"io.BufferedWriter.write",
|
||||
"io.TextIOWrapper.read",
|
||||
"io.TextIOWrapper.write",
|
||||
]:
|
||||
bb.functions[func].can_block_functions.append(("settings/service.py", {"initialize"}))
|
||||
for func in bb.functions:
|
||||
if func.startswith("sqlite3."):
|
||||
bb.functions[func].deactivate()
|
||||
bb.functions["threading.Lock.acquire"].deactivate()
|
||||
yield bb
|
||||
def blockbuster(request):
|
||||
if "benchmark" in request.keywords:
|
||||
yield
|
||||
else:
|
||||
with blockbuster_ctx() as bb:
|
||||
for func in [
|
||||
"io.BufferedReader.read",
|
||||
"io.BufferedWriter.write",
|
||||
"io.TextIOWrapper.read",
|
||||
"io.TextIOWrapper.write",
|
||||
]:
|
||||
bb.functions[func].can_block_functions.append(("settings/service.py", {"initialize"}))
|
||||
for func in [
|
||||
"io.BufferedReader.read",
|
||||
"io.TextIOWrapper.read",
|
||||
]:
|
||||
bb.functions[func].can_block_functions.append(("importlib_metadata/__init__.py", {"metadata"}))
|
||||
for func in bb.functions:
|
||||
if func.startswith("sqlite3."):
|
||||
bb.functions[func].deactivate()
|
||||
bb.functions["threading.Lock.acquire"].deactivate()
|
||||
yield bb
|
||||
|
||||
|
||||
def pytest_configure(config):
|
||||
|
|
|
|||
|
|
@ -1,18 +1,11 @@
|
|||
import asyncio
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from aiofile import async_open
|
||||
from fastapi import status
|
||||
from httpx import AsyncClient
|
||||
from langflow.api.v1.schemas import UpdateCustomComponentRequest
|
||||
|
||||
|
||||
async def get_dynamic_output_component_code():
|
||||
return await asyncio.to_thread(
|
||||
Path("src/backend/tests/data/dynamic_output_component.py").read_text, encoding="utf-8"
|
||||
)
|
||||
|
||||
|
||||
async def test_get_version(client: AsyncClient):
|
||||
response = await client.get("api/v1/version")
|
||||
result = response.json()
|
||||
|
|
@ -37,7 +30,8 @@ async def test_get_config(client: AsyncClient):
|
|||
|
||||
|
||||
async def test_update_component_outputs(client: AsyncClient, logged_in_headers: dict):
|
||||
code = await get_dynamic_output_component_code()
|
||||
async with async_open("src/backend/tests/data/dynamic_output_component.py", encoding="utf-8") as f:
|
||||
code = await f.read()
|
||||
frontend_node: dict[str, Any] = {"outputs": []}
|
||||
request = UpdateCustomComponentRequest(
|
||||
code=code,
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
import asyncio
|
||||
|
||||
import pytest
|
||||
from aiofile import async_open
|
||||
from langflow.components.inputs import ChatInput, TextInputComponent
|
||||
from langflow.schema.message import Message
|
||||
from langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_USER, MESSAGE_SENDER_USER
|
||||
|
|
@ -93,7 +92,8 @@ class TestChatInput(ComponentTestBaseWithClient):
|
|||
"""Test message response with file attachments."""
|
||||
# Create a temporary test file
|
||||
test_file = tmp_path / "test.txt"
|
||||
await asyncio.to_thread(test_file.write_text, "Test content")
|
||||
async with async_open(test_file, "w") as f:
|
||||
await f.write("Test content")
|
||||
|
||||
kwargs = {
|
||||
"input_value": "Message with file",
|
||||
|
|
|
|||
|
|
@ -1,4 +1,3 @@
|
|||
import asyncio
|
||||
import logging
|
||||
from collections import deque
|
||||
|
||||
|
|
@ -14,7 +13,7 @@ from langflow.graph.graph.constants import Finish
|
|||
async def test_graph_not_prepared():
|
||||
chat_input = ChatInput()
|
||||
chat_output = ChatOutput()
|
||||
graph = await asyncio.to_thread(Graph)
|
||||
graph = Graph()
|
||||
graph.add_component(chat_input)
|
||||
graph.add_component(chat_output)
|
||||
with pytest.raises(ValueError, match="Graph not prepared"):
|
||||
|
|
@ -24,7 +23,7 @@ async def test_graph_not_prepared():
|
|||
async def test_graph(caplog: pytest.LogCaptureFixture):
|
||||
chat_input = ChatInput()
|
||||
chat_output = ChatOutput()
|
||||
graph = await asyncio.to_thread(Graph)
|
||||
graph = Graph()
|
||||
graph.add_component(chat_input)
|
||||
graph.add_component(chat_output)
|
||||
caplog.clear()
|
||||
|
|
@ -36,7 +35,7 @@ async def test_graph(caplog: pytest.LogCaptureFixture):
|
|||
async def test_graph_with_edge():
|
||||
chat_input = ChatInput()
|
||||
chat_output = ChatOutput()
|
||||
graph = await asyncio.to_thread(Graph)
|
||||
graph = Graph()
|
||||
input_id = graph.add_component(chat_input)
|
||||
output_id = graph.add_component(chat_output)
|
||||
graph.add_component_edge(input_id, (chat_input.outputs[0].name, chat_input.inputs[0].name), output_id)
|
||||
|
|
@ -58,7 +57,7 @@ async def test_graph_functional():
|
|||
chat_input.set(should_store_message=False)
|
||||
chat_output = ChatOutput(input_value="test", _id="chat_output")
|
||||
chat_output.set(sender_name=chat_input.message_response)
|
||||
graph = await asyncio.to_thread(Graph, chat_input, chat_output)
|
||||
graph = Graph(chat_input, chat_output)
|
||||
assert graph._run_queue == deque(["chat_input"])
|
||||
await graph.astep()
|
||||
assert graph._run_queue == deque(["chat_output"])
|
||||
|
|
@ -73,7 +72,7 @@ async def test_graph_functional_async_start():
|
|||
chat_input = ChatInput(_id="chat_input")
|
||||
chat_output = ChatOutput(input_value="test", _id="chat_output")
|
||||
chat_output.set(sender_name=chat_input.message_response)
|
||||
graph = await asyncio.to_thread(Graph, chat_input, chat_output)
|
||||
graph = Graph(chat_input, chat_output)
|
||||
# Now iterate through the graph
|
||||
# and check that the graph is running
|
||||
# correctly
|
||||
|
|
|
|||
|
|
@ -1,5 +1,3 @@
|
|||
import asyncio
|
||||
|
||||
from langflow.processing.process import process_tweaks
|
||||
from langflow.services.deps import get_session_service
|
||||
|
||||
|
|
@ -265,7 +263,7 @@ def test_tweak_not_in_template():
|
|||
|
||||
async def test_load_langchain_object_with_cached_session(basic_graph_data):
|
||||
# Provide a non-existent session_id
|
||||
session_service = await asyncio.to_thread(get_session_service)
|
||||
session_service = get_session_service()
|
||||
session_id1 = "non-existent-session-id"
|
||||
graph1, artifacts1 = await session_service.load_session(session_id1, basic_graph_data)
|
||||
# Use the new session_id to get the langchain_object again
|
||||
|
|
|
|||
8
uv.lock
generated
8
uv.lock
generated
|
|
@ -478,14 +478,14 @@ wheels = [
|
|||
|
||||
[[package]]
|
||||
name = "blockbuster"
|
||||
version = "1.3.1"
|
||||
version = "1.3.2"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "forbiddenfruit" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/6a/8c/e7e61396951b9ab1c39c35fa3a9737270a62da41d0b3ff6a15207747e5eb/blockbuster-1.3.1.tar.gz", hash = "sha256:59984286e1cd274d406129f93a42bd62b98fe53f484d544e6e743a5d6d7e42d5", size = 9409 }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/e4/0b/e61888f0510374e2fc3c86f8edff5f56c28b896c080af7732ed3e6b2f205/blockbuster-1.3.2.tar.gz", hash = "sha256:196a5f8fef3f22c718fd8e547dafc75a1936b87ff2b5a1f73ea233b00cff0d6a", size = 9463 }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/a2/de/28b5c37969037ad7dcf08c18dba255e1ba3a42539cf04cae3ef627f706ae/blockbuster-1.3.1-py3-none-any.whl", hash = "sha256:883292a02c1ae9dbd76d637ea8b604a1cc75aebb0544d566aed90491e056963e", size = 8111 },
|
||||
{ url = "https://files.pythonhosted.org/packages/9a/c1/745a22df92431b878be1949015164c228a523f97d978ea7f2c7e277e2fc8/blockbuster-1.3.2-py3-none-any.whl", hash = "sha256:57fd9fcc51933cb421400f2b2c708c6fee22269a8691a10a9dbfb671638cfdc7", size = 8153 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
@ -3781,7 +3781,7 @@ requires-dist = [
|
|||
[package.metadata.requires-dev]
|
||||
dev = [
|
||||
{ name = "asgi-lifespan", specifier = ">=2.1.0" },
|
||||
{ name = "blockbuster", specifier = ">=1.3.1,<1.4" },
|
||||
{ name = "blockbuster", specifier = ">=1.3.2,<1.4" },
|
||||
{ name = "dictdiffer", specifier = ">=0.9.0" },
|
||||
{ name = "httpx", specifier = ">=0.27.0" },
|
||||
{ name = "ipykernel", specifier = ">=6.29.0" },
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue