🔧 chore(service.py): remove unused code and add logging for webhook failures in StoreService class

This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-11-14 14:53:39 -03:00
commit 887badcf72

View file

@ -1,10 +1,11 @@
import json
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from uuid import UUID
import httpx
from httpx import HTTPError, HTTPStatusError
from loguru import logger
from langflow.services.base import Service
from langflow.services.store.schema import (
ComponentResponse,
@ -99,72 +100,8 @@ class StoreService(Service):
return response.json()
except HTTPError as exc:
raise exc
def search(
self,
api_key: Optional[str],
query: str,
page: int = 1,
limit: int = 10,
status: Optional[str] = None,
is_component: Optional[bool] = None,
tags: Optional[List[str]] = None,
date_from: Optional[datetime] = None,
date_to: Optional[datetime] = None,
sort: Optional[List[str]] = ["-count(liked_by)"],
fields: Optional[List[str]] = None,
filter_by_user: bool = False,
) -> List[ComponentResponse]:
# ?sort=sort,-date_created,author.name
# // or
# ?sort[]=sort
# &sort[]=-date_created
# &sort[]=-author.name
params = {
"search": query,
"page": page,
"limit": limit,
}
filter_conditions: List[Dict[str, Any]] = []
if status:
filter_conditions.append({"status": {"_eq": status}})
if is_component is not None:
# params["filter[is_component][_eq]"] = is_component
filter_conditions.append({"is_component": {"_eq": is_component}})
if tags:
tags_filter = self.build_tags_filter(tags)
filter_conditions.append(tags_filter)
if date_from:
# params["filter[date_updated][_gte]"] = date_from.isoformat()
filter_conditions.append({"date_updated": {"_gte": date_from.isoformat()}})
if date_to:
# params["filter[date_updated][_lte]"] = date_to.isoformat()
filter_conditions.append({"date_updated": {"_lte": date_to.isoformat()}})
if sort:
params["sort"] = ",".join(sort)
if fields:
params["fields"] = ",".join(fields)
if filter_by_user:
params["deep"] = json.dumps({"components": {"_filter": {"user_created": {"token": {"_eq": api_key}}}}})
else:
# params["filter"] = json.dumps({"status": {"_eq": "public"}})
filter_conditions.append({"status": {"_in": ["public", "Public"]}})
if filter_conditions:
params["filter"] = json.dumps({"_and": filter_conditions})
results = self._get(self.components_url, api_key, params)
return [ComponentResponse(**component) for component in results]
except Exception as exc:
logger.debug(f"Webhook failed: {exc}")
def build_tags_filter(self, tags: List[str]):
tags_filter = {"tags": {"_and": []}}