adding ability for APIRequest to retry and save to a file (#4677)
* re-adding ability for APIRequest to retry and save to a file * [autofix.ci] apply automated fixes * enabling selective inclusion of httpx headers, along with response header * [autofix.ci] apply automated fixes * Update api_request.py * [autofix.ci] apply automated fixes * Update test_data_components.py * Update test_data_components.py * [autofix.ci] apply automated fixes * Update api_request.py * Update api_request.py * Update api_request.py * Update api_request.py * Update api_request.py --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Eric Hare <ericrhare@gmail.com>
This commit is contained in:
parent
d200c8aba4
commit
654b44347b
4 changed files with 205 additions and 24 deletions
|
|
@ -1,14 +1,20 @@
|
|||
import asyncio
|
||||
import json
|
||||
import mimetypes
|
||||
import re
|
||||
import tempfile
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
import httpx
|
||||
from loguru import logger
|
||||
import validators
|
||||
|
||||
from langflow.base.curl.parse import parse_context
|
||||
from langflow.custom import Component
|
||||
from langflow.io import DataInput, DropdownInput, IntInput, MessageTextInput, NestedDictInput, Output
|
||||
from langflow.io import BoolInput, DataInput, DropdownInput, IntInput, MessageTextInput, NestedDictInput, Output
|
||||
from langflow.schema import Data
|
||||
from langflow.schema.dotdict import dotdict
|
||||
|
||||
|
|
@ -28,7 +34,7 @@ class APIRequestComponent(Component):
|
|||
MessageTextInput(
|
||||
name="urls",
|
||||
display_name="URLs",
|
||||
is_list=True,
|
||||
list=True,
|
||||
info="Enter one or more URLs, separated by commas.",
|
||||
),
|
||||
MessageTextInput(
|
||||
|
|
@ -73,6 +79,30 @@ class APIRequestComponent(Component):
|
|||
value=5,
|
||||
info="The timeout to use for the request.",
|
||||
),
|
||||
BoolInput(
|
||||
name="follow_redirects",
|
||||
display_name="Follow Redirects",
|
||||
value=True,
|
||||
info="Whether to follow http redirects.",
|
||||
advanced=True,
|
||||
),
|
||||
BoolInput(
|
||||
name="save_to_file",
|
||||
display_name="Save to File",
|
||||
value=False,
|
||||
info="Save the API response to a temporary file",
|
||||
advanced=True,
|
||||
),
|
||||
BoolInput(
|
||||
name="include_httpx_metadata",
|
||||
display_name="Include HTTPx Metadata",
|
||||
value=False,
|
||||
info=(
|
||||
"Include properties such as headers, status_code, response_headers, "
|
||||
"and redirection_history in the output."
|
||||
),
|
||||
advanced=True,
|
||||
),
|
||||
]
|
||||
|
||||
outputs = [
|
||||
|
|
@ -91,12 +121,12 @@ class APIRequestComponent(Component):
|
|||
json_data = json.loads(parsed.data)
|
||||
build_config["body"]["value"] = json_data
|
||||
except json.JSONDecodeError:
|
||||
logger.exception("Error decoding JSON data")
|
||||
self.log("Error decoding JSON data")
|
||||
else:
|
||||
build_config["body"]["value"] = {}
|
||||
except Exception as exc:
|
||||
msg = f"Error parsing curl: {exc}"
|
||||
logger.exception(msg)
|
||||
self.log(msg)
|
||||
raise ValueError(msg) from exc
|
||||
return build_config
|
||||
|
||||
|
|
@ -113,6 +143,10 @@ class APIRequestComponent(Component):
|
|||
headers: dict | None = None,
|
||||
body: dict | None = None,
|
||||
timeout: int = 5,
|
||||
*,
|
||||
follow_redirects: bool = True,
|
||||
save_to_file: bool = False,
|
||||
include_httpx_metadata: bool = False,
|
||||
) -> Data:
|
||||
method = method.upper()
|
||||
if method not in {"GET", "POST", "PATCH", "PUT", "DELETE"}:
|
||||
|
|
@ -124,27 +158,79 @@ class APIRequestComponent(Component):
|
|||
body = json.loads(body)
|
||||
except Exception as e:
|
||||
msg = f"Error decoding JSON data: {e}"
|
||||
logger.exception(msg)
|
||||
self.log.exception(msg)
|
||||
body = None
|
||||
raise ValueError(msg) from e
|
||||
|
||||
data = body or None
|
||||
redirection_history = []
|
||||
|
||||
try:
|
||||
response = await client.request(method, url, headers=headers, json=data, timeout=timeout)
|
||||
try:
|
||||
result = response.json()
|
||||
except Exception: # noqa: BLE001
|
||||
logger.opt(exception=True).debug("Error decoding JSON response")
|
||||
result = response.text
|
||||
return Data(
|
||||
data={
|
||||
"source": url,
|
||||
"headers": headers,
|
||||
"status_code": response.status_code,
|
||||
"result": result,
|
||||
},
|
||||
response = await client.request(
|
||||
method,
|
||||
url,
|
||||
headers=headers,
|
||||
json=data,
|
||||
timeout=timeout,
|
||||
follow_redirects=follow_redirects,
|
||||
)
|
||||
|
||||
redirection_history = [
|
||||
{"url": str(redirect.url), "status_code": redirect.status_code} for redirect in response.history
|
||||
]
|
||||
|
||||
if response.is_redirect:
|
||||
redirection_history.append({"url": str(response.url), "status_code": response.status_code})
|
||||
|
||||
is_binary, file_path = self._response_info(response, with_file_path=save_to_file)
|
||||
response_headers = self._headers_to_dict(response.headers)
|
||||
|
||||
metadata: dict[str, Any] = {
|
||||
"source": url,
|
||||
}
|
||||
|
||||
if save_to_file:
|
||||
mode = "wb" if is_binary else "w"
|
||||
encoding = response.encoding if mode == "w" else None
|
||||
if file_path:
|
||||
with file_path.open(mode, encoding=encoding) as f:
|
||||
f.write(response.content if is_binary else response.text)
|
||||
|
||||
if include_httpx_metadata:
|
||||
metadata.update(
|
||||
{
|
||||
"file_path": str(file_path),
|
||||
"headers": headers,
|
||||
"status_code": response.status_code,
|
||||
"response_headers": response_headers,
|
||||
**({"redirection_history": redirection_history} if redirection_history else {}),
|
||||
}
|
||||
)
|
||||
return Data(data=metadata)
|
||||
# Populate result when not saving to a file
|
||||
if is_binary:
|
||||
result = response.content
|
||||
else:
|
||||
try:
|
||||
result = response.json()
|
||||
except Exception: # noqa: BLE001
|
||||
self.log("Error decoding JSON response")
|
||||
result = response.text.encode("utf-8")
|
||||
|
||||
# Add result to metadata
|
||||
metadata.update({"result": result})
|
||||
|
||||
# Add metadata to the output
|
||||
if include_httpx_metadata:
|
||||
metadata.update(
|
||||
{
|
||||
"headers": headers,
|
||||
"status_code": response.status_code,
|
||||
"response_headers": response_headers,
|
||||
**({"redirection_history": redirection_history} if redirection_history else {}),
|
||||
}
|
||||
)
|
||||
return Data(data=metadata)
|
||||
except httpx.TimeoutException:
|
||||
return Data(
|
||||
data={
|
||||
|
|
@ -155,13 +241,14 @@ class APIRequestComponent(Component):
|
|||
},
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.opt(exception=True).debug(f"Error making request to {url}")
|
||||
self.log(f"Error making request to {url}")
|
||||
return Data(
|
||||
data={
|
||||
"source": url,
|
||||
"headers": headers,
|
||||
"status_code": 500,
|
||||
"error": str(exc),
|
||||
**({"redirection_history": redirection_history} if redirection_history else {}),
|
||||
},
|
||||
)
|
||||
|
||||
|
|
@ -179,6 +266,14 @@ class APIRequestComponent(Component):
|
|||
headers = self.headers or {}
|
||||
body = self.body or {}
|
||||
timeout = self.timeout
|
||||
follow_redirects = self.follow_redirects
|
||||
save_to_file = self.save_to_file
|
||||
include_httpx_metadata = self.include_httpx_metadata
|
||||
|
||||
invalid_urls = [url for url in urls if not validators.url(url)]
|
||||
if invalid_urls:
|
||||
msg = f"Invalid URLs provided: {invalid_urls}"
|
||||
raise ValueError(msg)
|
||||
|
||||
if isinstance(self.query_params, str):
|
||||
query_params = dict(parse_qsl(self.query_params))
|
||||
|
|
@ -201,9 +296,83 @@ class APIRequestComponent(Component):
|
|||
async with httpx.AsyncClient() as client:
|
||||
results = await asyncio.gather(
|
||||
*[
|
||||
self.make_request(client, method, u, headers, rec, timeout)
|
||||
self.make_request(
|
||||
client,
|
||||
method,
|
||||
u,
|
||||
headers,
|
||||
rec,
|
||||
timeout,
|
||||
follow_redirects=follow_redirects,
|
||||
save_to_file=save_to_file,
|
||||
include_httpx_metadata=include_httpx_metadata,
|
||||
)
|
||||
for u, rec in zip(urls, bodies, strict=True)
|
||||
]
|
||||
)
|
||||
self.status = results
|
||||
return results
|
||||
|
||||
def _response_info(self, response: httpx.Response, *, with_file_path: bool = False) -> tuple[bool, Path | None]:
|
||||
"""Determine the file path and whether the response content is binary.
|
||||
|
||||
Args:
|
||||
response (Response): The HTTP response object.
|
||||
with_file_path (bool): Whether to save the response content to a file.
|
||||
|
||||
Returns:
|
||||
Tuple[bool, Path | None]:
|
||||
A tuple containing a boolean indicating if the content is binary and the full file path (if applicable).
|
||||
"""
|
||||
# Determine if the content is binary
|
||||
content_type = response.headers.get("Content-Type", "")
|
||||
is_binary = "application/octet-stream" in content_type or "application/binary" in content_type
|
||||
|
||||
if not with_file_path:
|
||||
return is_binary, None
|
||||
|
||||
# Step 1: Set up a subdirectory for the component in the OS temp directory
|
||||
component_temp_dir = Path(tempfile.gettempdir()) / self.__class__.__name__
|
||||
component_temp_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Step 2: Extract filename from Content-Disposition
|
||||
filename = None
|
||||
if "Content-Disposition" in response.headers:
|
||||
content_disposition = response.headers["Content-Disposition"]
|
||||
filename_match = re.search(r'filename="(.+?)"', content_disposition)
|
||||
if not filename_match: # Try to match RFC 5987 style
|
||||
filename_match = re.search(r"filename\*=(?:UTF-8'')?(.+)", content_disposition)
|
||||
if filename_match:
|
||||
extracted_filename = filename_match.group(1)
|
||||
# Ensure the filename is unique
|
||||
if (component_temp_dir / extracted_filename).exists():
|
||||
timestamp = datetime.now(ZoneInfo("UTC")).strftime("%Y%m%d%H%M%S%f")
|
||||
filename = f"{timestamp}-{extracted_filename}"
|
||||
else:
|
||||
filename = extracted_filename
|
||||
|
||||
# Step 3: Infer file extension or use part of the request URL if no filename
|
||||
if not filename:
|
||||
# Extract the last segment of the URL path
|
||||
url_path = urlparse(str(response.request.url)).path
|
||||
base_name = Path(url_path).name # Get the last segment of the path
|
||||
if not base_name: # If the path ends with a slash or is empty
|
||||
base_name = "response"
|
||||
|
||||
# Infer file extension
|
||||
extension = mimetypes.guess_extension(content_type.split(";")[0]) if content_type else None
|
||||
if not extension:
|
||||
extension = ".bin" if is_binary else ".txt" # Default extensions
|
||||
|
||||
# Combine the base name with timestamp and extension
|
||||
timestamp = datetime.now(ZoneInfo("UTC")).strftime("%Y%m%d%H%M%S%f")
|
||||
filename = f"{timestamp}-{base_name}{extension}"
|
||||
|
||||
# Step 4: Define the full file path
|
||||
file_path = component_temp_dir / filename
|
||||
|
||||
return is_binary, file_path
|
||||
|
||||
def _headers_to_dict(self, headers: httpx.Headers) -> dict[str, str]:
|
||||
"""Convert HTTP headers to a dictionary with lowercased keys."""
|
||||
return {k.lower(): v for k, v in headers.items()}
|
||||
|
|
|
|||
|
|
@ -162,6 +162,7 @@ dependencies = [
|
|||
"fastapi-pagination>=0.12.29,<1.0.0",
|
||||
"defusedxml>=0.7.1,<1.0.0",
|
||||
"pypdf~=5.1.0",
|
||||
"validators>=0.34.0",
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
|
|
|
|||
|
|
@ -24,7 +24,12 @@ async def test_successful_get_request(api_request):
|
|||
respx.get(url).mock(return_value=Response(200, json=mock_response))
|
||||
|
||||
# Making the request
|
||||
result = await api_request.make_request(client=httpx.AsyncClient(), method=method, url=url)
|
||||
result = await api_request.make_request(
|
||||
client=httpx.AsyncClient(),
|
||||
method=method,
|
||||
url=url,
|
||||
include_httpx_metadata=True,
|
||||
)
|
||||
|
||||
# Assertions
|
||||
assert result.data["status_code"] == 200
|
||||
|
|
@ -60,7 +65,9 @@ async def test_failed_request(api_request):
|
|||
respx.get(url).mock(return_value=Response(404))
|
||||
|
||||
# Making the request
|
||||
result = await api_request.make_request(client=httpx.AsyncClient(), method=method, url=url)
|
||||
result = await api_request.make_request(
|
||||
client=httpx.AsyncClient(), method=method, url=url, include_httpx_metadata=True
|
||||
)
|
||||
|
||||
# Assertions
|
||||
assert result.data["status_code"] == 404
|
||||
|
|
@ -74,7 +81,9 @@ async def test_timeout(api_request):
|
|||
respx.get(url).mock(side_effect=httpx.TimeoutException(message="Timeout", request=None))
|
||||
|
||||
# Making the request
|
||||
result = await api_request.make_request(client=httpx.AsyncClient(), method=method, url=url, timeout=1)
|
||||
result = await api_request.make_request(
|
||||
client=httpx.AsyncClient(), method=method, url=url, timeout=1, include_httpx_metadata=True
|
||||
)
|
||||
|
||||
# Assertions
|
||||
assert result.data["status_code"] == 408
|
||||
|
|
|
|||
2
uv.lock
generated
2
uv.lock
generated
|
|
@ -3853,6 +3853,7 @@ dependencies = [
|
|||
{ name = "typer" },
|
||||
{ name = "uncurl" },
|
||||
{ name = "uvicorn" },
|
||||
{ name = "validators" },
|
||||
]
|
||||
|
||||
[package.optional-dependencies]
|
||||
|
|
@ -4013,6 +4014,7 @@ requires-dist = [
|
|||
{ name = "types-requests", marker = "extra == 'dev'", specifier = ">=2.32.0" },
|
||||
{ name = "uncurl", specifier = ">=0.0.11,<1.0.0" },
|
||||
{ name = "uvicorn", specifier = ">=0.30.0,<1.0.0" },
|
||||
{ name = "validators", specifier = ">=0.34.0" },
|
||||
{ name = "vulture", marker = "extra == 'dev'", specifier = ">=2.11" },
|
||||
]
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue