🔧 fix(store.py): import UsersLikesResponse in store.py to resolve import error

 feat(store.py): add new endpoint to get list of components liked by a user
🔧 fix(schema.py): add UsersLikesResponse schema to define the response structure for the new endpoint
🔧 fix(service.py): add get_user_likes method to retrieve the list of components liked by a user from the store API
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-10-26 11:46:42 -03:00
commit 6615bf3b03
3 changed files with 23 additions and 2 deletions

View file

@ -12,6 +12,7 @@ from langflow.services.store.schema import (
ListComponentResponse,
StoreComponentCreate,
TagResponse,
UsersLikesResponse,
)
from fastapi import APIRouter, Depends, HTTPException, Query
@ -173,5 +174,12 @@ def get_tags(
raise HTTPException(status_code=500, detail=str(exc))
# urlencodedstr = https://api.langflow.store/items/components?page=0&limit=10000&fields=id%2Cname%2Cdescription%2Cuser_created.name%2Cis_component%2Ctags.tags_id.name%2Ctags.tags_id.id%2Ccount%28liked_by%29&filter=%7B%22status%22%3A%20%7B%22_in%22%3A%20%5B%22public%22%2C%20%22Public%22%5D%7D%7D'
# normalstr = https://api.langflow.store/items/components?page=0&limit=10000&fields=id,name,description,user_created.name,is_component,tags.tags_id.name,tags.tags_id.id,count(liked_by)&filter={"status": {"_in": ["public", "Public"]}}'
@router.get("/users/likes", response_model=List[UsersLikesResponse])
def get_list_of_components_liked_by_user(
store_service: StoreService = Depends(get_store_service),
store_api_Key: str = Depends(get_user_store_api_key),
):
try:
return store_service.get_user_likes(store_api_Key)
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc))

View file

@ -9,6 +9,11 @@ class TagResponse(BaseModel):
name: Optional[str]
class UsersLikesResponse(BaseModel):
id: UUID
likes: Optional[List[UUID]]
class ComponentResponse(BaseModel):
id: UUID
status: Optional[str]

View file

@ -221,3 +221,11 @@ class StoreService(Service):
params = {"fields": ",".join(["id", "name"])}
tags = self._get(url, api_key, params)
return tags
def get_user_likes(self, api_key: str) -> List[Dict[str, Any]]:
url = f"{self.base_url}/users/me"
params = {
"fields": ",".join(["id", "likes"]),
}
likes = self._get(url, api_key, params)
return likes