Source code for s3vaultlib.s3.s3fsobject

#!/usr/bin/env python
import copy
import json
import logging
import os
import posixpath

from dpath.util import merge

from .. import __application__

__author__ = "Giuseppe Chiesa"
__copyright__ = "Copyright 2017-2021, Giuseppe Chiesa"
__credits__ = ["Giuseppe Chiesa"]
__license__ = "BSD"
__maintainer__ = "Giuseppe Chiesa"
__email__ = "mail@giuseppechiesa.it"
__status__ = "PerpetualBeta"


class S3FsObjectException(Exception):
    """Raised when an S3 object descriptor is invalid or its content cannot be read."""
class S3FsObject(object):
    """
    Implement the S3FsObject, an abstraction around a S3 file with SSE encryption.

    The object header and content are fetched lazily from S3 the first time they are needed and are
    cached on the instance. When the content is a JSON document, its values can be read with
    ``obj['a.b']`` (dot separated nested keys) or ``obj.a`` and written with ``obj['a.b'] = value``.
    """

    # Private attributes assigned in ``__init__``. ``__getattr__`` must never try to resolve them from
    # the JSON content: ``__getitem__`` itself reads ``self._raw``, so looking one of them up while it
    # is unset (bare ``__new__``, copy, unpickling) would otherwise recurse without bound.
    _INTERNAL_ATTRS = frozenset(('_data', '_header', '_bucket', '_path', '_raw', '_is_json', '_fs'))

    def __init__(self, data, bucket, path, fs):
        """
        :param data: json metadata from the file; it must contain a non empty ``Key`` entry
        :param bucket: bucket
        :param path: path (key prefix) under which the object is stored
        :param fs: s3 client
        :raises S3FsObjectException: when ``data`` has no ``Key``
        """
        self.logger = logging.getLogger('{a}.{m}'.format(a=__application__, m=self.__class__.__name__))
        self._data = data
        self._header = {}
        self._bucket = bucket
        self._path = path
        self._raw = None
        self._is_json = False
        # The original source annotates this attribute as a ``pyboto3.s3`` client.
        self._fs = fs
        if not self._data.get('Key'):
            raise S3FsObjectException('Not a valid object')
        # Only the last path component of the key is kept as the object name.
        self.name = self._data['Key'].rpartition('/')[-1]

    @property
    def kms_arn(self):
        """
        Return the KMS ARN used to encrypt the object.

        The value comes from the cached header: it is an empty string while the header has not been
        fetched yet, or when the header has no ``SSEKMSKeyId`` entry.

        :return: KMS ARN
        :rtype: basestring
        """
        return self._header.get('SSEKMSKeyId', '')

    @property
    def is_encrypted(self):
        """
        Return true if the object is encrypted, i.e. if :attr:`kms_arn` is not empty.

        :return: True or False
        :rtype: bool
        """
        return bool(self.kms_arn)

    @property
    def metadata(self):
        """
        Return the metadata associated with the object (file metadata).

        The header is fetched on first use; a deep copy is returned so callers cannot alter the cache.

        :return: metadata
        :rtype: dict
        """
        if not self._header:
            self._load_content()
        return copy.deepcopy(self._header)

    def _load_content(self):
        """
        Load the header and the content of the file pointed by S3FsObject and cache both.

        :return: content of the file
        :raises S3FsObjectException: when the response carries no body
        """
        # S3 keys always use '/' as separator, whatever the local platform is.
        object_path = posixpath.join(self._path, self.name)
        try:
            self._header = self._fs.head_object(Bucket=self._bucket, Key=object_path)
        except Exception:
            self.logger.exception('Exception while fetching header for key: {k}'.format(k=object_path))
            raise
        response = self._fs.get_object(Bucket=self._bucket, Key=object_path)
        if not response.get('Body'):
            raise S3FsObjectException('Unable to read the file content for key: {k}'.format(k=object_path))
        self._raw = bytes(response['Body'].read())
        return self._raw

    def _ensure_loaded(self):
        """Fetch the content once; an empty object is a valid, already loaded content."""
        if self._raw is None:
            self._load_content()

    def _load_json(self, key):
        """
        Return the content decoded as JSON, loading it first when needed.

        :param key: key being accessed, reported in the KeyError
        :raises KeyError: when the content is not a valid JSON document
        """
        self._ensure_loaded()
        try:
            return json.loads(self._raw)
        except (TypeError, ValueError):
            raise KeyError(key)

    @staticmethod
    def is_json(data):
        """
        Return True if the content is a valid json.

        :param data: content to evaluate
        :return: True or False
        :rtype: bool
        """
        try:
            json.loads(data)
        except (TypeError, ValueError):
            # TypeError covers inputs json cannot even attempt to parse, such as None.
            return False
        return True

    def __getitem__(self, key):
        """
        Return the value stored at ``key`` in the JSON content.

        :param key: key name; a dot separated key addresses a nested value
        :return: the value found
        :raises KeyError: when the content is not JSON or the key does not exist
        """
        return self._get_value(self._load_json(key), key)

    @staticmethod
    def _set_value(d, path, value):
        """
        Set ``value`` at the dot separated ``path`` inside ``d``.

        The path ``'.'`` designates the whole document, which is then replaced by ``value``. Otherwise
        a nested dict holding only the requested path is built and merged into ``d`` with dpath's
        ``merge``.

        :return: the updated document
        """
        if path == '.':
            return value
        levels = path.split('.')
        leaf_key = levels.pop()
        tmp_dict = {leaf_key: value}
        # Wrap the leaf from the innermost to the outermost level.
        for key in reversed(levels):
            tmp_dict = {key: tmp_dict}
        merge(d, tmp_dict)
        return d

    @staticmethod
    def _get_value(d, path):
        """
        Return the value found at the dot separated ``path`` inside ``d``.

        :raises KeyError: when a level is missing, or is not a dict while more levels are requested
        """
        k, _, rest = path.partition('.')
        if not isinstance(d, dict):
            # A JSON document that is not an object (list, string, ...) has no named keys.
            raise KeyError(k)
        if not rest:
            return d[k]
        if not isinstance(d[k], dict):
            # Report the key, never the value: the content may be a secret.
            raise KeyError("'{}' is a leaf value, not a dict".format(k))
        return S3FsObject._get_value(d[k], rest)

    def __setitem__(self, key, value):
        """
        Set ``value`` at ``key`` in the JSON content and re-encode the cached content.

        Only the cached content is updated here; nothing is written back to S3.

        :param key: key name; a dot separated key addresses a nested value
        :param value: value to store
        :raises KeyError: when the content is not a valid JSON document
        """
        json_data = self._load_json(key)
        # if the key contains . separator then we assume the key is a nested key and we allocate the
        # entire path
        json_data = self._set_value(json_data, key, value)
        self._raw = json.dumps(json_data).encode()

    def __getattr__(self, item):
        """
        Resolve unknown attributes as keys of the JSON content.

        Private instance attributes and dunder names are never looked up in the content: this avoids
        unbounded recursion on partially initialised instances and a content fetch for protocol probes.

        :param item: attribute name
        :return: the value stored under that key
        :raises AttributeError: when the name cannot be resolved
        """
        if item in self._INTERNAL_ATTRS or (item.startswith('__') and item.endswith('__')):
            raise AttributeError(item)
        try:
            return self.__getitem__(item)
        except KeyError:
            raise AttributeError(item)

    def __str__(self):
        """
        Return the content decoded as text, loading it first when needed.

        :return: decoded content
        """
        self._ensure_loaded()
        return self._raw.decode()

    def raw(self):
        """
        Return the raw content of the object, loading it first when needed.

        :return: content as stored in S3
        """
        self._ensure_loaded()
        return self._raw