From 1bf668a9416577498f789e46cd627a51d17784a3 Mon Sep 17 00:00:00 2001
From: ming luo
Date: Fri, 21 Jun 2024 00:53:49 -0400
Subject: [PATCH] kubernetes secret manager

---
 pyproject.toml                                |   1 +
 .../services/variable/kubernetes_secrets.py   | 173 ++++++++++++++++++
 .../langflow/services/variable/service.py     | 113 ++++++++++--
 tests/unit/test_kubernetes_secrets.py         |  55 ++++++
 4 files changed, 326 insertions(+), 16 deletions(-)
 create mode 100644 src/backend/base/langflow/services/variable/kubernetes_secrets.py
 create mode 100644 tests/unit/test_kubernetes_secrets.py

diff --git a/pyproject.toml b/pyproject.toml
index 704ec2038..931ae3764 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -91,6 +91,7 @@ cassio = { extras = ["cassio"], version = "^0.1.7", optional = true }
 unstructured = {extras = ["docx", "md", "pptx"], version = "^0.14.4"}
 langchain-aws = "^0.1.6"
 langchain-mongodb = "^0.1.6"
+kubernetes = "^30.1.0"
 
 
 [tool.poetry.group.dev.dependencies]
diff --git a/src/backend/base/langflow/services/variable/kubernetes_secrets.py b/src/backend/base/langflow/services/variable/kubernetes_secrets.py
new file mode 100644
index 000000000..b449cc0c0
--- /dev/null
+++ b/src/backend/base/langflow/services/variable/kubernetes_secrets.py
@@ -0,0 +1,173 @@
+from kubernetes import client, config  # type: ignore
+from kubernetes.client.rest import ApiException
+from base64 import b64encode, b64decode
+from uuid import UUID
+
+from loguru import logger
+
+class KubernetesSecretManager:
+    """
+    A class for managing Kubernetes secrets.
+    """
+
+    def __init__(self, namespace: str = "langflow"):
+        """
+        Initialize the KubernetesSecretManager class.
+
+        Args:
+            namespace (str): The namespace in which to perform secret operations.
+        """
+        # Prefer the in-cluster service account when running inside a pod and
+        # fall back to the local kubeconfig (e.g. for development) otherwise.
+        try:
+            config.load_incluster_config()
+        except config.ConfigException:
+            config.load_kube_config()
+        self.namespace = namespace
+
+        # initialize the Kubernetes API client
+        self.core_api = client.CoreV1Api()
+
+    def create_secret(self, name: str, data: dict, secret_type: str = "Opaque"):
+        """
+        Create a new secret in the specified namespace.
+
+        Args:
+            name (str): The name of the secret to create.
+            data (dict): A dictionary containing the key-value pairs for the secret data.
+            secret_type (str, optional): The type of secret to create. Defaults to 'Opaque'.
+
+        Returns:
+            V1Secret: The created secret object.
+        """
+        encoded_data = {k: b64encode(v.encode()).decode() for k, v in data.items()}
+
+        secret_metadata = client.V1ObjectMeta(name=name)
+        secret = client.V1Secret(
+            api_version="v1",
+            kind="Secret",
+            metadata=secret_metadata,
+            type=secret_type,
+            data=encoded_data
+        )
+
+        return self.core_api.create_namespaced_secret(self.namespace, secret)
+
+    def upsert_secret(self, secret_name: str, data: dict, secret_type: str = "Opaque"):
+        """
+        Upsert a secret in the specified namespace.
+        If the secret doesn't exist, it will be created.
+        If it exists, it will be updated with new data while preserving existing keys.
+
+        :param secret_name: Name of the secret
+        :param data: Dictionary containing new key-value pairs for the secret
+        :param secret_type: Type used when the secret has to be created. Defaults to 'Opaque'
+        :return: Created or updated secret object
+        """
+        try:
+            # Try to read the existing secret
+            existing_secret = self.core_api.read_namespaced_secret(secret_name, self.namespace)
+
+            # If secret exists, update it
+            existing_data = {k: b64decode(v).decode() for k, v in (existing_secret.data or {}).items()}
+            existing_data.update(data)
+
+            # Encode all data to base64
+            encoded_data = {k: b64encode(v.encode()).decode() for k, v in existing_data.items()}
+
+            # Update the existing secret
+            existing_secret.data = encoded_data
+            return self.core_api.replace_namespaced_secret(secret_name, self.namespace, existing_secret)
+
+        except ApiException as e:
+            if e.status == 404:
+                # Secret doesn't exist, create a new one
+                return self.create_secret(secret_name, data, secret_type)
+            else:
+                logger.error(f"Error upserting secret {secret_name}: {e}")
+                raise
+
+    def get_secret(self, name: str) -> dict | None:
+        """
+        Read a secret from the specified namespace.
+
+        Args:
+            name (str): The name of the secret to read.
+
+        Returns:
+            dict | None: The decoded key-value pairs, or None if the secret does not exist.
+        """
+        try:
+            secret = self.core_api.read_namespaced_secret(name, self.namespace)
+            return {k: b64decode(v).decode() for k, v in (secret.data or {}).items()}  # data may be None
+        except ApiException as e:
+            if e.status == 404:
+                return None
+            raise
+
+    def update_secret(self, name: str, data: dict):
+        """
+        Update an existing secret in the specified namespace.
+
+        Args:
+            name (str): The name of the secret to update.
+            data (dict): A dictionary containing the key-value pairs for the updated secret data.
+
+        Returns:
+            V1Secret: The updated secret object.
+        """
+        # Get the existing secret
+        secret = self.core_api.read_namespaced_secret(name, self.namespace)
+        if secret is None:
+            raise ApiException(status=404, reason="Not Found", msg="Secret not found")
+
+        # Update the secret data
+        encoded_data = {k: b64encode(v.encode()).decode() for k, v in data.items()}
+        secret.data = {**(secret.data or {}), **encoded_data}  # data may be None when the secret has no keys
+
+        # Update the secret in Kubernetes
+        return self.core_api.replace_namespaced_secret(name, self.namespace, secret)
+
+    def delete_secret_key(self, name: str, key: str):
+        """
+        Delete a key from the specified secret in the namespace.
+
+        Args:
+            name (str): The name of the secret.
+            key (str): The key to delete from the secret.
+
+        Returns:
+            V1Secret: The updated secret object.
+        """
+        # Get the existing secret
+        secret = self.core_api.read_namespaced_secret(name, self.namespace)
+        if secret is None:
+            raise ApiException(status=404, reason="Not Found", msg="Secret not found")
+
+        # Delete the key from the secret data
+        if secret.data and key in secret.data:
+            del secret.data[key]
+        else:
+            raise ApiException(status=404, reason="Not Found", msg="Key not found in the secret")
+
+        # Update the secret in Kubernetes
+        return self.core_api.replace_namespaced_secret(name, self.namespace, secret)
+
+    def delete_secret(self, name: str):
+        """
+        Delete a secret from the specified namespace.
+
+        Args:
+            name (str): The name of the secret to delete.
+
+        Returns:
+            V1Status: The status object indicating the success or failure of the operation.
+        """
+        return self.core_api.delete_namespaced_secret(name, self.namespace)
+
+# utility function to encode user_id to base64 lower case and numbers only
+# this is required by kubernetes secret name restrictions
+# NOTE(review): lower-casing base64 output is lossy, so distinct IDs can map to
+# the same secret name, and "_" is still emitted for "/" - confirm both are acceptable.
+def encode_user_id(user_id: UUID | str) -> str:
+    return b64encode(str(user_id).encode()).decode().lower().replace("=", "").replace("+", "-").replace("/", "_")
diff --git a/src/backend/base/langflow/services/variable/service.py b/src/backend/base/langflow/services/variable/service.py
index 1f90ae7f3..ae7fc0f2b 100644
--- a/src/backend/base/langflow/services/variable/service.py
+++ b/src/backend/base/langflow/services/variable/service.py
@@ -1,5 +1,5 @@
 import os
