Source code for s3vaultlib.connection.connectionmanager

#!/usr/bin/env python
import logging
from copy import deepcopy

import boto3

from s3vaultlib import __application__
from s3vaultlib.connection.defaults import DEFAULT_TOKEN_FILENAME
from s3vaultlib.metadata.factory import MetadataFactory

__author__ = "Giuseppe Chiesa"
__copyright__ = "Copyright 2017-2021, Giuseppe Chiesa"
__credits__ = ["Giuseppe Chiesa"]
__license__ = "BSD"
__maintainer__ = "Giuseppe Chiesa"
__email__ = "mail@giuseppechiesa.it"
__status__ = "PerpetualBeta"


[docs]class ConnectionManager(object): """ Object that allocate connection by supporting also connection profile and extended paramaters """ def __init__(self, region=None, endpoint=None, is_ec2=False, **params): self.logger = logging.getLogger('{a}.{m}'.format(a=__application__, m=self.__class__.__name__)) self.region = region self._endpoint = endpoint self._is_ec2 = is_ec2 self._params = params self.session_info = None @property def is_ec2(self): return self._is_ec2
[docs] def client(self, resource): """Returns a client connection""" return self._connection('client', resource)
def _get_identity_arg(self, session): client = session.client('sts') """ :type: pyboto3.sts """ try: response = client.get_caller_identity() arn = response['Arn'] except Exception as e: self.logger.error('Error while retrieving identity arn. Type: {t}. Error: ' '{e}'.format(t=str(type(e)), e=str(e))) return 'n/a' return arn def _connection(self, conn_type=None, resource=None): """Allocate a connection""" params = deepcopy(self._params) profile = params.pop('profile_name', None) token = params.pop('token', None) self.session_info = {'profile_name': profile} if token: self.logger.debug('Connection will use session token: {f}'.format(f=DEFAULT_TOKEN_FILENAME)) self.session_info = {'aws_access_key_id': token['AccessKeyId'], 'aws_secret_access_key': token['SecretAccessKey'], 'aws_session_token': token['SessionToken'], 'region_name': token['Region'] } # command line passed region takes precedence if self.region: self.session_info['region_name'] = self.region if conn_type not in ['both', 'resource', 'client']: raise ValueError('connection: {c} not supported'.format(c=conn_type)) session = boto3.session.Session(**self.session_info) self.logger.info('Using identity: {a}'.format(a=self._get_identity_arg(session))) if not self.region: metadata = MetadataFactory().get_instance(self._is_ec2, session_info=self.session_info) self.region = metadata.region if conn_type == 'resource': resource = session.resource(resource, region_name=self.region, endpoint_url=self._endpoint, **params) return resource elif conn_type == 'client': client = session.client(resource, region_name=self.region, endpoint_url=self._endpoint, **params) return client else: client = session.client(resource, region_name=self.region, endpoint_url=self._endpoint, **params) resource = session.resource(resource, region_name=self.region, endpoint_url=self._endpoint, **params) return client, resource