Source code for s3vaultlib.config.configmanager

#!/usr/bin/env python
import logging

from s3vaultlib import __application__
from s3vaultlib.utils import yaml
from s3vaultlib.utils.yaml import ParserError, ScannerError
from .role import Role

__author__ = "Giuseppe Chiesa"
__copyright__ = "Copyright 2017-2021, Giuseppe Chiesa"
__credits__ = ["Giuseppe Chiesa"]
__license__ = "BSD"
__maintainer__ = "Giuseppe Chiesa"
__email__ = "mail@giuseppechiesa.it"
__status__ = "PerpetualBeta"


[docs]class ConfigException(Exception): pass
[docs]class ConfigManager(object): def __init__(self, config_file): self.logger = logging.getLogger('{a}.{m}'.format(a=__application__, m=self.__class__.__name__)) self._config_file = config_file self.vault = None self.roles = [] self._role_paths = []
[docs] def get_all_path(self): return self._role_paths
[docs] def load_config(self): data = {} try: with open(self._config_file, 'r') as fin: data = yaml.load_from_stream(fin.read()) except (ParserError, ScannerError) as e: self.logger.error('Unable to load config file. Error is: {}'.format(str(e))) if not data.get('s3vaultlib', None): raise ConfigException('No s3vaultlib config section in the file') try: self.load_vault(data['s3vaultlib']) except ConfigException: raise try: self.load_roles(data['s3vaultlib']) except ConfigException: raise return self
[docs] def load_vault(self, s3vault_configdata): if not s3vault_configdata.get('vault', None): raise ConfigException('Vault section empty') if not s3vault_configdata.get('vault', {}).get('bucket', None): raise ConfigException('No bucket configured for vault') self.vault = dict(bucket=s3vault_configdata['vault']['bucket'])
[docs] def load_roles(self, s3vault_configdata=None): if not (s3vault_configdata.get('roles', None) and len(s3vault_configdata.get('roles', []))): raise ConfigException('No roles configured') for elem in s3vault_configdata['roles']: self.roles.append(self.parse_role_config(elem))
[docs] def parse_role_config(self, role_config): if not role_config.get('name', ''): raise ConfigException('No role name provided for config: {}'.format(role_config)) role = Role(role_config['name'], config_obj=self) if not role_config.get('path', []): role.path = role.name else: role.path = role_config['path'] if not role.is_path_all(): self._role_paths.extend(role.path) if not role_config.get('privileges', []): raise ConfigException('Privileges not set for role: {}'.format(role.name)) role.privileges = role_config['privileges'] if not role_config.get('kms_alias', ''): role.kms_alias = role.name else: role.kms_alias = role_config['kms_alias'] if role_config.get('managed_policies', []): role.managed_policies = role_config['managed_policies'] return role
@property def path_all(self): return self._role_paths