diff --git a/src/backend/base/langflow/components/data/api_request.py b/src/backend/base/langflow/components/data/api_request.py index 11856583d..5f4001df5 100644 --- a/src/backend/base/langflow/components/data/api_request.py +++ b/src/backend/base/langflow/components/data/api_request.py @@ -456,12 +456,15 @@ class APIRequestComponent(Component): def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None) -> dotdict: """Update the build config based on the selected mode.""" if field_name != "mode": + if field_name == "curl_input" and self.mode == "cURL" and self.curl_input: + return self.parse_curl(self.curl_input, build_config) return build_config # print(f"Current mode: {field_value}") if field_value == "cURL": set_field_display(build_config, "curl_input", value=True) - build_config = self.parse_curl(self.curl_input, build_config) + if build_config["curl_input"]["value"]: + build_config = self.parse_curl(build_config["curl_input"]["value"], build_config) else: set_field_display(build_config, "curl_input", value=False) diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Pokédex Agent.json b/src/backend/base/langflow/initial_setup/starter_projects/Pokédex Agent.json index 3f4d83641..5f06ff8e7 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Pokédex Agent.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Pokédex Agent.json @@ -922,7 +922,7 @@ "show": true, "title_case": false, "type": "code", - "value": "import json\nimport re\nimport tempfile\nfrom datetime import datetime, timezone\nfrom pathlib import Path\nfrom typing import Any\nfrom urllib.parse import parse_qsl, urlencode, urlparse, urlunparse\n\nimport aiofiles\nimport aiofiles.os as aiofiles_os\nimport httpx\nimport validators\n\nfrom langflow.base.curl.parse import parse_context\nfrom langflow.custom import Component\nfrom langflow.inputs.inputs import TabInput\nfrom langflow.io import (\n BoolInput,\n DataInput,\n DropdownInput,\n IntInput,\n MessageTextInput,\n MultilineInput,\n Output,\n TableInput,\n)\nfrom langflow.schema import Data\nfrom langflow.schema.dotdict import dotdict\nfrom langflow.services.deps import get_settings_service\nfrom langflow.utils.component_utils import set_current_fields, set_field_advanced, set_field_display\n\n# Define fields for each mode\nMODE_FIELDS = {\n \"URL\": [\n \"url_input\",\n \"method\",\n ],\n \"cURL\": [\"curl_input\"],\n}\n\n# Fields that should always be visible\nDEFAULT_FIELDS = [\"mode\"]\n\n\nclass APIRequestComponent(Component):\n display_name = \"API Request\"\n description = \"Make HTTP requests using URL or cURL commands.\"\n icon = \"Globe\"\n name = \"APIRequest\"\n\n inputs = [\n MessageTextInput(\n name=\"url_input\",\n display_name=\"URL\",\n info=\"Enter the URL for the request.\",\n advanced=False,\n tool_mode=True,\n ),\n MultilineInput(\n name=\"curl_input\",\n display_name=\"cURL\",\n info=(\n \"Paste a curl command to populate the fields. \"\n \"This will fill in the dictionary fields for headers and body.\"\n ),\n real_time_refresh=True,\n tool_mode=True,\n advanced=True,\n show=False,\n ),\n DropdownInput(\n name=\"method\",\n display_name=\"Method\",\n options=[\"GET\", \"POST\", \"PATCH\", \"PUT\", \"DELETE\"],\n value=\"GET\",\n info=\"The HTTP method to use.\",\n real_time_refresh=True,\n ),\n TabInput(\n name=\"mode\",\n display_name=\"Mode\",\n options=[\"URL\", \"cURL\"],\n value=\"URL\",\n info=\"Enable cURL mode to populate fields from a cURL command.\",\n real_time_refresh=True,\n ),\n DataInput(\n name=\"query_params\",\n display_name=\"Query Parameters\",\n info=\"The query parameters to append to the URL.\",\n advanced=True,\n ),\n TableInput(\n name=\"body\",\n display_name=\"Body\",\n info=\"The body to send with the request as a dictionary (for POST, PATCH, PUT).\",\n table_schema=[\n {\n \"name\": \"key\",\n \"display_name\": \"Key\",\n \"type\": \"str\",\n \"description\": \"Parameter name\",\n },\n {\n \"name\": \"value\",\n \"display_name\": \"Value\",\n \"description\": \"Parameter value\",\n },\n ],\n value=[],\n input_types=[\"Data\"],\n advanced=True,\n real_time_refresh=True,\n ),\n TableInput(\n name=\"headers\",\n display_name=\"Headers\",\n info=\"The headers to send with the request\",\n table_schema=[\n {\n \"name\": \"key\",\n \"display_name\": \"Header\",\n \"type\": \"str\",\n \"description\": \"Header name\",\n },\n {\n \"name\": \"value\",\n \"display_name\": \"Value\",\n \"type\": \"str\",\n \"description\": \"Header value\",\n },\n ],\n value=[{\"key\": \"User-Agent\", \"value\": get_settings_service().settings.user_agent}],\n advanced=True,\n input_types=[\"Data\"],\n real_time_refresh=True,\n ),\n IntInput(\n name=\"timeout\",\n display_name=\"Timeout\",\n value=30,\n info=\"The timeout to use for the request.\",\n advanced=True,\n ),\n BoolInput(\n name=\"follow_redirects\",\n display_name=\"Follow Redirects\",\n value=True,\n info=\"Whether to follow http redirects.\",\n advanced=True,\n ),\n BoolInput(\n name=\"save_to_file\",\n display_name=\"Save to File\",\n value=False,\n info=\"Save the API response to a temporary file\",\n advanced=True,\n ),\n BoolInput(\n name=\"include_httpx_metadata\",\n display_name=\"Include HTTPx Metadata\",\n value=False,\n info=(\n \"Include properties such as headers, status_code, response_headers, \"\n \"and redirection_history in the output.\"\n ),\n advanced=True,\n ),\n ]\n\n outputs = [\n Output(display_name=\"API Response\", name=\"data\", method=\"make_api_request\"),\n ]\n\n def _parse_json_value(self, value: Any) -> Any:\n \"\"\"Parse a value that might be a JSON string.\"\"\"\n if not isinstance(value, str):\n return value\n\n try:\n parsed = json.loads(value)\n except json.JSONDecodeError:\n return value\n else:\n return parsed\n\n def _process_body(self, body: Any) -> dict:\n \"\"\"Process the body input into a valid dictionary.\"\"\"\n if body is None:\n return {}\n if isinstance(body, dict):\n return self._process_dict_body(body)\n if isinstance(body, str):\n return self._process_string_body(body)\n if isinstance(body, list):\n return self._process_list_body(body)\n return {}\n\n def _process_dict_body(self, body: dict) -> dict:\n \"\"\"Process dictionary body by parsing JSON values.\"\"\"\n return {k: self._parse_json_value(v) for k, v in body.items()}\n\n def _process_string_body(self, body: str) -> dict:\n \"\"\"Process string body by attempting JSON parse.\"\"\"\n try:\n return self._process_body(json.loads(body))\n except json.JSONDecodeError:\n return {\"data\": body}\n\n def _process_list_body(self, body: list) -> dict:\n \"\"\"Process list body by converting to key-value dictionary.\"\"\"\n processed_dict = {}\n try:\n for item in body:\n if not self._is_valid_key_value_item(item):\n continue\n key = item[\"key\"]\n value = self._parse_json_value(item[\"value\"])\n processed_dict[key] = value\n except (KeyError, TypeError, ValueError) as e:\n self.log(f\"Failed to process body list: {e}\")\n return {}\n return processed_dict\n\n def _is_valid_key_value_item(self, item: Any) -> bool:\n \"\"\"Check if an item is a valid key-value dictionary.\"\"\"\n return isinstance(item, dict) and \"key\" in item and \"value\" in item\n\n def parse_curl(self, curl: str, build_config: dotdict) -> dotdict:\n \"\"\"Parse a cURL command and update build configuration.\"\"\"\n try:\n parsed = parse_context(curl)\n\n # Update basic configuration\n url = parsed.url\n # Normalize URL before setting it\n url = self._normalize_url(url)\n\n build_config[\"url_input\"][\"value\"] = url\n build_config[\"method\"][\"value\"] = parsed.method.upper()\n\n # Process headers\n headers_list = [{\"key\": k, \"value\": v} for k, v in parsed.headers.items()]\n build_config[\"headers\"][\"value\"] = headers_list\n\n # Process body data\n if not parsed.data:\n build_config[\"body\"][\"value\"] = []\n elif parsed.data:\n try:\n json_data = json.loads(parsed.data)\n if isinstance(json_data, dict):\n body_list = [\n {\"key\": k, \"value\": json.dumps(v) if isinstance(v, dict | list) else str(v)}\n for k, v in json_data.items()\n ]\n build_config[\"body\"][\"value\"] = body_list\n else:\n build_config[\"body\"][\"value\"] = [{\"key\": \"data\", \"value\": json.dumps(json_data)}]\n except json.JSONDecodeError:\n build_config[\"body\"][\"value\"] = [{\"key\": \"data\", \"value\": parsed.data}]\n\n except Exception as exc:\n msg = f\"Error parsing curl: {exc}\"\n self.log(msg)\n raise ValueError(msg) from exc\n\n return build_config\n\n def _normalize_url(self, url: str) -> str:\n \"\"\"Normalize URL by adding https:// if no protocol is specified.\"\"\"\n if not url or not isinstance(url, str):\n msg = \"URL cannot be empty\"\n raise ValueError(msg)\n\n url = url.strip()\n if url.startswith((\"http://\", \"https://\")):\n return url\n return f\"https://{url}\"\n\n async def make_request(\n self,\n client: httpx.AsyncClient,\n method: str,\n url: str,\n headers: dict | None = None,\n body: Any = None,\n timeout: int = 5,\n *,\n follow_redirects: bool = True,\n save_to_file: bool = False,\n include_httpx_metadata: bool = False,\n ) -> Data:\n method = method.upper()\n if method not in {\"GET\", \"POST\", \"PATCH\", \"PUT\", \"DELETE\"}:\n msg = f\"Unsupported method: {method}\"\n raise ValueError(msg)\n\n processed_body = self._process_body(body)\n redirection_history = []\n\n try:\n # Prepare request parameters\n request_params = {\n \"method\": method,\n \"url\": url,\n \"headers\": headers,\n \"json\": processed_body,\n \"timeout\": timeout,\n \"follow_redirects\": follow_redirects,\n }\n response = await client.request(**request_params)\n\n redirection_history = [\n {\n \"url\": redirect.headers.get(\"Location\", str(redirect.url)),\n \"status_code\": redirect.status_code,\n }\n for redirect in response.history\n ]\n\n is_binary, file_path = await self._response_info(response, with_file_path=save_to_file)\n response_headers = self._headers_to_dict(response.headers)\n\n # Base metadata\n metadata = {\n \"source\": url,\n \"status_code\": response.status_code,\n \"response_headers\": response_headers,\n }\n\n if redirection_history:\n metadata[\"redirection_history\"] = redirection_history\n\n if save_to_file:\n mode = \"wb\" if is_binary else \"w\"\n encoding = response.encoding if mode == \"w\" else None\n if file_path:\n await aiofiles_os.makedirs(file_path.parent, exist_ok=True)\n if is_binary:\n async with aiofiles.open(file_path, \"wb\") as f:\n await f.write(response.content)\n await f.flush()\n else:\n async with aiofiles.open(file_path, \"w\", encoding=encoding) as f:\n await f.write(response.text)\n await f.flush()\n metadata[\"file_path\"] = str(file_path)\n\n if include_httpx_metadata:\n metadata.update({\"headers\": headers})\n return Data(data=metadata)\n\n # Handle response content\n if is_binary:\n result = response.content\n else:\n try:\n result = response.json()\n except json.JSONDecodeError:\n self.log(\"Failed to decode JSON response\")\n result = response.text.encode(\"utf-8\")\n\n metadata[\"result\"] = result\n\n if include_httpx_metadata:\n metadata.update({\"headers\": headers})\n\n return Data(data=metadata)\n except (httpx.HTTPError, httpx.RequestError, httpx.TimeoutException) as exc:\n self.log(f\"Error making request to {url}\")\n return Data(\n data={\n \"source\": url,\n \"headers\": headers,\n \"status_code\": 500,\n \"error\": str(exc),\n **({\"redirection_history\": redirection_history} if redirection_history else {}),\n },\n )\n\n def add_query_params(self, url: str, params: dict) -> str:\n \"\"\"Add query parameters to URL efficiently.\"\"\"\n if not params:\n return url\n url_parts = list(urlparse(url))\n query = dict(parse_qsl(url_parts[4]))\n query.update(params)\n url_parts[4] = urlencode(query)\n return urlunparse(url_parts)\n\n def _headers_to_dict(self, headers: httpx.Headers) -> dict[str, str]:\n \"\"\"Convert HTTP headers to a dictionary with lowercased keys.\"\"\"\n return {k.lower(): v for k, v in headers.items()}\n\n def _process_headers(self, headers: Any) -> dict:\n \"\"\"Process the headers input into a valid dictionary.\"\"\"\n if headers is None:\n return {}\n if isinstance(headers, dict):\n return headers\n if isinstance(headers, list):\n return {item[\"key\"]: item[\"value\"] for item in headers if self._is_valid_key_value_item(item)}\n return {}\n\n async def make_api_request(self) -> Data:\n \"\"\"Make HTTP request with optimized parameter handling.\"\"\"\n method = self.method\n url = self.url_input.strip() if isinstance(self.url_input, str) else \"\"\n headers = self.headers or {}\n body = self.body or {}\n timeout = self.timeout\n follow_redirects = self.follow_redirects\n save_to_file = self.save_to_file\n include_httpx_metadata = self.include_httpx_metadata\n\n # if self.mode == \"cURL\" and self.curl_input:\n # self._build_config = self.parse_curl(self.curl_input, dotdict())\n # # After parsing curl, get the normalized URL\n # url = self._build_config[\"url_input\"][\"value\"]\n\n # Normalize URL before validation\n url = self._normalize_url(url)\n\n # Validate URL\n if not validators.url(url):\n msg = f\"Invalid URL provided: {url}\"\n raise ValueError(msg)\n\n # Process query parameters\n if isinstance(self.query_params, str):\n query_params = dict(parse_qsl(self.query_params))\n else:\n query_params = self.query_params.data if self.query_params else {}\n\n # Process headers and body\n headers = self._process_headers(headers)\n body = self._process_body(body)\n url = self.add_query_params(url, query_params)\n\n async with httpx.AsyncClient() as client:\n result = await self.make_request(\n client,\n method,\n url,\n headers,\n body,\n timeout,\n follow_redirects=follow_redirects,\n save_to_file=save_to_file,\n include_httpx_metadata=include_httpx_metadata,\n )\n self.status = result\n return result\n\n def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None) -> dotdict:\n \"\"\"Update the build config based on the selected mode.\"\"\"\n if field_name != \"mode\":\n return build_config\n\n # print(f\"Current mode: {field_value}\")\n if field_value == \"cURL\":\n set_field_display(build_config, \"curl_input\", value=True)\n build_config = self.parse_curl(self.curl_input, build_config)\n else:\n set_field_display(build_config, \"curl_input\", value=False)\n\n return set_current_fields(\n build_config=build_config,\n action_fields=MODE_FIELDS,\n selected_action=field_value,\n default_fields=DEFAULT_FIELDS,\n func=set_field_advanced,\n default_value=True,\n )\n\n async def _response_info(\n self, response: httpx.Response, *, with_file_path: bool = False\n ) -> tuple[bool, Path | None]:\n \"\"\"Determine the file path and whether the response content is binary.\n\n Args:\n response (Response): The HTTP response object.\n with_file_path (bool): Whether to save the response content to a file.\n\n Returns:\n Tuple[bool, Path | None]:\n A tuple containing a boolean indicating if the content is binary and the full file path (if applicable).\n \"\"\"\n content_type = response.headers.get(\"Content-Type\", \"\")\n is_binary = \"application/octet-stream\" in content_type or \"application/binary\" in content_type\n\n if not with_file_path:\n return is_binary, None\n\n component_temp_dir = Path(tempfile.gettempdir()) / self.__class__.__name__\n\n # Create directory asynchronously\n await aiofiles_os.makedirs(component_temp_dir, exist_ok=True)\n\n filename = None\n if \"Content-Disposition\" in response.headers:\n content_disposition = response.headers[\"Content-Disposition\"]\n filename_match = re.search(r'filename=\"(.+?)\"', content_disposition)\n if filename_match:\n extracted_filename = filename_match.group(1)\n filename = extracted_filename\n\n # Step 3: Infer file extension or use part of the request URL if no filename\n if not filename:\n # Extract the last segment of the URL path\n url_path = urlparse(str(response.request.url) if response.request else \"\").path\n base_name = Path(url_path).name # Get the last segment of the path\n if not base_name: # If the path ends with a slash or is empty\n base_name = \"response\"\n\n # Infer file extension\n content_type_to_extension = {\n \"text/plain\": \".txt\",\n \"application/json\": \".json\",\n \"image/jpeg\": \".jpg\",\n \"image/png\": \".png\",\n \"application/octet-stream\": \".bin\",\n }\n extension = content_type_to_extension.get(content_type, \".bin\" if is_binary else \".txt\")\n filename = f\"{base_name}{extension}\"\n\n # Step 4: Define the full file path\n file_path = component_temp_dir / filename\n\n # Step 5: Check if file exists asynchronously and handle accordingly\n try:\n # Try to create the file exclusively (x mode) to check existence\n async with aiofiles.open(file_path, \"x\") as _:\n pass # File created successfully, we can use this path\n except FileExistsError:\n # If file exists, append a timestamp to the filename\n timestamp = datetime.now(timezone.utc).strftime(\"%Y%m%d%H%M%S%f\")\n file_path = component_temp_dir / f\"{timestamp}-{filename}\"\n\n return is_binary, file_path\n" + "value": "import json\nimport re\nimport tempfile\nfrom datetime import datetime, timezone\nfrom pathlib import Path\nfrom typing import Any\nfrom urllib.parse import parse_qsl, urlencode, urlparse, urlunparse\n\nimport aiofiles\nimport aiofiles.os as aiofiles_os\nimport httpx\nimport validators\n\nfrom langflow.base.curl.parse import parse_context\nfrom langflow.custom import Component\nfrom langflow.inputs.inputs import TabInput\nfrom langflow.io import (\n BoolInput,\n DataInput,\n DropdownInput,\n IntInput,\n MessageTextInput,\n MultilineInput,\n Output,\n TableInput,\n)\nfrom langflow.schema import Data\nfrom langflow.schema.dotdict import dotdict\nfrom langflow.services.deps import get_settings_service\nfrom langflow.utils.component_utils import set_current_fields, set_field_advanced, set_field_display\n\n# Define fields for each mode\nMODE_FIELDS = {\n \"URL\": [\n \"url_input\",\n \"method\",\n ],\n \"cURL\": [\"curl_input\"],\n}\n\n# Fields that should always be visible\nDEFAULT_FIELDS = [\"mode\"]\n\n\nclass APIRequestComponent(Component):\n display_name = \"API Request\"\n description = \"Make HTTP requests using URL or cURL commands.\"\n icon = \"Globe\"\n name = \"APIRequest\"\n\n inputs = [\n MessageTextInput(\n name=\"url_input\",\n display_name=\"URL\",\n info=\"Enter the URL for the request.\",\n advanced=False,\n tool_mode=True,\n ),\n MultilineInput(\n name=\"curl_input\",\n display_name=\"cURL\",\n info=(\n \"Paste a curl command to populate the fields. \"\n \"This will fill in the dictionary fields for headers and body.\"\n ),\n real_time_refresh=True,\n tool_mode=True,\n advanced=True,\n show=False,\n ),\n DropdownInput(\n name=\"method\",\n display_name=\"Method\",\n options=[\"GET\", \"POST\", \"PATCH\", \"PUT\", \"DELETE\"],\n value=\"GET\",\n info=\"The HTTP method to use.\",\n real_time_refresh=True,\n ),\n TabInput(\n name=\"mode\",\n display_name=\"Mode\",\n options=[\"URL\", \"cURL\"],\n value=\"URL\",\n info=\"Enable cURL mode to populate fields from a cURL command.\",\n real_time_refresh=True,\n ),\n DataInput(\n name=\"query_params\",\n display_name=\"Query Parameters\",\n info=\"The query parameters to append to the URL.\",\n advanced=True,\n ),\n TableInput(\n name=\"body\",\n display_name=\"Body\",\n info=\"The body to send with the request as a dictionary (for POST, PATCH, PUT).\",\n table_schema=[\n {\n \"name\": \"key\",\n \"display_name\": \"Key\",\n \"type\": \"str\",\n \"description\": \"Parameter name\",\n },\n {\n \"name\": \"value\",\n \"display_name\": \"Value\",\n \"description\": \"Parameter value\",\n },\n ],\n value=[],\n input_types=[\"Data\"],\n advanced=True,\n real_time_refresh=True,\n ),\n TableInput(\n name=\"headers\",\n display_name=\"Headers\",\n info=\"The headers to send with the request\",\n table_schema=[\n {\n \"name\": \"key\",\n \"display_name\": \"Header\",\n \"type\": \"str\",\n \"description\": \"Header name\",\n },\n {\n \"name\": \"value\",\n \"display_name\": \"Value\",\n \"type\": \"str\",\n \"description\": \"Header value\",\n },\n ],\n value=[{\"key\": \"User-Agent\", \"value\": get_settings_service().settings.user_agent}],\n advanced=True,\n input_types=[\"Data\"],\n real_time_refresh=True,\n ),\n IntInput(\n name=\"timeout\",\n display_name=\"Timeout\",\n value=30,\n info=\"The timeout to use for the request.\",\n advanced=True,\n ),\n BoolInput(\n name=\"follow_redirects\",\n display_name=\"Follow Redirects\",\n value=True,\n info=\"Whether to follow http redirects.\",\n advanced=True,\n ),\n BoolInput(\n name=\"save_to_file\",\n display_name=\"Save to File\",\n value=False,\n info=\"Save the API response to a temporary file\",\n advanced=True,\n ),\n BoolInput(\n name=\"include_httpx_metadata\",\n display_name=\"Include HTTPx Metadata\",\n value=False,\n info=(\n \"Include properties such as headers, status_code, response_headers, \"\n \"and redirection_history in the output.\"\n ),\n advanced=True,\n ),\n ]\n\n outputs = [\n Output(display_name=\"API Response\", name=\"data\", method=\"make_api_request\"),\n ]\n\n def _parse_json_value(self, value: Any) -> Any:\n \"\"\"Parse a value that might be a JSON string.\"\"\"\n if not isinstance(value, str):\n return value\n\n try:\n parsed = json.loads(value)\n except json.JSONDecodeError:\n return value\n else:\n return parsed\n\n def _process_body(self, body: Any) -> dict:\n \"\"\"Process the body input into a valid dictionary.\"\"\"\n if body is None:\n return {}\n if isinstance(body, dict):\n return self._process_dict_body(body)\n if isinstance(body, str):\n return self._process_string_body(body)\n if isinstance(body, list):\n return self._process_list_body(body)\n return {}\n\n def _process_dict_body(self, body: dict) -> dict:\n \"\"\"Process dictionary body by parsing JSON values.\"\"\"\n return {k: self._parse_json_value(v) for k, v in body.items()}\n\n def _process_string_body(self, body: str) -> dict:\n \"\"\"Process string body by attempting JSON parse.\"\"\"\n try:\n return self._process_body(json.loads(body))\n except json.JSONDecodeError:\n return {\"data\": body}\n\n def _process_list_body(self, body: list) -> dict:\n \"\"\"Process list body by converting to key-value dictionary.\"\"\"\n processed_dict = {}\n try:\n for item in body:\n if not self._is_valid_key_value_item(item):\n continue\n key = item[\"key\"]\n value = self._parse_json_value(item[\"value\"])\n processed_dict[key] = value\n except (KeyError, TypeError, ValueError) as e:\n self.log(f\"Failed to process body list: {e}\")\n return {}\n return processed_dict\n\n def _is_valid_key_value_item(self, item: Any) -> bool:\n \"\"\"Check if an item is a valid key-value dictionary.\"\"\"\n return isinstance(item, dict) and \"key\" in item and \"value\" in item\n\n\n def parse_curl(self, curl: str, build_config: dotdict) -> dotdict:\n \"\"\"Parse a cURL command and update build configuration.\"\"\"\n try:\n parsed = parse_context(curl)\n\n # Update basic configuration\n url = parsed.url\n # Normalize URL before setting it\n url = self._normalize_url(url)\n\n build_config[\"url_input\"][\"value\"] = url\n build_config[\"method\"][\"value\"] = parsed.method.upper()\n\n # Process headers\n headers_list = [{\"key\": k, \"value\": v} for k, v in parsed.headers.items()]\n build_config[\"headers\"][\"value\"] = headers_list\n\n # Process body data\n if not parsed.data:\n build_config[\"body\"][\"value\"] = []\n elif parsed.data:\n try:\n json_data = json.loads(parsed.data)\n if isinstance(json_data, dict):\n body_list = [\n {\"key\": k, \"value\": json.dumps(v) if isinstance(v, dict | list) else str(v)}\n for k, v in json_data.items()\n ]\n build_config[\"body\"][\"value\"] = body_list\n else:\n build_config[\"body\"][\"value\"] = [{\"key\": \"data\", \"value\": json.dumps(json_data)}]\n except json.JSONDecodeError:\n build_config[\"body\"][\"value\"] = [{\"key\": \"data\", \"value\": parsed.data}]\n\n except Exception as exc:\n msg = f\"Error parsing curl: {exc}\"\n self.log(msg)\n raise ValueError(msg) from exc\n\n return build_config\n\n def _normalize_url(self, url: str) -> str:\n \"\"\"Normalize URL by adding https:// if no protocol is specified.\"\"\"\n if not url or not isinstance(url, str):\n msg = \"URL cannot be empty\"\n raise ValueError(msg)\n\n url = url.strip()\n if url.startswith((\"http://\", \"https://\")):\n return url\n return f\"https://{url}\"\n\n async def make_request(\n self,\n client: httpx.AsyncClient,\n method: str,\n url: str,\n headers: dict | None = None,\n body: Any = None,\n timeout: int = 5,\n *,\n follow_redirects: bool = True,\n save_to_file: bool = False,\n include_httpx_metadata: bool = False,\n ) -> Data:\n method = method.upper()\n if method not in {\"GET\", \"POST\", \"PATCH\", \"PUT\", \"DELETE\"}:\n msg = f\"Unsupported method: {method}\"\n raise ValueError(msg)\n\n processed_body = self._process_body(body)\n redirection_history = []\n\n try:\n # Prepare request parameters\n request_params = {\n \"method\": method,\n \"url\": url,\n \"headers\": headers,\n \"json\": processed_body,\n \"timeout\": timeout,\n \"follow_redirects\": follow_redirects,\n }\n response = await client.request(**request_params)\n\n redirection_history = [\n {\n \"url\": redirect.headers.get(\"Location\", str(redirect.url)),\n \"status_code\": redirect.status_code,\n }\n for redirect in response.history\n ]\n\n is_binary, file_path = await self._response_info(response, with_file_path=save_to_file)\n response_headers = self._headers_to_dict(response.headers)\n\n # Base metadata\n metadata = {\n \"source\": url,\n \"status_code\": response.status_code,\n \"response_headers\": response_headers,\n }\n\n if redirection_history:\n metadata[\"redirection_history\"] = redirection_history\n\n if save_to_file:\n mode = \"wb\" if is_binary else \"w\"\n encoding = response.encoding if mode == \"w\" else None\n if file_path:\n await aiofiles_os.makedirs(file_path.parent, exist_ok=True)\n if is_binary:\n async with aiofiles.open(file_path, \"wb\") as f:\n await f.write(response.content)\n await f.flush()\n else:\n async with aiofiles.open(file_path, \"w\", encoding=encoding) as f:\n await f.write(response.text)\n await f.flush()\n metadata[\"file_path\"] = str(file_path)\n\n if include_httpx_metadata:\n metadata.update({\"headers\": headers})\n return Data(data=metadata)\n\n # Handle response content\n if is_binary:\n result = response.content\n else:\n try:\n result = response.json()\n except json.JSONDecodeError:\n self.log(\"Failed to decode JSON response\")\n result = response.text.encode(\"utf-8\")\n\n metadata[\"result\"] = result\n\n if include_httpx_metadata:\n metadata.update({\"headers\": headers})\n\n return Data(data=metadata)\n except (httpx.HTTPError, httpx.RequestError, httpx.TimeoutException) as exc:\n self.log(f\"Error making request to {url}\")\n return Data(\n data={\n \"source\": url,\n \"headers\": headers,\n \"status_code\": 500,\n \"error\": str(exc),\n **({\"redirection_history\": redirection_history} if redirection_history else {}),\n },\n )\n\n def add_query_params(self, url: str, params: dict) -> str:\n \"\"\"Add query parameters to URL efficiently.\"\"\"\n if not params:\n return url\n url_parts = list(urlparse(url))\n query = dict(parse_qsl(url_parts[4]))\n query.update(params)\n url_parts[4] = urlencode(query)\n return urlunparse(url_parts)\n\n def _headers_to_dict(self, headers: httpx.Headers) -> dict[str, str]:\n \"\"\"Convert HTTP headers to a dictionary with lowercased keys.\"\"\"\n return {k.lower(): v for k, v in headers.items()}\n\n def _process_headers(self, headers: Any) -> dict:\n \"\"\"Process the headers input into a valid dictionary.\"\"\"\n if headers is None:\n return {}\n if isinstance(headers, dict):\n return headers\n if isinstance(headers, list):\n return {item[\"key\"]: item[\"value\"] for item in headers if self._is_valid_key_value_item(item)}\n return {}\n\n async def make_api_request(self) -> Data:\n \"\"\"Make HTTP request with optimized parameter handling.\"\"\"\n method = self.method\n url = self.url_input.strip() if isinstance(self.url_input, str) else \"\"\n headers = self.headers or {}\n body = self.body or {}\n timeout = self.timeout\n follow_redirects = self.follow_redirects\n save_to_file = self.save_to_file\n include_httpx_metadata = self.include_httpx_metadata\n\n # if self.mode == \"cURL\" and self.curl_input:\n # self._build_config = self.parse_curl(self.curl_input, dotdict())\n # # After parsing curl, get the normalized URL\n # url = self._build_config[\"url_input\"][\"value\"]\n\n # Normalize URL before validation\n url = self._normalize_url(url)\n\n # Validate URL\n if not validators.url(url):\n msg = f\"Invalid URL provided: {url}\"\n raise ValueError(msg)\n\n # Process query parameters\n if isinstance(self.query_params, str):\n query_params = dict(parse_qsl(self.query_params))\n else:\n query_params = self.query_params.data if self.query_params else {}\n\n # Process headers and body\n headers = self._process_headers(headers)\n body = self._process_body(body)\n url = self.add_query_params(url, query_params)\n\n async with httpx.AsyncClient() as client:\n result = await self.make_request(\n client,\n method,\n url,\n headers,\n body,\n timeout,\n follow_redirects=follow_redirects,\n save_to_file=save_to_file,\n include_httpx_metadata=include_httpx_metadata,\n )\n self.status = result\n return result\n\n def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None) -> dotdict:\n \"\"\"Update the build config based on the selected mode.\"\"\"\n if field_name != \"mode\":\n if field_name == \"curl_input\" and self.mode == \"cURL\" and self.curl_input:\n return self.parse_curl(self.curl_input, build_config)\n return build_config\n\n # print(f\"Current mode: {field_value}\")\n if field_value == \"cURL\":\n set_field_display(build_config, \"curl_input\", value=True)\n if build_config[\"curl_input\"][\"value\"]:\n build_config = self.parse_curl(build_config[\"curl_input\"][\"value\"], build_config)\n else:\n set_field_display(build_config, \"curl_input\", value=False)\n\n return set_current_fields(\n build_config=build_config,\n action_fields=MODE_FIELDS,\n selected_action=field_value,\n default_fields=DEFAULT_FIELDS,\n func=set_field_advanced,\n default_value=True,\n )\n\n async def _response_info(\n self, response: httpx.Response, *, with_file_path: bool = False\n ) -> tuple[bool, Path | None]:\n \"\"\"Determine the file path and whether the response content is binary.\n\n Args:\n response (Response): The HTTP response object.\n with_file_path (bool): Whether to save the response content to a file.\n\n Returns:\n Tuple[bool, Path | None]:\n A tuple containing a boolean indicating if the content is binary and the full file path (if applicable).\n \"\"\"\n content_type = response.headers.get(\"Content-Type\", \"\")\n is_binary = \"application/octet-stream\" in content_type or \"application/binary\" in content_type\n\n if not with_file_path:\n return is_binary, None\n\n component_temp_dir = Path(tempfile.gettempdir()) / self.__class__.__name__\n\n # Create directory asynchronously\n await aiofiles_os.makedirs(component_temp_dir, exist_ok=True)\n\n filename = None\n if \"Content-Disposition\" in response.headers:\n content_disposition = response.headers[\"Content-Disposition\"]\n filename_match = re.search(r'filename=\"(.+?)\"', content_disposition)\n if filename_match:\n extracted_filename = filename_match.group(1)\n filename = extracted_filename\n\n # Step 3: Infer file extension or use part of the request URL if no filename\n if not filename:\n # Extract the last segment of the URL path\n url_path = urlparse(str(response.request.url) if response.request else \"\").path\n base_name = Path(url_path).name # Get the last segment of the path\n if not base_name: # If the path ends with a slash or is empty\n base_name = \"response\"\n\n # Infer file extension\n content_type_to_extension = {\n \"text/plain\": \".txt\",\n \"application/json\": \".json\",\n \"image/jpeg\": \".jpg\",\n \"image/png\": \".png\",\n \"application/octet-stream\": \".bin\",\n }\n extension = content_type_to_extension.get(content_type, \".bin\" if is_binary else \".txt\")\n filename = f\"{base_name}{extension}\"\n\n # Step 4: Define the full file path\n file_path = component_temp_dir / filename\n\n # Step 5: Check if file exists asynchronously and handle accordingly\n try:\n # Try to create the file exclusively (x mode) to check existence\n async with aiofiles.open(file_path, \"x\") as _:\n pass # File created successfully, we can use this path\n except FileExistsError:\n # If file exists, append a timestamp to the filename\n timestamp = datetime.now(timezone.utc).strftime(\"%Y%m%d%H%M%S%f\")\n file_path = component_temp_dir / f\"{timestamp}-{filename}\"\n\n return is_binary, file_path\n" }, "curl_input": { "_input_type": "MultilineInput",