-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
13 changed files
with
407 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
from ._vaultrepo import * | ||
from ._vaultadmin import * |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
__all__ = [ | ||
'VaultGateway', | ||
] | ||
from fidelius.structs import * | ||
from ._structs import * | ||
import hvac | ||
|
||
|
||
import logging | ||
log = logging.getLogger(__file__) | ||
|
||
|
||
class VaultGateway: | ||
def __init__(self, url: str, token: str, verify: bool = True, timeout: int = 30, namespace: Optional[str] = None): | ||
self._client = hvac.Client(url=url, token=token, verify=verify, timeout=timeout, namespace=namespace) | ||
self._keyvals: Dict[str, Dict[str, str]] = {} # self._keyvals[path][key] = val | ||
|
||
def flush_cache(self): | ||
self._keyvals = {} | ||
|
||
def _read_secret(self, path: str) -> VaultResponse: | ||
res_dict = self._client.secrets.kv.read_secret(path=path) | ||
return VaultResponse.from_dict(res_dict) | ||
|
||
def _load_path(self, path: str): | ||
if path not in self._keyvals: | ||
self._keyvals[path] = {} | ||
res = self._read_secret(path) | ||
if res.data and isinstance(res.data.data, dict): | ||
self._keyvals[path] = res.data.data | ||
else: | ||
log.error(f'The data for requested path was not a dict or doesnt exist! {path=}, {res=}') | ||
|
||
def get_secret_param(self, path: str, key: str) -> Optional[str]: | ||
self._load_path(path) | ||
return self._keyvals[path].get(key, None) | ||
|
||
def _force_path_update(self, path: str): | ||
# First, clear this path from the cache! | ||
if path in self._keyvals: | ||
del self._keyvals[path] | ||
# Then, load the path so we're up to date! | ||
self._load_path(path) | ||
|
||
def create_secret_param(self, path: str, key: str, value: str): | ||
self._force_path_update(path) | ||
old_data = self._keyvals[path] | ||
if key in old_data: | ||
raise FideliusParameterAlreadyExists(f'parameter already exists: {path}/{key}') | ||
old_data[key] = value | ||
self._client.secrets.kv.create_or_update_secret(path=path, secret=old_data) | ||
self._force_path_update(path) | ||
|
||
def set_metadata(self, path: str, metadata: Dict[str, str]): | ||
self._client.secrets.kv.update_metadata(path=path, custom_metadata=metadata) | ||
|
||
def update_secret_param(self, path: str, key: str, value: str): | ||
self._force_path_update(path) | ||
old_data = self._keyvals[path] | ||
if key not in old_data: | ||
raise FideliusParameterNotFound(f'parameter not found: {path}/{key}') | ||
old_data[key] = value | ||
self._client.secrets.kv.create_or_update_secret(path=path, secret=old_data) | ||
self._force_path_update(path) | ||
|
||
def delete_secret_param(self, path: str, key: str): | ||
self._force_path_update(path) | ||
old_data = self._keyvals[path] | ||
if key not in old_data: | ||
raise FideliusParameterNotFound(f'parameter not found: {path}/{key}') | ||
del old_data[key] | ||
self._client.secrets.kv.create_or_update_secret(path=path, secret=old_data) | ||
self._force_path_update(path) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
from ._vaultrepo import VaultKeyValRepo as FideliusRepo | ||
from ._vaultadmin import VaultKeyValAdmin as FideliusAdmin |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
__all__ = [ | ||
'VaultResponse', | ||
'VaultResponseData', | ||
'VaultResponseMetadata', | ||
] | ||
|
||
from ccptools.structs import * | ||
from fidelius.utils import SelfResolvingFromDictDataclass | ||
|
||
|
||
@dataclasses.dataclass | ||
class VaultResponseMetadata(SelfResolvingFromDictDataclass): | ||
created_time: Optional[Datetime] = None | ||
custom_metadata: Any = None | ||
deletion_time: Optional[Datetime] = None | ||
destroyed: bool = False | ||
version: Optional[int] = None | ||
|
||
|
||
@dataclasses.dataclass | ||
class VaultResponseData(SelfResolvingFromDictDataclass): | ||
data: Dict[str, str] = dataclasses.field(default_factory=dict) | ||
metadata: Optional[VaultResponseMetadata] = None | ||
|
||
|
||
@dataclasses.dataclass | ||
class VaultResponse(SelfResolvingFromDictDataclass): | ||
request_id: str = '' | ||
lease_id: Optional[str] = '' | ||
renewable: bool = False | ||
lease_duration: Optional[int] = None | ||
data: Optional[VaultResponseData] = None | ||
wrap_info: Optional[Any] = None | ||
warnings: Optional[Any] = None | ||
auth: Optional[Any] = None | ||
mount_type: Optional[str] = None | ||
|
||
def get_keyval(self, key: str) -> Optional[str]: | ||
if isinstance(self.data, VaultResponseData): | ||
if isinstance(self.data.data, dict): | ||
return self.data.data.get(key, None) | ||
return None | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
__all__ = [ | ||
'VaultKeyValAdmin', | ||
] | ||
|
||
from fidelius.structs import * | ||
from fidelius.gateway._abstract import * | ||
from ._vaultrepo import * | ||
|
||
import logging | ||
log = logging.getLogger(__name__) | ||
|
||
|
||
class VaultKeyValAdmin(_BaseFideliusAdminRepo, VaultKeyValRepo): | ||
def __init__(self, app_props: FideliusAppProps, tags: Optional[FideliusTags] = None, **kwargs): | ||
log.debug('VaultKeyValAdmin.__init__') | ||
super().__init__(app_props, tags, **kwargs) | ||
|
||
def create_param(self, name: str, value: str, | ||
description: Optional[str] = None, env: Optional[str] = None) -> (str, str): | ||
self._gw.create_secret_param(path=self._nameless_path(env=env), key=name, value=value) | ||
self._gw.set_metadata(path=self._nameless_path(env=env), metadata=self.tags.to_dict()) | ||
return self.get_full_path(name, env=env), self.get_expression_string(name) | ||
|
||
def update_param(self, name: str, value: str, | ||
description: Optional[str] = None, env: Optional[str] = None) -> (str, str): | ||
self._gw.update_secret_param(path=self._nameless_path(env=env), key=name, value=value) | ||
return self.get_full_path(name, env=env), self.get_expression_string(name) | ||
|
||
def delete_param(self, name: str, env: Optional[str] = None): | ||
self._gw.delete_secret_param(path=self._nameless_path(env=env), key=name) | ||
|
||
def create_shared_param(self, name: str, folder: str, value: str, | ||
description: Optional[str] = None, env: Optional[str] = None) -> (str, str): | ||
self._gw.create_secret_param(path=self._nameless_path(folder=folder, env=env), key=name, value=value) | ||
self._gw.set_metadata(path=self._nameless_path(folder=folder, env=env), metadata=self.tags.to_dict()) | ||
return self.get_full_path(name, folder=folder, env=env), self.get_expression_string(name, folder=folder) | ||
|
||
def update_shared_param(self, name: str, folder: str, value: str, | ||
description: Optional[str] = None, env: Optional[str] = None) -> (str, str): | ||
self._gw.update_secret_param(path=self._nameless_path(folder=folder, env=env), key=name, value=value) | ||
return self.get_full_path(name, folder=folder, env=env), self.get_expression_string(name, folder=folder) | ||
|
||
def delete_shared_param(self, name: str, folder: str, env: Optional[str] = None): | ||
self._gw.delete_secret_param(path=self._nameless_path(env=env, folder=folder), key=name) | ||
|
||
def create_secret(self, name: str, value: str, | ||
description: Optional[str] = None, env: Optional[str] = None) -> (str, str): | ||
return self.create_param(name=name, value=value, description=description, env=env) | ||
|
||
def update_secret(self, name: str, value: str, | ||
description: Optional[str] = None, env: Optional[str] = None) -> (str, str): | ||
return self.update_param(name=name, value=value, description=description, env=env) | ||
|
||
def delete_secret(self, name: str, env: Optional[str] = None): | ||
self.delete_param(name=name, env=env) | ||
|
||
def create_shared_secret(self, name: str, folder: str, value: str, | ||
description: Optional[str] = None, env: Optional[str] = None) -> (str, str): | ||
return self.create_shared_param(name=name, folder=folder, value=value, description=description, env=env) | ||
|
||
def update_shared_secret(self, name: str, folder: str, value: str, | ||
description: Optional[str] = None, env: Optional[str] = None) -> (str, str): | ||
return self.update_shared_param(name=name, folder=folder, value=value, description=description, env=env) | ||
|
||
def delete_shared_secret(self, name: str, folder: str, env: Optional[str] = None): | ||
self.delete_shared_param(name=name, folder=folder, env=env) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
__all__ = [ | ||
'VaultKeyValRepo', | ||
] | ||
|
||
from fidelius.structs import * | ||
from fidelius.gateway._abstract import * | ||
from ._client import * | ||
|
||
import os | ||
|
||
import logging | ||
log = logging.getLogger(__name__) | ||
|
||
|
||
class VaultKeyValRepo(_BaseFideliusRepo): | ||
def __init__(self, app_props: FideliusAppProps, | ||
vault_url: Optional[str] = None, | ||
vault_token: Optional[str] = None, | ||
|
||
verify: Union[bool, str] = True, | ||
timeout_sec: int = 30, | ||
flush_cache_every_time: bool = False, | ||
**kwargs): | ||
"""Fidelius Admin Repo that uses Hashicorp's Vault and its Secrets Key/Value store as a backend | ||
VAULT_ADDR | ||
VAULT_TOKEN | ||
VAULT_CACERT | ||
VAULT_CAPATH | ||
VAULT_CLIENT_CERT | ||
VAULT_CLIENT_KEY | ||
:param app_props: The current application properties. | ||
... | ||
:param flush_cache_every_time: Optional flat that'll flush the entire | ||
cache before every operation if set to | ||
True and is just intended for testing | ||
purposes. | ||
""" | ||
super().__init__(app_props, **kwargs) | ||
self._flush_cache_every_time = flush_cache_every_time | ||
|
||
self._vault_url = vault_url or os.environ.get('FIDELIUS_VAULT_ADDR', '') or os.environ.get('VAULT_ADDR', '') | ||
if not self._vault_url: | ||
raise EnvironmentError('Fidelius VaultKeyValRepo requires the base API URL address for Vault when initialising or in the FIDELIUS_VAULT_ADDR or VAULT_ADDR environment variables') | ||
|
||
self._vault_token = vault_token or os.environ.get('FIDELIUS_VAULT_TOKEN', '') or os.environ.get('VAULT_TOKEN', '') | ||
if not self._vault_token: | ||
raise EnvironmentError('Fidelius VaultKeyValRepo requires a vault token to access Vault when initialising or in the FIDELIUS_VAULT_ADDR or VAULT_ADDR environment variables') | ||
|
||
self._verify = verify | ||
self._timeout_sec = timeout_sec | ||
|
||
self._gw = VaultGateway(url=self._vault_url, token=self._vault_token, verify=self._verify, timeout=self._timeout_sec) | ||
|
||
def _nameless_path(self, folder: Optional[str] = None, env: Optional[str] = None) -> str: | ||
return self.get_full_path(name='', folder=folder, env=env)[:-1] | ||
|
||
def get_app_param(self, name: str, env: Optional[str] = None) -> Optional[str]: | ||
return self._gw.get_secret_param(self._nameless_path(env=env), name) | ||
|
||
def get_shared_param(self, name: str, folder: str, env: Optional[str] = None) -> Optional[str]: | ||
return self._gw.get_secret_param(self._nameless_path(folder=folder, env=env), name) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from ._selfresdc import * |
Oops, something went wrong.