Source code for s3vaultlib.connection.tokenmanager

#!/usr/bin/env python
import json
import logging
import os
import uuid
from copy import deepcopy
from datetime import datetime
from stat import S_IRUSR, S_IWUSR

import pytz
from botocore.client import Config
from dateutil import parser

from s3vaultlib import __application__
from s3vaultlib.connection.connectionmanager import ConnectionManager
from s3vaultlib.metadata.factory import MetadataFactory
from .defaults import DEFAULT_TOKEN_FILENAME

__author__ = "Giuseppe Chiesa"
__copyright__ = "Copyright 2017-2021, Giuseppe Chiesa"
__credits__ = ["Giuseppe Chiesa"]
__license__ = "BSD"
__maintainer__ = "Giuseppe Chiesa"
__email__ = "mail@giuseppechiesa.it"
__status__ = "PerpetualBeta"


[docs]class TokenManagerException(Exception): pass
[docs]class TokenManager(object): TOKEN_FILENAME = DEFAULT_TOKEN_FILENAME def __init__(self, role_name=None, role_arn=None, external_id=None, connection_factory=None, is_ec2=False): self.logger = logging.getLogger('{a}.{m}'.format(a=__application__, m=self.__class__.__name__)) self._role_name = role_name self._role_arn = role_arn self._external_id = external_id self._client = None self._connection_factory = connection_factory if not self._connection_factory: self._connection_factory = ConnectionManager(config=Config(signature_version='s3v4'), is_ec2=is_ec2) @property def role_arn(self): if not self._role_arn: self._role_arn = self._get_role_arn() return self._role_arn def _get_role_arn(self): metadata = MetadataFactory().get_instance(is_ec2=self._connection_factory.is_ec2, session_info=self._connection_factory.session_info) if not self._role_arn and not self._role_name: raise TokenManagerException('TokenManager requires either role name or role arn to perform the action.') if self._role_arn: return self._role_arn return 'arn:aws:iam::{account_id}:role/{role_name}'.format(account_id=metadata.account_id, role_name=self._role_name)
[docs] def generate_token(self): # delete the token if exists self._delete_token() # generate a new session client = self._connection_factory.client('sts') """ :type: pyboto3.sts """ role_args = { 'RoleArn': self.role_arn, 'RoleSessionName': 's3vault_{}'.format(str(uuid.uuid4()).replace('-', '')), } if self._external_id: role_args['ExternalId'] = self._external_id response = None try: response = client.assume_role(**role_args) token_dict = deepcopy(response.get('Credentials', {})) except Exception as e: self.logger.error('Error while assuming role with info: {i}. Type: {t}. Error: ' '{e}'.format(i=role_args, t=str(type(e)), e=str(e))) raise TokenManagerException(e) # convert the date to string token_dict['Expiration'] = str(token_dict['Expiration']) token_dict['Region'] = self._connection_factory.region self._save_token(token_dict)
def _save_token(self, token_dict): with open(os.path.expanduser(self.TOKEN_FILENAME), 'wb') as f_token: f_token.write(json.dumps(token_dict).encode()) os.chmod(os.path.expanduser(self.TOKEN_FILENAME), S_IRUSR | S_IWUSR) def _delete_token(self): token_file = os.path.expanduser(self.TOKEN_FILENAME) if os.path.exists(token_file): os.unlink(os.path.expanduser(self.TOKEN_FILENAME)) def _read_token(self): if not os.path.exists(os.path.expanduser(self.TOKEN_FILENAME)): return None try: with open(os.path.expanduser(self.TOKEN_FILENAME), 'rb') as f_token: data = f_token.read() token_dict = json.loads(data.decode()) except ValueError: self.logger.error('Invalid token file: {f}'.format(f=os.path.expanduser(self.TOKEN_FILENAME))) return None # convert the date token_dict['Expiration'] = parser.parse(token_dict['Expiration']) return token_dict
[docs] @staticmethod def remaining_seconds(token_dict): t_delta = token_dict['Expiration'] - datetime.now(tz=pytz.utc) return t_delta.seconds
@staticmethod def _is_valid_token(token_dict): return datetime.now(tz=pytz.utc) < token_dict['Expiration']
[docs] def has_token(self): token_dict = self._read_token() if not token_dict: return False if not self._is_valid_token(token_dict): self.logger.warning('Token is expired') return False return True
@property def token(self): if not self.has_token(): return None return self._read_token()