-from typing import TYPE_CHECKING, Optional, Union
+from typing import TYPE_CHECKING, Optional, Tuple, Union
 from uuid import UUID
 
 from fastapi import Depends
@@ -8,13 +8,17 @@ from sqlmodel import Session, select
 
 from langflow.services.auth import utils as auth_utils
 from langflow.services.base import Service
-from langflow.services.variable.base import VariableService
 from langflow.services.database.models.variable.model import Variable, VariableCreate
 from langflow.services.deps import get_session
+from langflow.services.variable.base import VariableService
+from langflow.services.variable.kubernetes_secrets import KubernetesSecretManager, encode_user_id
 
 if TYPE_CHECKING:
     from langflow.services.settings.service import SettingsService
 
+CREDENTIAL_TYPE = "Credential"
+GENERIC_TYPE = "Generic"
+
 
 class DatabaseVariableService(VariableService, Service):
     def __init__(self, settings_service: "SettingsService"):
@@ -40,7 +44,7 @@ class DatabaseVariableService(VariableService, Service):
                                 name=var,
                                 value=value,
                                 default_fields=[],
-                                _type="Credential",
+                                _type=CREDENTIAL_TYPE,
                                 session=session,
                             )
                         except Exception as e:
@@ -60,7 +64,7 @@ class DatabaseVariableService(VariableService, Service):
         # credential = session.query(Variable).filter(Variable.user_id == user_id, Variable.name == name).first()
         variable = session.exec(select(Variable).where(Variable.user_id == user_id, Variable.name == name)).first()
 
