From d2919f228d30965984c0ed2224ab2a9b844f6c5d Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Tue, 13 May 2025 15:36:40 -0400 Subject: [PATCH] feat: add default user agent in api request component (#7631) * Update api_request.py * Update api_request.py * Update api_request.py * [autofix.ci] apply automated fixes * Update src/backend/base/langflow/components/data/api_request.py Co-authored-by: Gabriel Luiz Freitas Almeida * Update src/backend/base/langflow/components/data/api_request.py Co-authored-by: Gabriel Luiz Freitas Almeida * Update api_request.py * [autofix.ci] apply automated fixes * fix format * fix format issues * [autofix.ci] apply automated fixes * Template Update --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Gabriel Luiz Freitas Almeida --- .../langflow/components/data/api_request.py | 20 ++++++++++++++----- .../starter_projects/Pokédex Agent.json | 4 ++-- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/backend/base/langflow/components/data/api_request.py b/src/backend/base/langflow/components/data/api_request.py index 5772c7ab5..3edc8bea6 100644 --- a/src/backend/base/langflow/components/data/api_request.py +++ b/src/backend/base/langflow/components/data/api_request.py @@ -29,6 +29,9 @@ from langflow.io import ( from langflow.schema import Data from langflow.schema.dataframe import DataFrame from langflow.schema.dotdict import dotdict +from langflow.services.deps import get_settings_service + +# Get settings using the service class APIRequestComponent(Component): @@ -99,11 +102,12 @@ class APIRequestComponent(Component): value=[], input_types=["Data"], advanced=True, + real_time_refresh=True, ), TableInput( name="headers", display_name="Headers", - info="The headers to send with the request as a dictionary.", + info="The headers to send with the request", table_schema=[ { "name": "key", @@ -118,14 +122,15 @@ class APIRequestComponent(Component): "description": "Header value", }, ], - value=[], + value=[{"key": "User-Agent", "value": get_settings_service().settings.user_agent}], advanced=True, input_types=["Data"], + real_time_refresh=True, ), IntInput( name="timeout", display_name="Timeout", - value=5, + value=30, info="The timeout to use for the request.", advanced=True, ), @@ -278,7 +283,8 @@ class APIRequestComponent(Component): return build_config def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None) -> dotdict: - if field_name == "use_curl": + if field_name == "use_curl" and field_value: + # if we remove field value from validation, this gets validated every time build_config = self._update_curl_mode(build_config, use_curl=field_value) # Fields that should not be reset @@ -303,9 +309,13 @@ class APIRequestComponent(Component): reset_value = type_reset_mapping.get(type(input_field), None) build_config[input_field.name]["value"] = reset_value self.log(f"Reset field {input_field.name} to {reset_value}") - elif field_name == "method" and not self.use_curl: + # Don't try to parse the boolean value as a curl command + return build_config + if field_name == "method" and not self.use_curl: build_config = self._update_method_fields(build_config, field_value) elif field_name == "curl" and self.use_curl and field_value: + # Not reachable, because we don't have a way to update + # the curl field, self.use_curl is set after the build_config is created build_config = self.parse_curl(field_value, build_config) return build_config diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Pokédex Agent.json b/src/backend/base/langflow/initial_setup/starter_projects/Pokédex Agent.json index 746cf1fe9..ed74b1054 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Pokédex Agent.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Pokédex Agent.json @@ -920,7 +920,7 @@ "show": true, "title_case": false, "type": "code", - "value": "import asyncio\nimport json\nimport re\nimport tempfile\nfrom datetime import datetime, timezone\nfrom pathlib import Path\nfrom typing import Any\nfrom urllib.parse import parse_qsl, urlencode, urlparse, urlunparse\n\nimport aiofiles\nimport aiofiles.os as aiofiles_os\nimport httpx\nimport validators\n\nfrom langflow.base.curl.parse import parse_context\nfrom langflow.custom import Component\nfrom langflow.io import (\n BoolInput,\n DataInput,\n DropdownInput,\n FloatInput,\n IntInput,\n MessageTextInput,\n MultilineInput,\n Output,\n StrInput,\n TableInput,\n)\nfrom langflow.schema import Data\nfrom langflow.schema.dataframe import DataFrame\nfrom langflow.schema.dotdict import dotdict\n\n\nclass APIRequestComponent(Component):\n display_name = \"API Request\"\n description = \"Make HTTP requests using URLs or cURL commands.\"\n icon = \"Globe\"\n name = \"APIRequest\"\n\n default_keys = [\"urls\", \"method\", \"query_params\"]\n\n inputs = [\n MessageTextInput(\n name=\"urls\",\n display_name=\"URLs\",\n list=True,\n info=\"Enter one or more URLs, separated by commas.\",\n advanced=False,\n tool_mode=True,\n ),\n MultilineInput(\n name=\"curl\",\n display_name=\"cURL\",\n info=(\n \"Paste a curl command to populate the fields. \"\n \"This will fill in the dictionary fields for headers and body.\"\n ),\n advanced=True,\n real_time_refresh=True,\n tool_mode=True,\n ),\n DropdownInput(\n name=\"method\",\n display_name=\"Method\",\n options=[\"GET\", \"POST\", \"PATCH\", \"PUT\", \"DELETE\"],\n info=\"The HTTP method to use.\",\n real_time_refresh=True,\n ),\n BoolInput(\n name=\"use_curl\",\n display_name=\"Use cURL\",\n value=False,\n info=\"Enable cURL mode to populate fields from a cURL command.\",\n real_time_refresh=True,\n ),\n DataInput(\n name=\"query_params\",\n display_name=\"Query Parameters\",\n info=\"The query parameters to append to the URL.\",\n advanced=True,\n ),\n TableInput(\n name=\"body\",\n display_name=\"Body\",\n info=\"The body to send with the request as a dictionary (for POST, PATCH, PUT).\",\n table_schema=[\n {\n \"name\": \"key\",\n \"display_name\": \"Key\",\n \"type\": \"str\",\n \"description\": \"Parameter name\",\n },\n {\n \"name\": \"value\",\n \"display_name\": \"Value\",\n \"description\": \"Parameter value\",\n },\n ],\n value=[],\n input_types=[\"Data\"],\n advanced=True,\n ),\n TableInput(\n name=\"headers\",\n display_name=\"Headers\",\n info=\"The headers to send with the request as a dictionary.\",\n table_schema=[\n {\n \"name\": \"key\",\n \"display_name\": \"Header\",\n \"type\": \"str\",\n \"description\": \"Header name\",\n },\n {\n \"name\": \"value\",\n \"display_name\": \"Value\",\n \"type\": \"str\",\n \"description\": \"Header value\",\n },\n ],\n value=[],\n advanced=True,\n input_types=[\"Data\"],\n ),\n IntInput(\n name=\"timeout\",\n display_name=\"Timeout\",\n value=5,\n info=\"The timeout to use for the request.\",\n advanced=True,\n ),\n BoolInput(\n name=\"follow_redirects\",\n display_name=\"Follow Redirects\",\n value=True,\n info=\"Whether to follow http redirects.\",\n advanced=True,\n ),\n BoolInput(\n name=\"save_to_file\",\n display_name=\"Save to File\",\n value=False,\n info=\"Save the API response to a temporary file\",\n advanced=True,\n ),\n BoolInput(\n name=\"include_httpx_metadata\",\n display_name=\"Include HTTPx Metadata\",\n value=False,\n info=(\n \"Include properties such as headers, status_code, response_headers, \"\n \"and redirection_history in the output.\"\n ),\n advanced=True,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Data\", name=\"data\", method=\"make_requests\"),\n Output(display_name=\"DataFrame\", name=\"dataframe\", method=\"as_dataframe\"),\n ]\n\n def _parse_json_value(self, value: Any) -> Any:\n \"\"\"Parse a value that might be a JSON string.\"\"\"\n if not isinstance(value, str):\n return value\n\n try:\n parsed = json.loads(value)\n except json.JSONDecodeError:\n return value\n else:\n return parsed\n\n def _process_body(self, body: Any) -> dict:\n \"\"\"Process the body input into a valid dictionary.\n\n Args:\n body: The body to process, can be dict, str, or list\n Returns:\n Processed dictionary\n \"\"\"\n if body is None:\n return {}\n if isinstance(body, dict):\n return self._process_dict_body(body)\n if isinstance(body, str):\n return self._process_string_body(body)\n if isinstance(body, list):\n return self._process_list_body(body)\n\n return {}\n\n def _process_dict_body(self, body: dict) -> dict:\n \"\"\"Process dictionary body by parsing JSON values.\"\"\"\n return {k: self._parse_json_value(v) for k, v in body.items()}\n\n def _process_string_body(self, body: str) -> dict:\n \"\"\"Process string body by attempting JSON parse.\"\"\"\n try:\n return self._process_body(json.loads(body))\n except json.JSONDecodeError:\n return {\"data\": body}\n\n def _process_list_body(self, body: list) -> dict:\n \"\"\"Process list body by converting to key-value dictionary.\"\"\"\n processed_dict = {}\n\n try:\n for item in body:\n if not self._is_valid_key_value_item(item):\n continue\n\n key = item[\"key\"]\n value = self._parse_json_value(item[\"value\"])\n processed_dict[key] = value\n\n except (KeyError, TypeError, ValueError) as e:\n self.log(f\"Failed to process body list: {e}\")\n return {} # Return empty dictionary instead of None\n\n return processed_dict\n\n def _is_valid_key_value_item(self, item: Any) -> bool:\n \"\"\"Check if an item is a valid key-value dictionary.\"\"\"\n return isinstance(item, dict) and \"key\" in item and \"value\" in item\n\n def parse_curl(self, curl: str, build_config: dotdict) -> dotdict:\n \"\"\"Parse a cURL command and update build configuration.\n\n Args:\n curl: The cURL command to parse\n build_config: The build configuration to update\n Returns:\n Updated build configuration\n \"\"\"\n try:\n parsed = parse_context(curl)\n\n # Update basic configuration\n build_config[\"urls\"][\"value\"] = [parsed.url]\n build_config[\"method\"][\"value\"] = parsed.method.upper()\n build_config[\"headers\"][\"advanced\"] = True\n build_config[\"body\"][\"advanced\"] = True\n\n # Process headers\n headers_list = [{\"key\": k, \"value\": v} for k, v in parsed.headers.items()]\n build_config[\"headers\"][\"value\"] = headers_list\n\n if headers_list:\n build_config[\"headers\"][\"advanced\"] = False\n\n # Process body data\n if not parsed.data:\n build_config[\"body\"][\"value\"] = []\n elif parsed.data:\n try:\n json_data = json.loads(parsed.data)\n if isinstance(json_data, dict):\n body_list = [\n {\"key\": k, \"value\": json.dumps(v) if isinstance(v, dict | list) else str(v)}\n for k, v in json_data.items()\n ]\n build_config[\"body\"][\"value\"] = body_list\n build_config[\"body\"][\"advanced\"] = False\n else:\n build_config[\"body\"][\"value\"] = [{\"key\": \"data\", \"value\": json.dumps(json_data)}]\n build_config[\"body\"][\"advanced\"] = False\n except json.JSONDecodeError:\n build_config[\"body\"][\"value\"] = [{\"key\": \"data\", \"value\": parsed.data}]\n build_config[\"body\"][\"advanced\"] = False\n\n except Exception as exc:\n msg = f\"Error parsing curl: {exc}\"\n self.log(msg)\n raise ValueError(msg) from exc\n\n return build_config\n\n def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None) -> dotdict:\n if field_name == \"use_curl\":\n build_config = self._update_curl_mode(build_config, use_curl=field_value)\n\n # Fields that should not be reset\n preserve_fields = {\"timeout\", \"follow_redirects\", \"save_to_file\", \"include_httpx_metadata\", \"use_curl\"}\n\n # Mapping between input types and their reset values\n type_reset_mapping = {\n TableInput: [],\n BoolInput: False,\n IntInput: 0,\n FloatInput: 0.0,\n MessageTextInput: \"\",\n StrInput: \"\",\n MultilineInput: \"\",\n DropdownInput: \"GET\",\n DataInput: {},\n }\n\n for input_field in self.inputs:\n # Only reset if field is not in preserve list\n if input_field.name not in preserve_fields:\n reset_value = type_reset_mapping.get(type(input_field), None)\n build_config[input_field.name][\"value\"] = reset_value\n self.log(f\"Reset field {input_field.name} to {reset_value}\")\n elif field_name == \"method\" and not self.use_curl:\n build_config = self._update_method_fields(build_config, field_value)\n elif field_name == \"curl\" and self.use_curl and field_value:\n build_config = self.parse_curl(field_value, build_config)\n return build_config\n\n def _update_curl_mode(self, build_config: dotdict, *, use_curl: bool) -> dotdict:\n always_visible = [\"method\", \"use_curl\"]\n\n for field in self.inputs:\n field_name = field.name\n field_config = build_config.get(field_name)\n if isinstance(field_config, dict):\n if field_name in always_visible:\n field_config[\"advanced\"] = False\n elif field_name == \"urls\":\n field_config[\"advanced\"] = use_curl\n elif field_name == \"curl\":\n field_config[\"advanced\"] = not use_curl\n field_config[\"real_time_refresh\"] = use_curl\n elif field_name in {\"body\", \"headers\"}:\n field_config[\"advanced\"] = True # Always keep body and headers in advanced when use_curl is False\n else:\n field_config[\"advanced\"] = use_curl\n else:\n self.log(f\"Expected dict for build_config[{field_name}], got {type(field_config).__name__}\")\n\n if not use_curl:\n current_method = build_config.get(\"method\", {}).get(\"value\", \"GET\")\n build_config = self._update_method_fields(build_config, current_method)\n\n return build_config\n\n def _update_method_fields(self, build_config: dotdict, method: str) -> dotdict:\n common_fields = [\n \"urls\",\n \"method\",\n \"use_curl\",\n ]\n\n always_advanced_fields = [\n \"body\",\n \"headers\",\n \"timeout\",\n \"follow_redirects\",\n \"save_to_file\",\n \"include_httpx_metadata\",\n ]\n\n body_fields = [\"body\"]\n\n for field in self.inputs:\n field_name = field.name\n field_config = build_config.get(field_name)\n if isinstance(field_config, dict):\n if field_name in common_fields:\n field_config[\"advanced\"] = False\n elif field_name in body_fields:\n field_config[\"advanced\"] = method not in {\"POST\", \"PUT\", \"PATCH\"}\n elif field_name in always_advanced_fields:\n field_config[\"advanced\"] = True\n else:\n field_config[\"advanced\"] = True\n else:\n self.log(f\"Expected dict for build_config[{field_name}], got {type(field_config).__name__}\")\n\n return build_config\n\n async def make_request(\n self,\n client: httpx.AsyncClient,\n method: str,\n url: str,\n headers: dict | None = None,\n body: Any = None,\n timeout: int = 5,\n *,\n follow_redirects: bool = True,\n save_to_file: bool = False,\n include_httpx_metadata: bool = False,\n ) -> Data:\n method = method.upper()\n if method not in {\"GET\", \"POST\", \"PATCH\", \"PUT\", \"DELETE\"}:\n msg = f\"Unsupported method: {method}\"\n raise ValueError(msg)\n\n # Process body using the new helper method\n processed_body = self._process_body(body)\n redirection_history = []\n\n try:\n response = await client.request(\n method,\n url,\n headers=headers,\n json=processed_body,\n timeout=timeout,\n follow_redirects=follow_redirects,\n )\n\n redirection_history = [\n {\n \"url\": redirect.headers.get(\"Location\", str(redirect.url)),\n \"status_code\": redirect.status_code,\n }\n for redirect in response.history\n ]\n\n is_binary, file_path = await self._response_info(response, with_file_path=save_to_file)\n response_headers = self._headers_to_dict(response.headers)\n\n metadata: dict[str, Any] = {\n \"source\": url,\n }\n\n if save_to_file:\n mode = \"wb\" if is_binary else \"w\"\n encoding = response.encoding if mode == \"w\" else None\n if file_path:\n # Ensure parent directory exists\n await aiofiles_os.makedirs(file_path.parent, exist_ok=True)\n if is_binary:\n async with aiofiles.open(file_path, \"wb\") as f:\n await f.write(response.content)\n await f.flush()\n else:\n async with aiofiles.open(file_path, \"w\", encoding=encoding) as f:\n await f.write(response.text)\n await f.flush()\n metadata[\"file_path\"] = str(file_path)\n\n if include_httpx_metadata:\n metadata.update(\n {\n \"headers\": headers,\n \"status_code\": response.status_code,\n \"response_headers\": response_headers,\n **({\"redirection_history\": redirection_history} if redirection_history else {}),\n }\n )\n return Data(data=metadata)\n\n if is_binary:\n result = response.content\n else:\n try:\n result = response.json()\n except json.JSONDecodeError:\n self.log(\"Failed to decode JSON response\")\n result = response.text.encode(\"utf-8\")\n\n metadata.update({\"result\": result})\n\n if include_httpx_metadata:\n metadata.update(\n {\n \"headers\": headers,\n \"status_code\": response.status_code,\n \"response_headers\": response_headers,\n **({\"redirection_history\": redirection_history} if redirection_history else {}),\n }\n )\n return Data(data=metadata)\n except httpx.TimeoutException:\n return Data(\n data={\n \"source\": url,\n \"headers\": headers,\n \"status_code\": 408,\n \"error\": \"Request timed out\",\n },\n )\n except Exception as exc: # noqa: BLE001\n self.log(f\"Error making request to {url}\")\n return Data(\n data={\n \"source\": url,\n \"headers\": headers,\n \"status_code\": 500,\n \"error\": str(exc),\n **({\"redirection_history\": redirection_history} if redirection_history else {}),\n },\n )\n\n def add_query_params(self, url: str, params: dict) -> str:\n url_parts = list(urlparse(url))\n query = dict(parse_qsl(url_parts[4]))\n query.update(params)\n url_parts[4] = urlencode(query)\n return urlunparse(url_parts)\n\n async def make_requests(self) -> list[Data]:\n method = self.method\n urls = [url.strip() for url in self.urls if url.strip()]\n headers = self.headers or {}\n body = self.body or {}\n timeout = self.timeout\n follow_redirects = self.follow_redirects\n save_to_file = self.save_to_file\n include_httpx_metadata = self.include_httpx_metadata\n\n if self.use_curl and self.curl:\n self._build_config = self.parse_curl(self.curl, dotdict())\n\n invalid_urls = [url for url in urls if not validators.url(url)]\n if invalid_urls:\n msg = f\"Invalid URLs provided: {invalid_urls}\"\n raise ValueError(msg)\n\n if isinstance(self.query_params, str):\n query_params = dict(parse_qsl(self.query_params))\n else:\n query_params = self.query_params.data if self.query_params else {}\n\n # Process headers here\n headers = self._process_headers(headers)\n\n # Process body\n body = self._process_body(body)\n\n bodies = [body] * len(urls)\n\n urls = [self.add_query_params(url, query_params) for url in urls]\n\n async with httpx.AsyncClient() as client:\n results = await asyncio.gather(\n *[\n self.make_request(\n client,\n method,\n u,\n headers,\n rec,\n timeout,\n follow_redirects=follow_redirects,\n save_to_file=save_to_file,\n include_httpx_metadata=include_httpx_metadata,\n )\n for u, rec in zip(urls, bodies, strict=False)\n ]\n )\n self.status = results\n return results\n\n async def _response_info(\n self, response: httpx.Response, *, with_file_path: bool = False\n ) -> tuple[bool, Path | None]:\n \"\"\"Determine the file path and whether the response content is binary.\n\n Args:\n response (Response): The HTTP response object.\n with_file_path (bool): Whether to save the response content to a file.\n\n Returns:\n Tuple[bool, Path | None]:\n A tuple containing a boolean indicating if the content is binary and the full file path (if applicable).\n \"\"\"\n content_type = response.headers.get(\"Content-Type\", \"\")\n is_binary = \"application/octet-stream\" in content_type or \"application/binary\" in content_type\n\n if not with_file_path:\n return is_binary, None\n\n component_temp_dir = Path(tempfile.gettempdir()) / self.__class__.__name__\n\n # Create directory asynchronously\n await aiofiles_os.makedirs(component_temp_dir, exist_ok=True)\n\n filename = None\n if \"Content-Disposition\" in response.headers:\n content_disposition = response.headers[\"Content-Disposition\"]\n filename_match = re.search(r'filename=\"(.+?)\"', content_disposition)\n if filename_match:\n extracted_filename = filename_match.group(1)\n filename = extracted_filename\n\n # Step 3: Infer file extension or use part of the request URL if no filename\n if not filename:\n # Extract the last segment of the URL path\n url_path = urlparse(str(response.request.url) if response.request else \"\").path\n base_name = Path(url_path).name # Get the last segment of the path\n if not base_name: # If the path ends with a slash or is empty\n base_name = \"response\"\n\n # Infer file extension\n content_type_to_extension = {\n \"text/plain\": \".txt\",\n \"application/json\": \".json\",\n \"image/jpeg\": \".jpg\",\n \"image/png\": \".png\",\n \"application/octet-stream\": \".bin\",\n }\n extension = content_type_to_extension.get(content_type, \".bin\" if is_binary else \".txt\")\n filename = f\"{base_name}{extension}\"\n\n # Step 4: Define the full file path\n file_path = component_temp_dir / filename\n\n # Step 5: Check if file exists asynchronously and handle accordingly\n try:\n # Try to create the file exclusively (x mode) to check existence\n async with aiofiles.open(file_path, \"x\") as _:\n pass # File created successfully, we can use this path\n except FileExistsError:\n # If file exists, append a timestamp to the filename\n timestamp = datetime.now(timezone.utc).strftime(\"%Y%m%d%H%M%S%f\")\n file_path = component_temp_dir / f\"{timestamp}-{filename}\"\n\n return is_binary, file_path\n\n def _headers_to_dict(self, headers: httpx.Headers) -> dict[str, str]:\n \"\"\"Convert HTTP headers to a dictionary with lowercased keys.\"\"\"\n return {k.lower(): v for k, v in headers.items()}\n\n def _process_headers(self, headers: Any) -> dict:\n \"\"\"Process the headers input into a valid dictionary.\n\n Args:\n headers: The headers to process, can be dict, str, or list\n Returns:\n Processed dictionary\n \"\"\"\n if headers is None:\n return {}\n if isinstance(headers, dict):\n return headers\n if isinstance(headers, list):\n processed_headers = {}\n try:\n for item in headers:\n if not self._is_valid_key_value_item(item):\n continue\n key = item[\"key\"]\n value = item[\"value\"]\n processed_headers[key] = value\n except (KeyError, TypeError, ValueError) as e:\n self.log(f\"Failed to process headers list: {e}\")\n return {} # Return empty dictionary instead of None\n return processed_headers\n return {}\n\n async def as_dataframe(self) -> DataFrame:\n \"\"\"Convert the API response data into a DataFrame.\n\n Returns:\n DataFrame: A DataFrame containing the API response data.\n \"\"\"\n data = await self.make_requests()\n return DataFrame(data)\n" + "value": "import asyncio\nimport json\nimport re\nimport tempfile\nfrom datetime import datetime, timezone\nfrom pathlib import Path\nfrom typing import Any\nfrom urllib.parse import parse_qsl, urlencode, urlparse, urlunparse\n\nimport aiofiles\nimport aiofiles.os as aiofiles_os\nimport httpx\nimport validators\n\nfrom langflow.base.curl.parse import parse_context\nfrom langflow.custom import Component\nfrom langflow.io import (\n BoolInput,\n DataInput,\n DropdownInput,\n FloatInput,\n IntInput,\n MessageTextInput,\n MultilineInput,\n Output,\n StrInput,\n TableInput,\n)\nfrom langflow.schema import Data\nfrom langflow.schema.dataframe import DataFrame\nfrom langflow.schema.dotdict import dotdict\nfrom langflow.services.deps import get_settings_service\n\n# Get settings using the service\n\n\nclass APIRequestComponent(Component):\n display_name = \"API Request\"\n description = \"Make HTTP requests using URLs or cURL commands.\"\n icon = \"Globe\"\n name = \"APIRequest\"\n\n default_keys = [\"urls\", \"method\", \"query_params\"]\n\n inputs = [\n MessageTextInput(\n name=\"urls\",\n display_name=\"URLs\",\n list=True,\n info=\"Enter one or more URLs, separated by commas.\",\n advanced=False,\n tool_mode=True,\n ),\n MultilineInput(\n name=\"curl\",\n display_name=\"cURL\",\n info=(\n \"Paste a curl command to populate the fields. \"\n \"This will fill in the dictionary fields for headers and body.\"\n ),\n advanced=True,\n real_time_refresh=True,\n tool_mode=True,\n ),\n DropdownInput(\n name=\"method\",\n display_name=\"Method\",\n options=[\"GET\", \"POST\", \"PATCH\", \"PUT\", \"DELETE\"],\n info=\"The HTTP method to use.\",\n real_time_refresh=True,\n ),\n BoolInput(\n name=\"use_curl\",\n display_name=\"Use cURL\",\n value=False,\n info=\"Enable cURL mode to populate fields from a cURL command.\",\n real_time_refresh=True,\n ),\n DataInput(\n name=\"query_params\",\n display_name=\"Query Parameters\",\n info=\"The query parameters to append to the URL.\",\n advanced=True,\n ),\n TableInput(\n name=\"body\",\n display_name=\"Body\",\n info=\"The body to send with the request as a dictionary (for POST, PATCH, PUT).\",\n table_schema=[\n {\n \"name\": \"key\",\n \"display_name\": \"Key\",\n \"type\": \"str\",\n \"description\": \"Parameter name\",\n },\n {\n \"name\": \"value\",\n \"display_name\": \"Value\",\n \"description\": \"Parameter value\",\n },\n ],\n value=[],\n input_types=[\"Data\"],\n advanced=True,\n real_time_refresh=True,\n ),\n TableInput(\n name=\"headers\",\n display_name=\"Headers\",\n info=\"The headers to send with the request\",\n table_schema=[\n {\n \"name\": \"key\",\n \"display_name\": \"Header\",\n \"type\": \"str\",\n \"description\": \"Header name\",\n },\n {\n \"name\": \"value\",\n \"display_name\": \"Value\",\n \"type\": \"str\",\n \"description\": \"Header value\",\n },\n ],\n value=[{\"key\": \"User-Agent\", \"value\": get_settings_service().settings.user_agent}],\n advanced=True,\n input_types=[\"Data\"],\n real_time_refresh=True,\n ),\n IntInput(\n name=\"timeout\",\n display_name=\"Timeout\",\n value=30,\n info=\"The timeout to use for the request.\",\n advanced=True,\n ),\n BoolInput(\n name=\"follow_redirects\",\n display_name=\"Follow Redirects\",\n value=True,\n info=\"Whether to follow http redirects.\",\n advanced=True,\n ),\n BoolInput(\n name=\"save_to_file\",\n display_name=\"Save to File\",\n value=False,\n info=\"Save the API response to a temporary file\",\n advanced=True,\n ),\n BoolInput(\n name=\"include_httpx_metadata\",\n display_name=\"Include HTTPx Metadata\",\n value=False,\n info=(\n \"Include properties such as headers, status_code, response_headers, \"\n \"and redirection_history in the output.\"\n ),\n advanced=True,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Data\", name=\"data\", method=\"make_requests\"),\n Output(display_name=\"DataFrame\", name=\"dataframe\", method=\"as_dataframe\"),\n ]\n\n def _parse_json_value(self, value: Any) -> Any:\n \"\"\"Parse a value that might be a JSON string.\"\"\"\n if not isinstance(value, str):\n return value\n\n try:\n parsed = json.loads(value)\n except json.JSONDecodeError:\n return value\n else:\n return parsed\n\n def _process_body(self, body: Any) -> dict:\n \"\"\"Process the body input into a valid dictionary.\n\n Args:\n body: The body to process, can be dict, str, or list\n Returns:\n Processed dictionary\n \"\"\"\n if body is None:\n return {}\n if isinstance(body, dict):\n return self._process_dict_body(body)\n if isinstance(body, str):\n return self._process_string_body(body)\n if isinstance(body, list):\n return self._process_list_body(body)\n\n return {}\n\n def _process_dict_body(self, body: dict) -> dict:\n \"\"\"Process dictionary body by parsing JSON values.\"\"\"\n return {k: self._parse_json_value(v) for k, v in body.items()}\n\n def _process_string_body(self, body: str) -> dict:\n \"\"\"Process string body by attempting JSON parse.\"\"\"\n try:\n return self._process_body(json.loads(body))\n except json.JSONDecodeError:\n return {\"data\": body}\n\n def _process_list_body(self, body: list) -> dict:\n \"\"\"Process list body by converting to key-value dictionary.\"\"\"\n processed_dict = {}\n\n try:\n for item in body:\n if not self._is_valid_key_value_item(item):\n continue\n\n key = item[\"key\"]\n value = self._parse_json_value(item[\"value\"])\n processed_dict[key] = value\n\n except (KeyError, TypeError, ValueError) as e:\n self.log(f\"Failed to process body list: {e}\")\n return {} # Return empty dictionary instead of None\n\n return processed_dict\n\n def _is_valid_key_value_item(self, item: Any) -> bool:\n \"\"\"Check if an item is a valid key-value dictionary.\"\"\"\n return isinstance(item, dict) and \"key\" in item and \"value\" in item\n\n def parse_curl(self, curl: str, build_config: dotdict) -> dotdict:\n \"\"\"Parse a cURL command and update build configuration.\n\n Args:\n curl: The cURL command to parse\n build_config: The build configuration to update\n Returns:\n Updated build configuration\n \"\"\"\n try:\n parsed = parse_context(curl)\n\n # Update basic configuration\n build_config[\"urls\"][\"value\"] = [parsed.url]\n build_config[\"method\"][\"value\"] = parsed.method.upper()\n build_config[\"headers\"][\"advanced\"] = True\n build_config[\"body\"][\"advanced\"] = True\n\n # Process headers\n headers_list = [{\"key\": k, \"value\": v} for k, v in parsed.headers.items()]\n build_config[\"headers\"][\"value\"] = headers_list\n\n if headers_list:\n build_config[\"headers\"][\"advanced\"] = False\n\n # Process body data\n if not parsed.data:\n build_config[\"body\"][\"value\"] = []\n elif parsed.data:\n try:\n json_data = json.loads(parsed.data)\n if isinstance(json_data, dict):\n body_list = [\n {\"key\": k, \"value\": json.dumps(v) if isinstance(v, dict | list) else str(v)}\n for k, v in json_data.items()\n ]\n build_config[\"body\"][\"value\"] = body_list\n build_config[\"body\"][\"advanced\"] = False\n else:\n build_config[\"body\"][\"value\"] = [{\"key\": \"data\", \"value\": json.dumps(json_data)}]\n build_config[\"body\"][\"advanced\"] = False\n except json.JSONDecodeError:\n build_config[\"body\"][\"value\"] = [{\"key\": \"data\", \"value\": parsed.data}]\n build_config[\"body\"][\"advanced\"] = False\n\n except Exception as exc:\n msg = f\"Error parsing curl: {exc}\"\n self.log(msg)\n raise ValueError(msg) from exc\n\n return build_config\n\n def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None) -> dotdict:\n if field_name == \"use_curl\" and field_value:\n # if we remove field value from validation, this gets validated every time\n build_config = self._update_curl_mode(build_config, use_curl=field_value)\n\n # Fields that should not be reset\n preserve_fields = {\"timeout\", \"follow_redirects\", \"save_to_file\", \"include_httpx_metadata\", \"use_curl\"}\n\n # Mapping between input types and their reset values\n type_reset_mapping = {\n TableInput: [],\n BoolInput: False,\n IntInput: 0,\n FloatInput: 0.0,\n MessageTextInput: \"\",\n StrInput: \"\",\n MultilineInput: \"\",\n DropdownInput: \"GET\",\n DataInput: {},\n }\n\n for input_field in self.inputs:\n # Only reset if field is not in preserve list\n if input_field.name not in preserve_fields:\n reset_value = type_reset_mapping.get(type(input_field), None)\n build_config[input_field.name][\"value\"] = reset_value\n self.log(f\"Reset field {input_field.name} to {reset_value}\")\n # Don't try to parse the boolean value as a curl command\n return build_config\n if field_name == \"method\" and not self.use_curl:\n build_config = self._update_method_fields(build_config, field_value)\n elif field_name == \"curl\" and self.use_curl and field_value:\n # Not reachable, because we don't have a way to update\n # the curl field, self.use_curl is set after the build_config is created\n build_config = self.parse_curl(field_value, build_config)\n return build_config\n\n def _update_curl_mode(self, build_config: dotdict, *, use_curl: bool) -> dotdict:\n always_visible = [\"method\", \"use_curl\"]\n\n for field in self.inputs:\n field_name = field.name\n field_config = build_config.get(field_name)\n if isinstance(field_config, dict):\n if field_name in always_visible:\n field_config[\"advanced\"] = False\n elif field_name == \"urls\":\n field_config[\"advanced\"] = use_curl\n elif field_name == \"curl\":\n field_config[\"advanced\"] = not use_curl\n field_config[\"real_time_refresh\"] = use_curl\n elif field_name in {\"body\", \"headers\"}:\n field_config[\"advanced\"] = True # Always keep body and headers in advanced when use_curl is False\n else:\n field_config[\"advanced\"] = use_curl\n else:\n self.log(f\"Expected dict for build_config[{field_name}], got {type(field_config).__name__}\")\n\n if not use_curl:\n current_method = build_config.get(\"method\", {}).get(\"value\", \"GET\")\n build_config = self._update_method_fields(build_config, current_method)\n\n return build_config\n\n def _update_method_fields(self, build_config: dotdict, method: str) -> dotdict:\n common_fields = [\n \"urls\",\n \"method\",\n \"use_curl\",\n ]\n\n always_advanced_fields = [\n \"body\",\n \"headers\",\n \"timeout\",\n \"follow_redirects\",\n \"save_to_file\",\n \"include_httpx_metadata\",\n ]\n\n body_fields = [\"body\"]\n\n for field in self.inputs:\n field_name = field.name\n field_config = build_config.get(field_name)\n if isinstance(field_config, dict):\n if field_name in common_fields:\n field_config[\"advanced\"] = False\n elif field_name in body_fields:\n field_config[\"advanced\"] = method not in {\"POST\", \"PUT\", \"PATCH\"}\n elif field_name in always_advanced_fields:\n field_config[\"advanced\"] = True\n else:\n field_config[\"advanced\"] = True\n else:\n self.log(f\"Expected dict for build_config[{field_name}], got {type(field_config).__name__}\")\n\n return build_config\n\n async def make_request(\n self,\n client: httpx.AsyncClient,\n method: str,\n url: str,\n headers: dict | None = None,\n body: Any = None,\n timeout: int = 5,\n *,\n follow_redirects: bool = True,\n save_to_file: bool = False,\n include_httpx_metadata: bool = False,\n ) -> Data:\n method = method.upper()\n if method not in {\"GET\", \"POST\", \"PATCH\", \"PUT\", \"DELETE\"}:\n msg = f\"Unsupported method: {method}\"\n raise ValueError(msg)\n\n # Process body using the new helper method\n processed_body = self._process_body(body)\n redirection_history = []\n\n try:\n response = await client.request(\n method,\n url,\n headers=headers,\n json=processed_body,\n timeout=timeout,\n follow_redirects=follow_redirects,\n )\n\n redirection_history = [\n {\n \"url\": redirect.headers.get(\"Location\", str(redirect.url)),\n \"status_code\": redirect.status_code,\n }\n for redirect in response.history\n ]\n\n is_binary, file_path = await self._response_info(response, with_file_path=save_to_file)\n response_headers = self._headers_to_dict(response.headers)\n\n metadata: dict[str, Any] = {\n \"source\": url,\n }\n\n if save_to_file:\n mode = \"wb\" if is_binary else \"w\"\n encoding = response.encoding if mode == \"w\" else None\n if file_path:\n # Ensure parent directory exists\n await aiofiles_os.makedirs(file_path.parent, exist_ok=True)\n if is_binary:\n async with aiofiles.open(file_path, \"wb\") as f:\n await f.write(response.content)\n await f.flush()\n else:\n async with aiofiles.open(file_path, \"w\", encoding=encoding) as f:\n await f.write(response.text)\n await f.flush()\n metadata[\"file_path\"] = str(file_path)\n\n if include_httpx_metadata:\n metadata.update(\n {\n \"headers\": headers,\n \"status_code\": response.status_code,\n \"response_headers\": response_headers,\n **({\"redirection_history\": redirection_history} if redirection_history else {}),\n }\n )\n return Data(data=metadata)\n\n if is_binary:\n result = response.content\n else:\n try:\n result = response.json()\n except json.JSONDecodeError:\n self.log(\"Failed to decode JSON response\")\n result = response.text.encode(\"utf-8\")\n\n metadata.update({\"result\": result})\n\n if include_httpx_metadata:\n metadata.update(\n {\n \"headers\": headers,\n \"status_code\": response.status_code,\n \"response_headers\": response_headers,\n **({\"redirection_history\": redirection_history} if redirection_history else {}),\n }\n )\n return Data(data=metadata)\n except httpx.TimeoutException:\n return Data(\n data={\n \"source\": url,\n \"headers\": headers,\n \"status_code\": 408,\n \"error\": \"Request timed out\",\n },\n )\n except Exception as exc: # noqa: BLE001\n self.log(f\"Error making request to {url}\")\n return Data(\n data={\n \"source\": url,\n \"headers\": headers,\n \"status_code\": 500,\n \"error\": str(exc),\n **({\"redirection_history\": redirection_history} if redirection_history else {}),\n },\n )\n\n def add_query_params(self, url: str, params: dict) -> str:\n url_parts = list(urlparse(url))\n query = dict(parse_qsl(url_parts[4]))\n query.update(params)\n url_parts[4] = urlencode(query)\n return urlunparse(url_parts)\n\n async def make_requests(self) -> list[Data]:\n method = self.method\n urls = [url.strip() for url in self.urls if url.strip()]\n headers = self.headers or {}\n body = self.body or {}\n timeout = self.timeout\n follow_redirects = self.follow_redirects\n save_to_file = self.save_to_file\n include_httpx_metadata = self.include_httpx_metadata\n\n if self.use_curl and self.curl:\n self._build_config = self.parse_curl(self.curl, dotdict())\n\n invalid_urls = [url for url in urls if not validators.url(url)]\n if invalid_urls:\n msg = f\"Invalid URLs provided: {invalid_urls}\"\n raise ValueError(msg)\n\n if isinstance(self.query_params, str):\n query_params = dict(parse_qsl(self.query_params))\n else:\n query_params = self.query_params.data if self.query_params else {}\n\n # Process headers here\n headers = self._process_headers(headers)\n\n # Process body\n body = self._process_body(body)\n\n bodies = [body] * len(urls)\n\n urls = [self.add_query_params(url, query_params) for url in urls]\n\n async with httpx.AsyncClient() as client:\n results = await asyncio.gather(\n *[\n self.make_request(\n client,\n method,\n u,\n headers,\n rec,\n timeout,\n follow_redirects=follow_redirects,\n save_to_file=save_to_file,\n include_httpx_metadata=include_httpx_metadata,\n )\n for u, rec in zip(urls, bodies, strict=False)\n ]\n )\n self.status = results\n return results\n\n async def _response_info(\n self, response: httpx.Response, *, with_file_path: bool = False\n ) -> tuple[bool, Path | None]:\n \"\"\"Determine the file path and whether the response content is binary.\n\n Args:\n response (Response): The HTTP response object.\n with_file_path (bool): Whether to save the response content to a file.\n\n Returns:\n Tuple[bool, Path | None]:\n A tuple containing a boolean indicating if the content is binary and the full file path (if applicable).\n \"\"\"\n content_type = response.headers.get(\"Content-Type\", \"\")\n is_binary = \"application/octet-stream\" in content_type or \"application/binary\" in content_type\n\n if not with_file_path:\n return is_binary, None\n\n component_temp_dir = Path(tempfile.gettempdir()) / self.__class__.__name__\n\n # Create directory asynchronously\n await aiofiles_os.makedirs(component_temp_dir, exist_ok=True)\n\n filename = None\n if \"Content-Disposition\" in response.headers:\n content_disposition = response.headers[\"Content-Disposition\"]\n filename_match = re.search(r'filename=\"(.+?)\"', content_disposition)\n if filename_match:\n extracted_filename = filename_match.group(1)\n filename = extracted_filename\n\n # Step 3: Infer file extension or use part of the request URL if no filename\n if not filename:\n # Extract the last segment of the URL path\n url_path = urlparse(str(response.request.url) if response.request else \"\").path\n base_name = Path(url_path).name # Get the last segment of the path\n if not base_name: # If the path ends with a slash or is empty\n base_name = \"response\"\n\n # Infer file extension\n content_type_to_extension = {\n \"text/plain\": \".txt\",\n \"application/json\": \".json\",\n \"image/jpeg\": \".jpg\",\n \"image/png\": \".png\",\n \"application/octet-stream\": \".bin\",\n }\n extension = content_type_to_extension.get(content_type, \".bin\" if is_binary else \".txt\")\n filename = f\"{base_name}{extension}\"\n\n # Step 4: Define the full file path\n file_path = component_temp_dir / filename\n\n # Step 5: Check if file exists asynchronously and handle accordingly\n try:\n # Try to create the file exclusively (x mode) to check existence\n async with aiofiles.open(file_path, \"x\") as _:\n pass # File created successfully, we can use this path\n except FileExistsError:\n # If file exists, append a timestamp to the filename\n timestamp = datetime.now(timezone.utc).strftime(\"%Y%m%d%H%M%S%f\")\n file_path = component_temp_dir / f\"{timestamp}-{filename}\"\n\n return is_binary, file_path\n\n def _headers_to_dict(self, headers: httpx.Headers) -> dict[str, str]:\n \"\"\"Convert HTTP headers to a dictionary with lowercased keys.\"\"\"\n return {k.lower(): v for k, v in headers.items()}\n\n def _process_headers(self, headers: Any) -> dict:\n \"\"\"Process the headers input into a valid dictionary.\n\n Args:\n headers: The headers to process, can be dict, str, or list\n Returns:\n Processed dictionary\n \"\"\"\n if headers is None:\n return {}\n if isinstance(headers, dict):\n return headers\n if isinstance(headers, list):\n processed_headers = {}\n try:\n for item in headers:\n if not self._is_valid_key_value_item(item):\n continue\n key = item[\"key\"]\n value = item[\"value\"]\n processed_headers[key] = value\n except (KeyError, TypeError, ValueError) as e:\n self.log(f\"Failed to process headers list: {e}\")\n return {} # Return empty dictionary instead of None\n return processed_headers\n return {}\n\n async def as_dataframe(self) -> DataFrame:\n \"\"\"Convert the API response data into a DataFrame.\n\n Returns:\n DataFrame: A DataFrame containing the API response data.\n \"\"\"\n data = await self.make_requests()\n return DataFrame(data)\n" }, "curl": { "_input_type": "MultilineInput", @@ -971,7 +971,7 @@ "advanced": true, "display_name": "Headers", "dynamic": false, - "info": "The headers to send with the request as a dictionary.", + "info": "The headers to send with the request", "input_types": [ "Data" ],