kubernetes secret manager

This commit is contained in:
ming luo 2024-06-21 00:53:49 -04:00 committed by Gabriel Luiz Freitas Almeida
commit 1bf668a941
4 changed files with 312 additions and 16 deletions

View file

@ -91,6 +91,7 @@ cassio = { extras = ["cassio"], version = "^0.1.7", optional = true }
unstructured = {extras = ["docx", "md", "pptx"], version = "^0.14.4"}
langchain-aws = "^0.1.6"
langchain-mongodb = "^0.1.6"
kubernetes = "^30.1.0"
[tool.poetry.group.dev.dependencies]

View file

@ -0,0 +1,164 @@
from kubernetes import client, config # type: ignore
from kubernetes.client.rest import ApiException
from base64 import b64encode, b64decode
from loguru import logger
class KubernetesSecretManager:
"""
A class for managing Kubernetes secrets.
"""
def __init__(self, namespace: str = "langflow"):
"""
Initialize the KubernetesSecretManager class.
Args:
namespace (str): The namespace in which to perform secret operations.
"""
config.load_kube_config()
self.namespace = namespace
# initialize the Kubernetes API client
self.core_api = client.CoreV1Api()
def create_secret(self, name: str, data: dict, secret_type: str = "Opaque"):
"""
Create a new secret in the specified namespace.
Args:
name (str): The name of the secret to create.
data (dict): A dictionary containing the key-value pairs for the secret data.
secret_type (str, optional): The type of secret to create. Defaults to 'Opaque'.
Returns:
V1Secret: The created secret object.
"""
encoded_data = {k: b64encode(v.encode()).decode() for k, v in data.items()}
secret_metadata = client.V1ObjectMeta(name=name)
secret = client.V1Secret(
api_version="v1",
kind="Secret",
metadata=secret_metadata,
type=secret_type,
data=encoded_data
)
return self.core_api.create_namespaced_secret(self.namespace, secret)
def upsert_secret(self, secret_name: str, data: dict, secret_type: str = "Opaque"):
"""
Upsert a secret in the specified namespace.
If the secret doesn't exist, it will be created.
If it exists, it will be updated with new data while preserving existing keys.
:param secret_name: Name of the secret
:param new_data: Dictionary containing new key-value pairs for the secret
:return: Created or updated secret object
"""
try:
# Try to read the existing secret
existing_secret = self.core_api.read_namespaced_secret(secret_name, self.namespace)
# If secret exists, update it
existing_data = {k: b64decode(v).decode() for k, v in existing_secret.data.items()}
existing_data.update(data)
# Encode all data to base64
encoded_data = {k: b64encode(v.encode()).decode() for k, v in existing_data.items()}
# Update the existing secret
existing_secret.data = encoded_data
return self.core_api.replace_namespaced_secret(secret_name, self.namespace, existing_secret)
except ApiException as e:
if e.status == 404:
# Secret doesn't exist, create a new one
return self.create_secret(secret_name, data)
else:
logger.error(f"Error upserting secret {secret_name}: {e}")
raise
def get_secret(self, name: str) -> dict | None:
"""
Read a secret from the specified namespace.
Args:
name (str): The name of the secret to read.
Returns:
V1Secret: The secret object.
"""
try:
secret = self.core_api.read_namespaced_secret(name, self.namespace)
return {k: b64decode(v).decode() for k, v in secret.data.items()}
except ApiException as e:
if e.status == 404:
return None
raise
def update_secret(self, name: str, data: dict):
"""
Update an existing secret in the specified namespace.
Args:
name (str): The name of the secret to update.
data (dict): A dictionary containing the key-value pairs for the updated secret data.
Returns:
V1Secret: The updated secret object.
"""
# Get the existing secret
secret = self.core_api.read_namespaced_secret(name, self.namespace)
if secret is None:
raise ApiException(status=404, reason="Not Found", msg="Secret not found")
# Update the secret data
encoded_data = {k: b64encode(v.encode()).decode() for k, v in data.items()}
secret.data.update(encoded_data)
# Update the secret in Kubernetes
return self.core_api.replace_namespaced_secret(name, self.namespace, secret)
def delete_secret_key(self, name: str, key: str):
"""
Delete a key from the specified secret in the namespace.
Args:
name (str): The name of the secret.
key (str): The key to delete from the secret.
Returns:
V1Secret: The updated secret object.
"""
# Get the existing secret
secret = self.core_api.read_namespaced_secret(name, self.namespace)
if secret is None:
raise ApiException(status=404, reason="Not Found", msg="Secret not found")
# Delete the key from the secret data
if key in secret.data:
del secret.data[key]
else:
raise ApiException(status=404, reason="Not Found", msg="Key not found in the secret")
# Update the secret in Kubernetes
return self.core_api.replace_namespaced_secret(name, self.namespace, secret)
def delete_secret(self, name: str):
"""
Delete a secret from the specified namespace.
Args:
name (str): The name of the secret to delete.
Returns:
V1Status: The status object indicating the success or failure of the operation.
"""
return self.core_api.delete_namespaced_secret(name, self.namespace)
# utility function to encode user_id to base64 lower case and numbers only
# this is required by kubernetes secret name restrictions
def encode_user_id(user_id: str) -> str:
return b64encode(user_id.encode()).decode().lower().replace("=", "").replace("+", "-").replace("/", "_")

