Source code for tesk.services.filer_s3

import logging
import os
import re
import sys

import boto3
import botocore

from tesk.services.transput import Transput, Type


[docs] class S3Transput(Transput): def __init__(self, path, url, ftype): Transput.__init__(self, path, url, ftype) self.bucket, self.file_path = self.get_bucket_name_and_file_path() self.bucket_obj = None def __enter__(self): client = boto3.resource('s3', endpoint_url=self.extract_endpoint()) if self.check_if_bucket_exists(client): sys.exit(1) self.bucket_obj = client.Bucket(self.bucket) return self
[docs] def extract_endpoint(self): return boto3.client('s3').meta.endpoint_url
[docs] def check_if_bucket_exists(self, client): try: client.meta.client.head_bucket(Bucket=self.bucket) except botocore.exceptions.ClientError as e: # If a client error is thrown, then check that it was a 404 error. # If it was a 404 error, then the bucket does not exist. logging.error('Got status code: %s', e.response['Error']['Code']) if e.response['Error']['Code'] == '404': logging.error( 'Failed to fetch Bucket, reason: %s', e.response['Error']['Message'] ) return 1 return 0
[docs] def get_bucket_name_and_file_path(self): """ If the S3 url is similar to s3://idr-bucket-1/README.txt format """ bucket = self.netloc file_path = self.url_path[1:] return bucket, file_path
[docs] def download_file(self): logging.debug( 'Downloading s3 object: "%s" Target: %s', f'{self.bucket}/{self.file_path}', self.path, ) basedir = os.path.dirname(self.path) os.makedirs(basedir, exist_ok=True) return self.get_s3_file(self.path, self.file_path)
[docs] def upload_file(self): logging.debug( 'Uploading s3 object: "%s" Target: %s', self.path, f'{self.bucket}/{self.file_path}', ) try: self.bucket_obj.upload_file(Filename=self.path, Key=self.file_path) except (botocore.exceptions.ClientError, OSError) as err: logging.error( "File upload failed for '%s'", f'{self.bucket}/{self.file_path}' ) logging.error(err) return 1 return 0
[docs] def upload_dir(self): logging.debug( 'Uploading s3 object: "%s" Target: %s', self.path, f'{self.bucket}/{self.file_path}', ) try: for item in os.listdir(self.path): path = os.path.join(self.path, item) if os.path.isdir(path): file_type = Type.Directory elif os.path.isfile(path): file_type = Type.File else: # An exception is raised, if the object type is neither # file or directory logging.error("Object is neither file or directory : '%s' ", path) raise OSError file_path = os.path.join(self.url, item) with S3Transput(path, file_path, file_type) as transfer: if transfer.upload(): return 1 except OSError as err: logging.error( "File upload failed for '%s'", f'{self.bucket}/{self.file_path}' ) logging.error(err) return 1 return 0
[docs] def download_dir(self): logging.debug( 'Downloading s3 object: "%s" Target: %s', f'{self.bucket}/{self.file_path}', self.path, ) client = boto3.client('s3', endpoint_url=self.extract_endpoint()) if not self.file_path.endswith('/'): self.file_path += '/' objects = client.list_objects_v2(Bucket=self.bucket, Prefix=self.file_path) # If the file path does not exists in s3 bucket, 'Contents' # key will not be present in objects if 'Contents' not in objects: logging.error('Got status code: %s', 404) logging.error('Invalid file path!.') return 1 # Looping through the list of objects and downloading them for obj in objects['Contents']: file_name = os.path.basename(obj['Key']) dir_name = os.path.dirname(obj['Key']) path_to_create = re.sub( r'^' + self.file_path.strip('/').replace('/', '\/') + '', '', dir_name ).strip('/') path_to_create = os.path.join(self.path, path_to_create) os.makedirs(path_to_create, exist_ok=True) if self.get_s3_file(os.path.join(path_to_create, file_name), obj['Key']): return 1 return 0
[docs] def get_s3_file(self, file_name, key): try: self.bucket_obj.download_file(Filename=file_name, Key=key) except botocore.exceptions.ClientError as err: logging.error('Got status code: %s', err.response['Error']['Code']) logging.error(err.response['Error']['Message']) return 1 return 0