#!/usr/bin/env python
import json
import logging
import os
from collections import OrderedDict
from jinja2 import Environment, FileSystemLoader
from string_utils import snake_case_to_camel
from ..connection.connectionmanager import ConnectionManager
from s3vaultlib import __application__
# Package metadata, exposed as module dunders so packaging/doc tooling can read them.
__author__ = __maintainer__ = "Giuseppe Chiesa"
__copyright__ = "Copyright 2017-2021, Giuseppe Chiesa"
__credits__ = [__author__]
__license__ = "BSD"
__email__ = "mail@giuseppechiesa.it"
__status__ = "PerpetualBeta"
# Directory one level above this module. The '../' component is joined verbatim
# (not normalised), so BASE_PATH keeps a trailing separator.
_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_PATH = os.path.join(_MODULE_DIR, '../')
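class PolicyManager(object):
    """Render the CloudFormation fragments describing the vault's IAM, KMS and S3 policies.

    Each ``_generate_*`` method renders one Jinja2 template from
    ``_resources/templates`` with the variables returned by
    :meth:`get_policy_variables`. The ``with_*`` methods are builder-style and
    return ``self`` so calls can be chained.
    """

    def __init__(self, config_manager):
        """
        :param config_manager: object the policy variables are read from; this class
            accesses ``vault['bucket']``, ``path_all`` and ``roles`` on it.
            NOTE(review): the original docstring typed it as ``ConnectionManager``,
            but the attributes used here look like a configuration interface - confirm.
        """
        self.logger = logging.getLogger('{a}.{m}'.format(a=__application__, m=self.__class__.__name__))
        self._config_manager = config_manager
        # autoescape is off because the templates emit CloudFormation, not HTML. Any
        # configuration value reaching a template is therefore inserted verbatim unless
        # the template itself applies the ``cfsanitize`` filter registered below.
        self._j2env = Environment(loader=FileSystemLoader(os.path.join(BASE_PATH, '_resources', 'templates')),
                                  trim_blocks=True, autoescape=False)
        self._j2env.filters['cfsanitize'] = cloudformation_sanitize_string
        self._policy_variables = None  # filled lazily by get_policy_variables()
        self._data = None  # last variable dict built by _load_vars()
        self._connection_factory = None
        # Set via with_account_id(); no method shown in this class reads it -
        # NOTE(review): confirm its consumer.
        self._account_id = ''

    def get_policy_variables(self):
        """Return the template variables, loading them from the config manager on first use.

        The dict is cached on the instance; :meth:`with_connection_factory` drops the
        cache so a later call reflects the new factory.
        """
        if not self._policy_variables:
            self._policy_variables = self._load_vars()
        return self._policy_variables

    def _render(self, template_name, log_message):
        """Log *log_message*, then render *template_name* with the policy variables.

        :return: the rendered template text
        """
        self.logger.info(log_message)
        template = self._j2env.get_template(template_name)
        return template.render(self.get_policy_variables())

    def _generate_bucket(self):
        """Placeholder: bucket generation is not implemented here and returns ``None``."""
        pass

    def _generate_roles(self):
        """Render the IAM roles and policies fragment."""
        return self._render('roles.j2', 'Generating IAM roles and policies')

    def _generate_kms(self):
        """Render the KMS resources fragment."""
        return self._render('kms.j2', 'Generating KMS resources')

    def _generate_groups(self):
        """Render the IAM groups fragment."""
        return self._render('groups.j2', 'Generating IAM groups')

    def _generate_bucket_policy(self):
        """Render the S3 bucket policy fragment."""
        return self._render('bucket_policy.j2', 'Generating S3 bucket policy')

    def _generate_outputs(self):
        """Render the CloudFormation exports (outputs) fragment."""
        return self._render('outputs.j2', 'Generating Cloudformation exports')

    def _load_vars(self):
        """Build the template variable dict from the config manager.

        When a connection factory has been configured, every role is bound to it via
        ``role.with_connection_factory``; otherwise the roles are passed through as-is.
        The result is also kept in ``self._data``.
        """
        roles = self._config_manager.roles
        if self._connection_factory:
            roles = [role.with_connection_factory(self._connection_factory) for role in roles]
        self._data = dict(vault=dict(bucket=self._config_manager.vault['bucket'],
                                     path_all=self._config_manager.path_all,
                                     roles=roles))
        return self._data

    def with_connection_factory(self, connection_factory):
        """Set the factory the roles are bound to and return ``self`` for chaining.

        Previously loaded policy variables are discarded; otherwise a factory set after
        the first :meth:`get_policy_variables` call would never reach the roles.
        """
        self._connection_factory = connection_factory
        self._policy_variables = None
        return self

    def with_account_id(self, account_id):
        """Record the AWS account id and return ``self`` for chaining."""
        self._account_id = account_id
        return self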