View file

@ -1,5 +1,5 @@
import os
from typing import TYPE_CHECKING, Optional, Union
from typing import TYPE_CHECKING, Optional, Tuple, Union
from uuid import UUID
from fastapi import Depends
@ -8,13 +8,17 @@ from sqlmodel import Session, select
from langflow.services.auth import utils as auth_utils
from langflow.services.base import Service
from langflow.services.variable.base import VariableService
from langflow.services.database.models.variable.model import Variable, VariableCreate
from langflow.services.deps import get_session
from langflow.services.variable.base import VariableService
from langflow.services.variable.kubernetes_secrets import KubernetesSecretManager, encode_user_id
if TYPE_CHECKING:
from langflow.services.settings.service import SettingsService
CREDENTIAL_TYPE = "Credential"
GENERIC_TYPE = "Generic"
class DatabaseVariableService(VariableService, Service):
def __init__(self, settings_service: "SettingsService"):
@ -40,7 +44,7 @@ class DatabaseVariableService(VariableService, Service):
name=var,
value=value,
default_fields=[],
_type="Credential",
_type=CREDENTIAL_TYPE,
session=session,
)
except Exception as e:
@ -60,7 +64,7 @@ class DatabaseVariableService(VariableService, Service):
# credential = session.query(Variable).filter(Variable.user_id == user_id, Variable.name == name).first()
variable = session.exec(select(Variable).where(Variable.user_id == user_id, Variable.name == name)).first()
if variable and variable.type == "Credential" and field == "session_id":
if variable.type == CREDENTIAL_TYPE and field == "session_id": # type: ignore
raise TypeError(
f"variable {name} of type 'Credential' cannot be used in a Session ID field "
"because its purpose is to prevent the exposure of values."
@ -112,7 +116,7 @@ class DatabaseVariableService(VariableService, Service):
name: str,
value: str,
default_fields: list[str] = [],
_type: str = "Generic",
_type: str = GENERIC_TYPE,
session: Session = Depends(get_session),
):
variable_base = VariableCreate(
@ -131,37 +135,109 @@ class DatabaseVariableService(VariableService, Service):
class KubernetesSecretService(VariableService, Service):
def __init__(self, settings_service: "SettingsService"):
self.settings_service = settings_service
# TODO: settings_service to set kubernetes namespace
self.kubernetes_secrets = KubernetesSecretManager()
def initialize_user_variables(self, user_id: Union[UUID, str], session: Session = Depends(get_session)):
return
def initialize_user_variables(self, user_id: Union[UUID, str], session: Session):
# Check for environment variables that should be stored in the database
should_or_should_not = "Should" if self.settings_service.settings.store_environment_variables else "Should not"
logger.info(f"{should_or_should_not} store environment variables in the kubernetes.")
if self.settings_service.settings.store_environment_variables:
variables = {}
for var in self.settings_service.settings.variables_to_get_from_environment:
if var in os.environ:
logger.debug(f"Creating {var} variable from environment.")
value = os.environ[var]
if isinstance(value, str):
value = value.strip()
key = CREDENTIAL_TYPE + "_" + var
variables[key] = str(value)
try:
secret_name = user_id
self.kubernetes_secrets.create_secret(
name=secret_name,
data=variables,
)
except Exception as e:
logger.error(f"Error creating {var} variable: {e}")
else:
logger.info("Skipping environment variable storage.")
# resolve_variable is a helper function that resolves the variable name to the actual key in the secret
def resolve_variable(
self,
secret_name: str,
user_id: Union[UUID, str],
name: str,
) -> Tuple[str, str]:
variables = self.kubernetes_secrets.get_secret(name=secret_name)
if not variables:
raise ValueError(f"user_id {user_id} variable not found.")
if name in variables:
return name, variables[name]
else:
credential_name = CREDENTIAL_TYPE + "_" + name
if credential_name in variables:
return credential_name, variables[credential_name]
else:
raise ValueError(f"user_id {user_id} variable name {name} not found.")
def get_variable(
self,
user_id: Union[UUID, str],
name: str,
field: str,
session: Session = Depends(get_session),
_session: Session = None,
) -> str:
return ""
secret_name = encode_user_id(user_id)
key, value = self.resolve_variable(secret_name, user_id, name)
if key.startswith(CREDENTIAL_TYPE + "_") and field == "session_id": # type: ignore
raise TypeError(
f"variable {name} of type 'Credential' cannot be used in a Session ID field "
"because its purpose is to prevent the exposure of values."
)
return value
def list_variables(self, user_id: Union[UUID, str], session: Session = Depends(get_session)) -> list[Optional[str]]:
return []
def list_variables(
self,
user_id: Union[UUID, str],
_session: Session = None,
) -> list[Optional[str]]:
variables = self.kubernetes_secrets.get_secret(name=encode_user_id(user_id))
if not variables:
return []
names = []
for key in variables.keys():
if key.startswith(CREDENTIAL_TYPE + "_"):
names.append(key[len(CREDENTIAL_TYPE) + 1 :])
else:
names.append(key)
return names
def update_variable(
self,
user_id: Union[UUID, str],
name: str,
value: str,
session: Session = Depends(get_session),
_session: Session = None,
):
return
secret_name = encode_user_id(user_id)
secret_key, _ = self.resolve_variable(secret_name, user_id, name)
return self.kubernetes_secrets.update_secret_key(name=secret_name, data={secret_key: value})
def delete_variable(
self,
user_id: Union[UUID, str],
name: str,
session: Session = Depends(get_session),
_session: Session = None,
):
secret_name = encode_user_id(user_id)
secret_key, _ = self.resolve_variable(secret_name, user_id, name)
self.kubernetes_secrets.delete_secret_key(name=secret_name, key=secret_key)
return
def create_variable(
@ -171,6 +247,11 @@ class KubernetesSecretService(VariableService, Service):
value: str,
default_fields: list[str] = [],
_type: str = "Generic",
session: Session = Depends(get_session),
_session: Session = None,
):
return
secret_name = encode_user_id(user_id)
secret_key = name
if _type == CREDENTIAL_TYPE:
secret_key = CREDENTIAL_TYPE + "_" + name
return self.kubernetes_secrets.upsert_secret(name=secret_name, data={secret_key: value})

View file

@ -0,0 +1,50 @@
import pytest
from unittest.mock import MagicMock, patch
from kubernetes.client.rest import ApiException
from kubernetes.client import V1ObjectMeta, V1Secret
from base64 import b64encode
from langflow.services.variable.kubernetes_secrets import KubernetesSecretManager
@pytest.fixture
def secret_manager():
return KubernetesSecretManager(namespace='test-namespace')
def test_create_secret(secret_manager, mocker):
mocker.patch.object(secret_manager.core_api, 'create_namespaced_secret', return_value=V1Secret(metadata=V1ObjectMeta(name='test-secret')))
secret_manager.create_secret(name='test-secret', data={'key': 'value'})
secret_manager.core_api.create_namespaced_secret.assert_called_once_with(
'test-namespace',
V1Secret(
api_version='v1',
kind='Secret',
metadata=V1ObjectMeta(name='test-secret'),
type='Opaque',
data={'key': b64encode('value'.encode()).decode()}
)
)
def test_get_secret(secret_manager, mocker):
mock_secret = V1Secret(data={'key': b64encode('value'.encode()).decode()})
mocker.patch.object(secret_manager.core_api, 'read_namespaced_secret', return_value=mock_secret)
secret_data = secret_manager.get_secret(name='test-secret')
secret_manager.core_api.read_namespaced_secret.assert_called_once_with('test-secret', 'test-namespace')
assert secret_data == {'key': 'value'}
def test_update_secret(secret_manager, mocker):
mocker.patch.object(secret_manager.core_api, 'replace_namespaced_secret', return_value=V1Secret(metadata=V1ObjectMeta(name='test-secret')))
secret_manager.update_secret(name='test-secret', data={'key': 'new-value'})
secret_manager.core_api.replace_namespaced_secret.assert_called_once_with(
'test-secret',
'test-namespace',
V1Secret(metadata=V1ObjectMeta(name='test-secret'), data={'key': 'new-value'})
)
def test_delete_secret(secret_manager, mocker):
mocker.patch.object(secret_manager.core_api, 'delete_namespaced_secret', return_value=MagicMock(status='Success'))
secret_manager.delete_secret(name='test-secret')
secret_manager.core_api.delete_namespaced_secret.assert_called_once_with('test-secret', 'test-namespace')