Source code for s3vaultlib.kms.kmsresolver

#!/usr/bin/env python
import logging

from s3vaultlib import __application__
from s3vaultlib.metadata.factory import MetadataFactory

__author__ = "Giuseppe Chiesa"
__copyright__ = "Copyright 2017-2021, Giuseppe Chiesa"
__credits__ = ["Giuseppe Chiesa"]
__license__ = "BSD"
__maintainer__ = "Giuseppe Chiesa"
__email__ = "mail@giuseppechiesa.it"
__status__ = "PerpetualBeta"


[docs]class KMSResolverException(Exception): pass
[docs]class KMSResolver(object): """ Object that resolves the KMS key associated to a role, or load a keyarn with a specified alias """ def __init__(self, connection_manager, keyalias='', role_name=''): self.logger = logging.getLogger('{a}.{m}'.format(a=__application__, m=self.__class__.__name__)) self._connection_manager = connection_manager """ :type s3vaultlib.connection.connectionmanager.ConnectionManager """ self._keyalias = keyalias self._role = role_name self._kms = self._connection_manager.client('kms') """ :type : pyboto3.kms """ def _get_key_from_alias(self, alias): key_id = 'alias/{a}'.format(a=alias.rpartition('alias/')[-1]) key_data = self._kms.describe_key(KeyId=key_id) if not key_data.get('KeyMetadata'): return '' return key_data['KeyMetadata'].get('Arn')
[docs] def retrieve_key_arn(self): """ Return the KMS arn of a key :return: key arn :rtype: basestring """ key_arn = '' if self._keyalias: key_arn = self._get_key_from_alias(self._keyalias) if key_arn: return key_arn if self._role: key_arn = self._get_key_from_alias(self._role) if key_arn: return key_arn metadata = MetadataFactory().get_instance(is_ec2=self._connection_manager.is_ec2, session_info=self._connection_manager.session_info) try: role = metadata.role except Exception as e: self.logger.error('Error while retrieving role. Type: {t}. Error: {e}'.format(t=str(type(e)), e=str(e))) raise key_arn = self._get_key_from_alias(role) if not key_arn: raise KMSResolverException('Unable to resolve the key from role: {r}'.format(r=role)) return key_arn