🔧 fix(base.py): add DOWNLOAD_WEBHOOK_URL setting to support webhook integration for component downloads

🔧 fix(service.py): call webhook after downloading a component to trigger webhook integration
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-10-24 16:10:33 -03:00
commit 6d6b2ed826
2 changed files with 21 additions and 2 deletions

View file

@ -53,6 +53,7 @@ class Settings(BaseSettings):
LANGFUSE_HOST: Optional[str] = None
STORE_URL: Optional[str] = None
DOWNLOAD_WEBHOOK_URL: Optional[str] = None
@validator("CONFIG_DIR", pre=True, allow_reuse=True)
def set_langflow_dir(cls, value):

View file

@ -1,7 +1,9 @@
from datetime import datetime
from uuid import UUID
from langflow.services.base import Service
from typing import TYPE_CHECKING, List, Dict, Any, Optional
import httpx
from httpx import HTTPError
from langflow.services.store.schema import (
ComponentResponse,
@ -24,6 +26,7 @@ class StoreService(Service):
def __init__(self, settings_service: "SettingsService"):
self.settings_service = settings_service
self.base_url = self.settings_service.settings.STORE_URL
self.webhook_url = self.settings_service.settings.DOWNLOAD_WEBHOOK_URL
self.components_url = f"{self.base_url}/items/components"
def _get(
@ -43,6 +46,19 @@ class StoreService(Service):
except Exception as exc:
raise ValueError(f"GET failed: {exc}")
def call_webhook(self, api_key: str, webhook_url: str, component_id: UUID) -> None:
# The webhook is a POST request with the data in the body
# For now we are calling it just for testing
try:
headers = {"Authorization": f"Bearer {api_key}"}
response = httpx.post(
webhook_url, headers=headers, json={"component_id": str(component_id)}
)
response.raise_for_status()
return response.json()
except HTTPError as exc:
raise exc
def search(
self,
api_key: Optional[str],
@ -108,12 +124,14 @@ class StoreService(Service):
results = self._get(self.components_url, api_key, params)
return [ListComponentResponse(**component) for component in results]
def download(self, api_key: str, id: str) -> DownloadComponentResponse:
url = f"{self.components_url}/{id}"
def download(self, api_key: str, component_id: str) -> DownloadComponentResponse:
url = f"{self.components_url}/{component_id}"
params = {
"fields": ",".join(["id", "name", "description", "data", "is_component"])
}
component = self._get(url, api_key, params)
self.call_webhook(api_key, self.webhook_url, component_id)
return DownloadComponentResponse(**component)
def upload(