diff --git a/src/backend/langflow/api/v1/store.py b/src/backend/langflow/api/v1/store.py index 5133b11f6..3f3bf825b 100644 --- a/src/backend/langflow/api/v1/store.py +++ b/src/backend/langflow/api/v1/store.py @@ -73,7 +73,7 @@ def create_component( @router.get("/components/", response_model=ListComponentResponseModel) -def get_components( +async def get_components( search: Annotated[Optional[str], Query()] = None, status: Annotated[Optional[str], Query()] = None, is_component: Annotated[Optional[bool], Query()] = None, @@ -98,7 +98,7 @@ def get_components( result: List[ListComponentResponse] = [] authorized = False try: - result = store_service.query_components( + result = await store_service.query_components( api_key=store_api_Key, page=page, limit=limit, sort=sort, filter_conditions=filter_conditions ) except HTTPStatusError as exc: @@ -107,7 +107,7 @@ def get_components( try: if result: if len(result) >= limit: - comp_count = store_service.count_components( + comp_count = await store_service.count_components( api_key=store_api_Key, filter_conditions=filter_conditions, ) @@ -123,7 +123,9 @@ def get_components( # Now, from the result, we need to get the components # the user likes and set the liked_by_user to True try: - updated_result = update_components_with_user_data(result, store_service, store_api_Key, liked=liked) + updated_result = await update_components_with_user_data( + result, store_service, store_api_Key, liked=liked + ) authorized = True result = updated_result except Exception: @@ -141,7 +143,7 @@ def get_components( @router.get("/components/{component_id}", response_model=DownloadComponentResponse) -def read_component( +async def read_component( component_id: UUID, store_service: StoreService = Depends(get_store_service), store_api_Key: str = Depends(get_user_store_api_key), @@ -149,7 +151,7 @@ def read_component( # If the component is from the store, we need to get it from the store try: - component = store_service.download(store_api_Key, component_id) + component = await store_service.download(store_api_Key, component_id) except Exception as exc: raise HTTPException(status_code=400, detail=str(exc)) from exc @@ -160,36 +162,36 @@ def read_component( @router.get("/tags", response_model=List[TagResponse]) -def get_tags( +async def get_tags( store_service: StoreService = Depends(get_store_service), store_api_Key: str = Depends(get_optional_user_store_api_key), ): try: - return store_service.get_tags(store_api_Key) + return await store_service.get_tags(store_api_Key) except Exception as exc: raise HTTPException(status_code=500, detail=str(exc)) @router.get("/users/likes", response_model=List[UsersLikesResponse]) -def get_list_of_components_liked_by_user( +async def get_list_of_components_liked_by_user( store_service: StoreService = Depends(get_store_service), store_api_Key: str = Depends(get_user_store_api_key), ): try: - return store_service.get_user_likes(store_api_Key) + return await store_service.get_user_likes(store_api_Key) except Exception as exc: raise HTTPException(status_code=500, detail=str(exc)) @router.post("/users/likes/{component_id}", response_model=UsersLikesResponse) -def like_component( +async def like_component( component_id: UUID, store_service: StoreService = Depends(get_store_service), store_api_Key: str = Depends(get_user_store_api_key), ): try: - result = store_service.like_component(store_api_Key, component_id) - likes_count = store_service.get_component_likes_count(store_api_Key, component_id) + result = await store_service.like_component(store_api_Key, component_id) + likes_count = await store_service.get_component_likes_count(store_api_Key, component_id) return UsersLikesResponse(likes_count=likes_count, liked_by_user=result) except Exception as exc: diff --git a/src/backend/langflow/services/store/service.py b/src/backend/langflow/services/store/service.py index 003c5ac2c..6833d84ed 100644 --- a/src/backend/langflow/services/store/service.py +++ b/src/backend/langflow/services/store/service.py @@ -1,5 +1,5 @@ import json -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional from uuid import UUID import httpx @@ -72,7 +72,7 @@ class StoreService(Service): # will make a property return that data # Without making the request multiple times - def _get( + async def _get( self, url: str, api_key: Optional[str] = None, params: Optional[Dict[str, Any]] = None ) -> List[Dict[str, Any]]: """Utility method to perform GET requests.""" @@ -80,22 +80,24 @@ class StoreService(Service): headers = {"Authorization": f"Bearer {api_key}"} else: headers = {} - try: - response = httpx.get(url, headers=headers, params=params) - response.raise_for_status() - return response.json()["data"] - except HTTPError as exc: - raise exc - except Exception as exc: - raise ValueError(f"GET failed: {exc}") + async with httpx.AsyncClient() as client: + try: + response = await client.get(url, headers=headers, params=params) + response.raise_for_status() + except HTTPError as exc: + raise exc + except Exception as exc: + raise ValueError(f"GET failed: {exc}") + return response.json()["data"] - def call_webhook(self, api_key: str, webhook_url: str, component_id: UUID) -> None: + async def call_webhook(self, api_key: str, webhook_url: str, component_id: UUID) -> None: # The webhook is a POST request with the data in the body # For now we are calling it just for testing try: headers = {"Authorization": f"Bearer {api_key}"} - response = httpx.post(webhook_url, headers=headers, json={"component_id": str(component_id)}) - response.raise_for_status() + async with httpx.AsyncClient() as client: + response = await client.post(webhook_url, headers=headers, json={"component_id": str(component_id)}) + response.raise_for_status() return response.json() except HTTPError as exc: raise exc @@ -108,7 +110,7 @@ class StoreService(Service): tags_filter["tags"]["_and"].append({"_some": {"tags_id": {"name": {"_eq": tag}}}}) return tags_filter - def count_components( + async def count_components( self, filter_conditions: List[Dict[str, Any]], api_key: Optional[str] = None, @@ -117,7 +119,7 @@ class StoreService(Service): if filter_conditions: params["filter"] = json.dumps({"_and": filter_conditions}) - results = self._get(self.components_url, api_key, params) + results = await self._get(self.components_url, api_key, params) return int(results[0].get("count", 0)) @staticmethod @@ -173,33 +175,32 @@ class StoreService(Service): else: return {"status": {"_in": ["public", "Public"]}} - def query_components( + async def query_components( self, api_key: Optional[str] = None, sort: Optional[List[str]] = None, page: int = 1, limit: int = 15, fields: Optional[List[str]] = None, - is_component: Optional[bool] = None, filter_conditions: Optional[List[Dict[str, Any]]] = None, - ) -> Tuple[List[ListComponentResponse], List[Dict[str, Any]]]: - params = {"page": page, "limit": limit} + ) -> List[ListComponentResponse]: + params: Dict[str, Any] = { + "page": page, + "limit": limit, + "fields": ",".join(fields) if fields else ",".join(self.default_fields), + } # ?aggregate[count]=likes - params["fields"] = ",".join(fields) if fields else ",".join(self.default_fields) if sort: params["sort"] = ",".join(sort) - if is_component is not None: - filter_conditions.append({"is_component": {"_eq": is_component}}) - # Only public components or the ones created by the user # check for "public" or "Public" if filter_conditions: params["filter"] = json.dumps({"_and": filter_conditions}) - results = self._get(self.components_url, api_key, params) + results = await self._get(self.components_url, api_key, params) results_objects = [ListComponentResponse(**component) for component in results] # Flatten the tags # for component in results_objects: @@ -207,7 +208,7 @@ class StoreService(Service): # component.tags = [tags_id.tags_id for tags_id in component.tags] return results_objects - def get_liked_by_user_components(self, component_ids: List[UUID], api_key: str) -> List[UUID]: + async def get_liked_by_user_components(self, component_ids: List[UUID], api_key: str) -> List[UUID]: # Get fields id # filter should be "id is in component_ids AND liked_by directus_users_id token is api_key" # return the ids @@ -225,11 +226,11 @@ class StoreService(Service): } ), } - results = self._get(self.components_url, api_key, params) + results = await self._get(self.components_url, api_key, params) return [result["id"] for result in results] # Which of the components is parent of the user's components - def get_components_in_users_collection(self, component_ids: List[str], api_key: str): + async def get_components_in_users_collection(self, component_ids: List[str], api_key: str): user_data = user_data_var.get() if not user_data: raise ValueError("No user data") @@ -244,19 +245,19 @@ class StoreService(Service): } ), } - results = self._get(self.components_url, api_key, params) + results = await self._get(self.components_url, api_key, params) return [result["id"] for result in results] - def download(self, api_key: str, component_id: UUID) -> DownloadComponentResponse: + async def download(self, api_key: str, component_id: UUID) -> DownloadComponentResponse: url = f"{self.components_url}/{component_id}" params = {"fields": ",".join(["id", "name", "description", "data", "is_component"])} - component = self._get(url, api_key, params) - self.call_webhook(api_key, self.download_webhook_url, component_id) + component = await self._get(url, api_key, params) + await self.call_webhook(api_key, self.download_webhook_url, component_id) return DownloadComponentResponse(**component) - def upload(self, api_key: str, component_data: StoreComponentCreate) -> ComponentResponse: + async def upload(self, api_key: str, component_data: StoreComponentCreate) -> ComponentResponse: headers = {"Authorization": f"Bearer {api_key}"} component_dict = component_data.dict(exclude_unset=True) # Parent is a UUID, but the store expects a string @@ -266,8 +267,11 @@ class StoreService(Service): component_dict = process_tags_for_post(component_dict) try: - response = httpx.post(self.components_url, headers=headers, json=component_dict) - response.raise_for_status() + # response = httpx.post(self.components_url, headers=headers, json=component_dict) + # response.raise_for_status() + async with httpx.AsyncClient() as client: + response = await client.post(self.components_url, headers=headers, json=component_dict) + response.raise_for_status() component = response.json()["data"] return ComponentResponse(**component) except HTTPError as exc: @@ -280,27 +284,27 @@ class StoreService(Service): pass raise ValueError(f"Upload failed: {exc}") - def get_tags(self, api_key: str) -> List[Dict[str, Any]]: + async def get_tags(self, api_key: str) -> List[Dict[str, Any]]: url = f"{self.base_url}/items/tags" params = {"fields": ",".join(["id", "name"])} - tags = self._get(url, api_key, params) + tags = await self._get(url, api_key, params) return tags - def get_user_likes(self, api_key: str) -> List[Dict[str, Any]]: + async def get_user_likes(self, api_key: str) -> List[Dict[str, Any]]: url = f"{self.base_url}/users/me" params = { "fields": ",".join(["id", "likes"]), } - likes = self._get(url, api_key, params) + likes = await self._get(url, api_key, params) return likes - def get_component_likes_count(self, api_key: str, component_id: str) -> int: + async def get_component_likes_count(self, api_key: str, component_id: str) -> int: url = f"{self.components_url}/{component_id}" params = { "fields": ",".join(["id", "count(liked_by)"]), } - result = self._get(url, api_key, params) + result = await self._get(url, api_key, params) if len(result) == 0: raise ValueError("Component not found") likes = result["liked_by_count"] @@ -312,17 +316,24 @@ class StoreService(Service): raise ValueError(f"Unexpected value for likes count: {likes}") return likes - def like_component(self, api_key: str, component_id: str) -> bool: + async def like_component(self, api_key: str, component_id: str) -> bool: # if it returns a list with one id, it means the like was successful # if it returns an int, it means the like was removed headers = {"Authorization": f"Bearer {api_key}"} - response = httpx.post( - self.like_webhook_url, - json={"component_id": str(component_id)}, - headers=headers, - ) + # response = httpx.post( + # self.like_webhook_url, + # json={"component_id": str(component_id)}, + # headers=headers, + # ) - response.raise_for_status() + # response.raise_for_status() + async with httpx.AsyncClient() as client: + response = await client.post( + self.like_webhook_url, + json={"component_id": str(component_id)}, + headers=headers, + ) + response.raise_for_status() if response.status_code == 200: result = response.json() @@ -332,3 +343,5 @@ class StoreService(Service): return False else: raise ValueError(f"Unexpected result: {result}") + else: + raise ValueError(f"Unexpected status code: {response.status_code}") diff --git a/src/backend/langflow/services/store/utils.py b/src/backend/langflow/services/store/utils.py index ec0fbc8a6..309516f1c 100644 --- a/src/backend/langflow/services/store/utils.py +++ b/src/backend/langflow/services/store/utils.py @@ -12,7 +12,7 @@ def process_tags_for_post(component_dict): return component_dict -def update_components_with_user_data( +async def update_components_with_user_data( components: List["ListComponentResponse"], store_service: "StoreService", store_api_Key: str, @@ -27,7 +27,7 @@ def update_components_with_user_data( # So we can set liked_by_user to True for all components liked_by_user_ids = component_ids else: - liked_by_user_ids = store_service.get_liked_by_user_components( + liked_by_user_ids = await store_service.get_liked_by_user_components( component_ids=component_ids, api_key=store_api_Key, )