-        if variable and variable.type == "Credential" and field == "session_id":
+        if variable and variable.type == CREDENTIAL_TYPE and field == "session_id":
             raise TypeError(
                 f"variable {name} of type 'Credential' cannot be used in a Session ID field "
                 "because its purpose is to prevent the exposure of values."
@@ -112,7 +116,7 @@ class DatabaseVariableService(VariableService, Service):
         name: str,
         value: str,
         default_fields: list[str] = [],
-        _type: str = "Generic",
+        _type: str = GENERIC_TYPE,
         session: Session = Depends(get_session),
     ):
         variable_base = VariableCreate(
@@ -131,37 +135,109 @@ class DatabaseVariableService(VariableService, Service):
 class KubernetesSecretService(VariableService, Service):
     def __init__(self, settings_service: "SettingsService"):
         self.settings_service = settings_service
+        # TODO: settings_service to set kubernetes namespace
+        self.kubernetes_secrets = KubernetesSecretManager()
 
-    def initialize_user_variables(self, user_id: Union[UUID, str], session: Session = Depends(get_session)):
-        return
+    def initialize_user_variables(self, user_id: Union[UUID, str], session: Session):
+        # Check for environment variables that should be stored in the database
+        should_or_should_not = "Should" if self.settings_service.settings.store_environment_variables else "Should not"
+        logger.info(f"{should_or_should_not} store environment variables in the kubernetes.")
+        if self.settings_service.settings.store_environment_variables:
+            variables = {}
+            for var in self.settings_service.settings.variables_to_get_from_environment:
+                if var in os.environ:
+                    logger.debug(f"Creating {var} variable from environment.")
+                    value = os.environ[var]
+                    if isinstance(value, str):
+                        value = value.strip()
+                    key = CREDENTIAL_TYPE + "_" + var
+                    variables[key] = str(value)
+
+            try:
+                secret_name = encode_user_id(user_id)
+                self.kubernetes_secrets.create_secret(
+                    name=secret_name,
+                    data=variables,
+                )
+            except Exception as e:
+                logger.error(f"Error creating environment variables for user {user_id}: {e}")
+
+        else:
+            logger.info("Skipping environment variable storage.")
+
+    # resolve_variable is a helper function that resolves the variable name to the actual key in the secret
+    def resolve_variable(
+        self,
+        secret_name: str,
+        user_id: Union[UUID, str],
+        name: str,
+    ) -> Tuple[str, str]:
+        variables = self.kubernetes_secrets.get_secret(name=secret_name)
+        if not variables:
+            raise ValueError(f"user_id {user_id} variable not found.")
+
+        if name in variables:
+            return name, variables[name]
+        else:
+            credential_name = CREDENTIAL_TYPE + "_" + name
+            if credential_name in variables:
+                return credential_name, variables[credential_name]
+            else:
+                raise ValueError(f"user_id {user_id} variable name {name} not found.")
 
     def get_variable(
         self,
         user_id: Union[UUID, str],
         name: str,
         field: str,
-        session: Session = Depends(get_session),
+        _session: Session = None,
     ) -> str:
-        return ""
+        secret_name = encode_user_id(user_id)
+        key, value = self.resolve_variable(secret_name, user_id, name)
+        if key.startswith(CREDENTIAL_TYPE + "_") and field == "session_id":  # type: ignore
+            raise TypeError(
+                f"variable {name} of type 'Credential' cannot be used in a Session ID field "
+                "because its purpose is to prevent the exposure of values."
+            )
+        return value
 
-    def list_variables(self, user_id: Union[UUID, str], session: Session = Depends(get_session)) -> list[Optional[str]]:
-        return []
+    def list_variables(
+        self,
+        user_id: Union[UUID, str],
+        _session: Session = None,
+    ) -> list[Optional[str]]:
+        variables = self.kubernetes_secrets.get_secret(name=encode_user_id(user_id))
+        if not variables:
+            return []
+
+        names = []
+        for key in variables:
+            if key.startswith(CREDENTIAL_TYPE + "_"):
+                names.append(key[len(CREDENTIAL_TYPE) + 1 :])
+            else:
+                names.append(key)
+        return names
 
     def update_variable(
         self,
         user_id: Union[UUID, str],
         name: str,
         value: str,
-        session: Session = Depends(get_session),
+        _session: Session = None,
     ):
-        return
+        secret_name = encode_user_id(user_id)
+        secret_key, _ = self.resolve_variable(secret_name, user_id, name)
+        return self.kubernetes_secrets.update_secret(name=secret_name, data={secret_key: value})
 
     def delete_variable(
         self,
         user_id: Union[UUID, str],
         name: str,
-        session: Session = Depends(get_session),
+        _session: Session = None,
     ):
+        secret_name = encode_user_id(user_id)
+        secret_key, _ = self.resolve_variable(secret_name, user_id, name)
+        self.kubernetes_secrets.delete_secret_key(name=secret_name, key=secret_key)
         return
 
     def create_variable(
@@ -171,6 +247,11 @@ class KubernetesSecretService(VariableService, Service):
         value: str,
         default_fields: list[str] = [],
         _type: str = "Generic",
-        session: Session = Depends(get_session),
+        _session: Session = None,
     ):
-        return
+        secret_name = encode_user_id(user_id)
+        secret_key = name
+        if _type == CREDENTIAL_TYPE:
+            secret_key = CREDENTIAL_TYPE + "_" + name
+
+        return self.kubernetes_secrets.upsert_secret(secret_name=secret_name, data={secret_key: value})
diff --git a/tests/unit/test_kubernetes_secrets.py b/tests/unit/test_kubernetes_secrets.py
new file mode 100644
index 000000000..a80b5c88d
--- /dev/null
+++ b/tests/unit/test_kubernetes_secrets.py
@@ -0,0 +1,55 @@
+import pytest
+from unittest.mock import MagicMock, patch
+from kubernetes.client.rest import ApiException
+from kubernetes.client import V1ObjectMeta, V1Secret
+from base64 import b64encode
+
+from langflow.services.variable.kubernetes_secrets import KubernetesSecretManager
+
+@pytest.fixture
+def secret_manager(mocker):
+    # Stub out config loading so no cluster or kubeconfig is needed to build the manager
+    mocker.patch('kubernetes.config.load_incluster_config')
+    mocker.patch('kubernetes.config.load_kube_config')
+    return KubernetesSecretManager(namespace='test-namespace')
+
+def test_create_secret(secret_manager, mocker):
+    mocker.patch.object(secret_manager.core_api, 'create_namespaced_secret', return_value=V1Secret(metadata=V1ObjectMeta(name='test-secret')))
+
+    secret_manager.create_secret(name='test-secret', data={'key': 'value'})
+    secret_manager.core_api.create_namespaced_secret.assert_called_once_with(
+        'test-namespace',
+        V1Secret(
+            api_version='v1',
+            kind='Secret',
+            metadata=V1ObjectMeta(name='test-secret'),
+            type='Opaque',
+            data={'key': b64encode('value'.encode()).decode()}
+        )
+    )
+
+def test_get_secret(secret_manager, mocker):
+    mock_secret = V1Secret(data={'key': b64encode('value'.encode()).decode()})
+    mocker.patch.object(secret_manager.core_api, 'read_namespaced_secret', return_value=mock_secret)
+
+    secret_data = secret_manager.get_secret(name='test-secret')
+    secret_manager.core_api.read_namespaced_secret.assert_called_once_with('test-secret', 'test-namespace')
+    assert secret_data == {'key': 'value'}
+
+def test_update_secret(secret_manager, mocker):
+    existing_secret = V1Secret(metadata=V1ObjectMeta(name='test-secret'), data={'key': b64encode('old-value'.encode()).decode()})
+    mocker.patch.object(secret_manager.core_api, 'read_namespaced_secret', return_value=existing_secret)
+    mocker.patch.object(secret_manager.core_api, 'replace_namespaced_secret', return_value=V1Secret(metadata=V1ObjectMeta(name='test-secret')))
+
+    secret_manager.update_secret(name='test-secret', data={'key': 'new-value'})
+    secret_manager.core_api.replace_namespaced_secret.assert_called_once_with(
+        'test-secret',
+        'test-namespace',
+        V1Secret(metadata=V1ObjectMeta(name='test-secret'), data={'key': b64encode('new-value'.encode()).decode()})
+    )
+
+def test_delete_secret(secret_manager, mocker):
+    mocker.patch.object(secret_manager.core_api, 'delete_namespaced_secret', return_value=MagicMock(status='Success'))
+
+    secret_manager.delete_secret(name='test-secret')
+    secret_manager.core_api.delete_namespaced_secret.assert_called_once_with('test-secret', 'test-namespace')