Source code for s3vaultlib.config.role

#!/usr/bin/env python
import logging

from s3vaultlib import __application__
from s3vaultlib.kms.kmsresolver import KMSResolver

__author__ = "Giuseppe Chiesa"
__copyright__ = "Copyright 2017-2021, Giuseppe Chiesa"
__credits__ = ["Giuseppe Chiesa"]
__license__ = "BSD"
__maintainer__ = "Giuseppe Chiesa"
__email__ = "mail@giuseppechiesa.it"
__status__ = "PerpetualBeta"


[docs]class RoleException(Exception): pass
[docs]class Role(object): SUPPORTED_PRIVILEGES = ('read', 'write') def __init__(self, name, config_obj): """ Role Object :param name: role name :param config_obj: ConfigManager object :type config_obj: ConfigManager """ self.logger = logging.getLogger('{a}.{m}'.format(a=__application__, m=self.__class__.__name__)) self._name = name self._priv = [] self._path = [] self._config_obj = config_obj """ :type : S3VaultConfig """ self._kms_alias = '' self._kms_arn = None self._managed_policies = [] self._connection_factory = None self._arn = None @property def name(self): return self._name @property def path(self): if self.is_path_all(): return self._config_obj.get_all_path() else: return self._path @path.setter def path(self, path): if isinstance(path, list): self._path = path else: self._path = [path] self._path = list(map(self._path_sanitizer, self._path)) @staticmethod def _path_sanitizer(path): if path[-1] == '/': return path[:-1] return path @property def privileges(self): return self._priv @privileges.setter def privileges(self, privileges): priv = [] if not isinstance(privileges, list): priv.append(privileges) else: priv = privileges if any([p not in self.SUPPORTED_PRIVILEGES for p in priv]): raise RoleException('Privileges: {p} contains not supported keywords. Allowed: ' '{a}'.format(p=privileges, a=self.SUPPORTED_PRIVILEGES)) self._priv = priv @property def kms_alias(self): return self._kms_alias @kms_alias.setter def kms_alias(self, kms_alias): self._kms_alias = kms_alias @property def managed_policies(self): return self._managed_policies @managed_policies.setter def managed_policies(self, policy_list): self._managed_policies = policy_list
[docs] def is_path_all(self): return self._path == ['_all_']
[docs] def with_connection_factory(self, connection_factory): self._connection_factory = connection_factory return self
@property def kms_arn(self): if not self._connection_factory: self.logger.warning('Connection factory not instantiated, you may want to use with_connection_factory ' 'first') return '' if not self._kms_arn: kms_resolver = KMSResolver(connection_manager=self._connection_factory, keyalias=self.kms_alias) self._kms_arn = kms_resolver.retrieve_key_arn() return self._kms_arn