refactor: Update URLComponent and FileComponent to use langflow custom Component class
Refactor the URLComponent and FileComponent classes to inherit from the langflow custom Component class instead of the deprecated CustomComponent class. This change ensures compatibility with the latest langflow library updates and improves the maintainability of the code.
This commit is contained in:
parent
259bdcbd95
commit
0a9ef2cde0
4 changed files with 187 additions and 153 deletions
|
|
@ -1,56 +1,66 @@
|
|||
import asyncio
|
||||
import json
|
||||
from typing import Any, List, Optional
|
||||
from typing import List, Optional
|
||||
|
||||
import httpx
|
||||
from loguru import logger
|
||||
|
||||
from langflow.base.curl.parse import parse_context
|
||||
from langflow.custom import CustomComponent
|
||||
from langflow.field_typing import NestedDict
|
||||
from langflow.custom import Component
|
||||
from langflow.inputs import StrInput, DropdownInput, NestedDictInput, IntInput
|
||||
from langflow.schema import Data
|
||||
from langflow.schema.dotdict import dotdict
|
||||
from langflow.template import Output
|
||||
|
||||
|
||||
class APIRequest(CustomComponent):
|
||||
display_name: str = "API Request"
|
||||
description: str = "Make HTTP requests given one or more URLs."
|
||||
output_types: list[str] = ["Data"]
|
||||
documentation: str = "https://docs.langflow.org/components/utilities#api-request"
|
||||
class APIRequestComponent(Component):
|
||||
display_name = "API Request"
|
||||
description = "Make HTTP requests given one or more URLs."
|
||||
icon = "Globe"
|
||||
|
||||
field_config = {
|
||||
"urls": {"display_name": "URLs", "info": "URLs to make requests to."},
|
||||
"curl": {
|
||||
"display_name": "Curl",
|
||||
"info": "Paste a curl command to populate the fields.",
|
||||
"refresh_button": True,
|
||||
"refresh_button_text": "",
|
||||
},
|
||||
"method": {
|
||||
"display_name": "Method",
|
||||
"info": "The HTTP method to use.",
|
||||
"options": ["GET", "POST", "PATCH", "PUT"],
|
||||
"value": "GET",
|
||||
},
|
||||
"headers": {
|
||||
"display_name": "Headers",
|
||||
"info": "The headers to send with the request.",
|
||||
"input_types": ["Data"],
|
||||
},
|
||||
"body": {
|
||||
"display_name": "Body",
|
||||
"info": "The body to send with the request (for POST, PATCH, PUT).",
|
||||
"input_types": ["Data"],
|
||||
},
|
||||
"timeout": {
|
||||
"display_name": "Timeout",
|
||||
"info": "The timeout to use for the request.",
|
||||
"value": 5,
|
||||
},
|
||||
}
|
||||
inputs = [
|
||||
StrInput(
|
||||
name="urls",
|
||||
display_name="URLs",
|
||||
multiline=True,
|
||||
info="Enter one or more URLs, separated by commas.",
|
||||
),
|
||||
StrInput(
|
||||
name="curl",
|
||||
display_name="Curl",
|
||||
info="Paste a curl command to populate the fields.",
|
||||
advanced=True,
|
||||
),
|
||||
DropdownInput(
|
||||
name="method",
|
||||
display_name="Method",
|
||||
options=["GET", "POST", "PATCH", "PUT"],
|
||||
value="GET",
|
||||
info="The HTTP method to use.",
|
||||
),
|
||||
NestedDictInput(
|
||||
name="headers",
|
||||
display_name="Headers",
|
||||
info="The headers to send with the request.",
|
||||
),
|
||||
NestedDictInput(
|
||||
name="body",
|
||||
display_name="Body",
|
||||
info="The body to send with the request (for POST, PATCH, PUT).",
|
||||
),
|
||||
IntInput(
|
||||
name="timeout",
|
||||
display_name="Timeout",
|
||||
value=5,
|
||||
info="The timeout to use for the request.",
|
||||
),
|
||||
]
|
||||
|
||||
def parse_curl(self, curl: str, build_config: dotdict) -> dotdict:
|
||||
outputs = [
|
||||
Output(display_name="Data", name="data", method="make_requests"),
|
||||
]
|
||||
|
||||
def parse_curl(self, curl: str):
|
||||
build_config = self._build_config()
|
||||
try:
|
||||
parsed = parse_context(curl)
|
||||
build_config["urls"]["value"] = [parsed.url]
|
||||
|
|
@ -61,17 +71,12 @@ class APIRequest(CustomComponent):
|
|||
json_data = json.loads(parsed.data)
|
||||
build_config["body"]["value"] = json_data
|
||||
except json.JSONDecodeError as e:
|
||||
print(e)
|
||||
logger.error(e)
|
||||
except Exception as exc:
|
||||
logger.error(f"Error parsing curl: {exc}")
|
||||
raise ValueError(f"Error parsing curl: {exc}")
|
||||
return build_config
|
||||
|
||||
def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
|
||||
if field_name == "curl" and field_value is not None:
|
||||
build_config = self.parse_curl(field_value, build_config)
|
||||
return build_config
|
||||
|
||||
async def make_request(
|
||||
self,
|
||||
client: httpx.AsyncClient,
|
||||
|
|
@ -120,18 +125,18 @@ class APIRequest(CustomComponent):
|
|||
},
|
||||
)
|
||||
|
||||
async def build(
|
||||
self,
|
||||
method: str,
|
||||
urls: List[str],
|
||||
curl: Optional[str] = None,
|
||||
headers: Optional[NestedDict] = {},
|
||||
body: Optional[NestedDict] = {},
|
||||
timeout: int = 5,
|
||||
) -> List[Data]:
|
||||
if headers is None:
|
||||
headers_dict = {}
|
||||
elif isinstance(headers, Data):
|
||||
async def make_requests(self) -> List[Data]:
|
||||
method = self.method
|
||||
urls = [url.strip() for url in self.urls.split(",") if url.strip()]
|
||||
curl = self.curl
|
||||
headers = self.headers or {}
|
||||
body = self.body or {}
|
||||
timeout = self.timeout
|
||||
|
||||
if curl:
|
||||
self._build_config = self.parse_curl(curl)
|
||||
|
||||
if isinstance(headers, Data):
|
||||
headers_dict = headers.data
|
||||
else:
|
||||
headers_dict = headers
|
||||
|
|
@ -142,11 +147,11 @@ class APIRequest(CustomComponent):
|
|||
bodies = [body]
|
||||
else:
|
||||
bodies = body
|
||||
bodies = [b.data if isinstance(b, Data) else b for b in bodies] # type: ignore
|
||||
bodies = [b.data if isinstance(b, Data) else b for b in bodies]
|
||||
|
||||
if len(urls) != len(bodies):
|
||||
# add bodies with None
|
||||
bodies += [None] * (len(urls) - len(bodies)) # type: ignore
|
||||
bodies += [None] * (len(urls) - len(bodies))
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
results = await asyncio.gather(
|
||||
*[self.make_request(client, method, u, headers_dict, rec, timeout) for u, rec in zip(urls, bodies)]
|
||||
|
|
|
|||
|
|
@ -1,55 +1,80 @@
|
|||
from typing import Any, Dict, List, Optional
|
||||
from typing import List, Optional
|
||||
|
||||
from langflow.base.data.utils import parallel_load_data, parse_text_file_to_data, retrieve_file_paths
|
||||
from langflow.custom import CustomComponent
|
||||
from langflow.custom import Component
|
||||
from langflow.inputs import StrInput, IntInput, BoolInput
|
||||
from langflow.template import Output
|
||||
from langflow.schema import Data
|
||||
|
||||
|
||||
class DirectoryComponent(CustomComponent):
|
||||
class DirectoryComponent(Component):
|
||||
display_name = "Directory"
|
||||
description = "Recursively load files from a directory."
|
||||
icon = "folder"
|
||||
|
||||
def build_config(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"path": {"display_name": "Path"},
|
||||
"types": {
|
||||
"display_name": "Types",
|
||||
"info": "File types to load. Leave empty to load all types.",
|
||||
},
|
||||
"depth": {"display_name": "Depth", "info": "Depth to search for files."},
|
||||
"max_concurrency": {"display_name": "Max Concurrency", "advanced": True},
|
||||
"load_hidden": {
|
||||
"display_name": "Load Hidden",
|
||||
"advanced": True,
|
||||
"info": "If true, hidden files will be loaded.",
|
||||
},
|
||||
"recursive": {
|
||||
"display_name": "Recursive",
|
||||
"advanced": True,
|
||||
"info": "If true, the search will be recursive.",
|
||||
},
|
||||
"silent_errors": {
|
||||
"display_name": "Silent Errors",
|
||||
"advanced": True,
|
||||
"info": "If true, errors will not raise an exception.",
|
||||
},
|
||||
"use_multithreading": {
|
||||
"display_name": "Use Multithreading",
|
||||
"advanced": True,
|
||||
},
|
||||
}
|
||||
inputs = [
|
||||
StrInput(
|
||||
name="path",
|
||||
display_name="Path",
|
||||
info="Path to the directory to load files from.",
|
||||
),
|
||||
StrInput(
|
||||
name="types",
|
||||
display_name="Types",
|
||||
info="File types to load. Leave empty to load all types.",
|
||||
),
|
||||
IntInput(
|
||||
name="depth",
|
||||
display_name="Depth",
|
||||
info="Depth to search for files.",
|
||||
value=0,
|
||||
),
|
||||
IntInput(
|
||||
name="max_concurrency",
|
||||
display_name="Max Concurrency",
|
||||
advanced=True,
|
||||
info="Maximum concurrency for loading files.",
|
||||
value=2,
|
||||
),
|
||||
BoolInput(
|
||||
name="load_hidden",
|
||||
display_name="Load Hidden",
|
||||
advanced=True,
|
||||
info="If true, hidden files will be loaded.",
|
||||
),
|
||||
BoolInput(
|
||||
name="recursive",
|
||||
display_name="Recursive",
|
||||
advanced=True,
|
||||
info="If true, the search will be recursive.",
|
||||
),
|
||||
BoolInput(
|
||||
name="silent_errors",
|
||||
display_name="Silent Errors",
|
||||
advanced=True,
|
||||
info="If true, errors will not raise an exception.",
|
||||
),
|
||||
BoolInput(
|
||||
name="use_multithreading",
|
||||
display_name="Use Multithreading",
|
||||
advanced=True,
|
||||
info="If true, multithreading will be used.",
|
||||
),
|
||||
]
|
||||
|
||||
outputs = [
|
||||
Output(display_name="Data", name="data", method="load_directory"),
|
||||
]
|
||||
|
||||
def load_directory(self) -> List[Optional[Data]]:
|
||||
path = self.path
|
||||
depth = self.depth
|
||||
max_concurrency = self.max_concurrency
|
||||
load_hidden = self.load_hidden
|
||||
recursive = self.recursive
|
||||
silent_errors = self.silent_errors
|
||||
use_multithreading = self.use_multithreading
|
||||
|
||||
def build(
|
||||
self,
|
||||
path: str,
|
||||
depth: int = 0,
|
||||
max_concurrency: int = 2,
|
||||
load_hidden: bool = False,
|
||||
recursive: bool = True,
|
||||
silent_errors: bool = False,
|
||||
use_multithreading: bool = True,
|
||||
) -> List[Optional[Data]]:
|
||||
resolved_path = self.resolve_path(path)
|
||||
file_paths = retrieve_file_paths(resolved_path, load_hidden, recursive, depth)
|
||||
loaded_data = []
|
||||
|
|
|
|||
|
|
@ -1,48 +1,46 @@
|
|||
from pathlib import Path
|
||||
from typing import Any, Dict
|
||||
|
||||
from langflow.base.data.utils import TEXT_FILE_TYPES, parse_text_file_to_data
|
||||
from langflow.custom import CustomComponent
|
||||
from langflow.custom import Component
|
||||
from langflow.inputs import FileInput, BoolInput
|
||||
from langflow.template import Output
|
||||
from langflow.schema import Data
|
||||
from langflow.base.data.utils import TEXT_FILE_TYPES, parse_text_file_to_data
|
||||
|
||||
|
||||
class FileComponent(CustomComponent):
|
||||
class FileComponent(Component):
|
||||
display_name = "File"
|
||||
description = "A generic file loader."
|
||||
icon = "file-text"
|
||||
|
||||
def build_config(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"path": {
|
||||
"display_name": "Path",
|
||||
"field_type": "file",
|
||||
"file_types": TEXT_FILE_TYPES,
|
||||
"info": f"Supported file types: {', '.join(TEXT_FILE_TYPES)}",
|
||||
},
|
||||
"silent_errors": {
|
||||
"display_name": "Silent Errors",
|
||||
"advanced": True,
|
||||
"info": "If true, errors will not raise an exception.",
|
||||
},
|
||||
}
|
||||
inputs = [
|
||||
FileInput(
|
||||
name="path",
|
||||
display_name="Path",
|
||||
info=f"Supported file types: {', '.join(TEXT_FILE_TYPES)}",
|
||||
),
|
||||
BoolInput(
|
||||
name="silent_errors",
|
||||
display_name="Silent Errors",
|
||||
advanced=True,
|
||||
info="If true, errors will not raise an exception.",
|
||||
),
|
||||
]
|
||||
|
||||
outputs = [
|
||||
Output(display_name="Data", name="data", method="load_file"),
|
||||
]
|
||||
|
||||
def load_file(self) -> Data:
|
||||
path = self.path
|
||||
silent_errors = self.silent_errors
|
||||
|
||||
resolved_path = Path(path).resolve()
|
||||
extension = resolved_path.suffix[1:].lower()
|
||||
|
||||
def load_file(self, path: str, silent_errors: bool = False) -> Data:
|
||||
resolved_path = self.resolve_path(path)
|
||||
path_obj = Path(resolved_path)
|
||||
extension = path_obj.suffix[1:].lower()
|
||||
if extension == "doc":
|
||||
raise ValueError("doc files are not supported. Please save as .docx")
|
||||
if extension not in TEXT_FILE_TYPES:
|
||||
raise ValueError(f"Unsupported file type: {extension}")
|
||||
record = parse_text_file_to_data(resolved_path, silent_errors)
|
||||
self.status = record if record else "No data"
|
||||
return record or Data()
|
||||
|
||||
def build(
|
||||
self,
|
||||
path: str,
|
||||
silent_errors: bool = False,
|
||||
) -> Data:
|
||||
record = self.load_file(path, silent_errors)
|
||||
self.status = record
|
||||
return record
|
||||
data = parse_text_file_to_data(resolved_path, silent_errors)
|
||||
self.status = data if data else "No data"
|
||||
return data or Data()
|
||||
|
|
|
|||
|
|
@ -1,27 +1,33 @@
|
|||
from typing import Any, Dict
|
||||
|
||||
from langflow.custom import Component
|
||||
from langflow.inputs import StrInput
|
||||
from langflow.template import Output
|
||||
from langflow.schema import Data
|
||||
from langchain_community.document_loaders.web_base import WebBaseLoader
|
||||
|
||||
from langflow.custom import CustomComponent
|
||||
from langflow.schema import Data
|
||||
|
||||
|
||||
class URLComponent(CustomComponent):
|
||||
class URLComponent(Component):
|
||||
display_name = "URL"
|
||||
description = "Fetch content from one or more URLs."
|
||||
icon = "layout-template"
|
||||
|
||||
def build_config(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"urls": {"display_name": "URL"},
|
||||
}
|
||||
inputs = [
|
||||
StrInput(
|
||||
name="urls",
|
||||
display_name="URLs",
|
||||
info="Enter one or more URLs, separated by commas.",
|
||||
value="",
|
||||
is_list=True,
|
||||
),
|
||||
]
|
||||
|
||||
def build(
|
||||
self,
|
||||
urls: list[str],
|
||||
) -> list[Data]:
|
||||
loader = WebBaseLoader(web_paths=[url for url in urls if url])
|
||||
outputs = [
|
||||
Output(display_name="Data", name="data", method="fetch_content"),
|
||||
]
|
||||
|
||||
def fetch_content(self) -> Data:
|
||||
urls = [url.strip() for url in self.urls if url.strip()]
|
||||
loader = WebBaseLoader(web_paths=urls)
|
||||
docs = loader.load()
|
||||
data = self.to_data(docs)
|
||||
data = [Data(content=doc.page_content) for doc in docs]
|
||||
self.status = data
|
||||
return data
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue