Source code for pacifica.ingest.tarutils

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Utilities and classes for unbundling and archiving a tar file."""
from __future__ import print_function
import tarfile
import json
import hashlib
import time
from six import PY2
import requests
from .utils import get_unique_id
from .config import get_config


class HashValidationException(Exception):
    """Class to capture hashsum validation failures."""

    pass


class FileIngester(object):
    """Class to ingest a single file from a tar file into the file archives."""

    fileobj = None
    file_id = 0
    recorded_hash = ''
    hashval = None
    server = ''

    def __init__(self, hashtype, hashcode, file_id):
        """Constructor for FileIngester class."""
        if hashtype in hashlib.algorithms_available:
            self.hashval = getattr(hashlib, hashtype)()
        else:
            raise ValueError('Invalid Hashtype {}'.format(hashtype))
        self.recorded_hash = hashcode
        self.server = get_config().get('archiveinterface', 'url')
        self.file_id = file_id
        self.session = requests.session()
        retry_adapter = requests.adapters.HTTPAdapter(max_retries=5)
        self.session.mount('https://', retry_adapter)
        self.session.mount('http://', retry_adapter)

    def read(self, size):
        """Read wrapper for requests that calculates the hashcode inline."""
        buf = self.fileobj.read(size)
        # running checksum
        self.hashval.update(buf)
        return buf

    def validate_hash(self):
        """Validate that the calculated hash matches the hash uploaded in the tar file."""
        file_hash = self.hashval.hexdigest()
        if self.recorded_hash == file_hash:
            return True
        return False

    def upload_file_in_file(self, info, tar):
        """Upload a file from inside a tar file."""
        self.fileobj = tar.extractfile(info)
        size = info.size
        size_str = str(size)
        mod_time = time.ctime(info.mtime)
        self.fileobj.seek(0)
        url = '{}/{}'.format(self.server, str(self.file_id))
        headers = {}
        headers['Last-Modified'] = mod_time
        headers['Content-Type'] = 'application/octet-stream'
        headers['Content-Length'] = size_str
        # pylint: disable=assignment-from-no-return
        req = self.session.put(
            url,
            data=self,
            headers=headers
        )
        # pylint: enable=assignment-from-no-return
        self.fileobj.close()
        body = req.text
        ret_dict = json.loads(body)
        size = int(ret_dict['total_bytes'])
        if size != info.size:  # pragma: no cover
            return False
        success = self.validate_hash()
        print('validated = ' + str(success))
        if not success:
            # roll back upload
            raise HashValidationException(
                'File {} failed to validate.'.format(self.file_id))
        return True

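# Note: upload_file_in_file() passes the FileIngester instance itself as the
# request body (data=self), so requests streams the tar member through
# FileIngester.read() in chunks and the checksum is computed while the file is
# uploaded rather than in a separate pass over the data.
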
class MetaParser(object):
    """Class used to hold and search metadata."""

    # entire metadata
    meta = None
    # a map of filenames to hashcodes
    files = {}
    start_id = -999
    transaction_id = -999
    file_count = -999
    meta_str = ''

    def __init__(self):
        """Constructor."""
        self.session = requests.session()
        retry_adapter = requests.adapters.HTTPAdapter(max_retries=5)
        self.session.mount('https://', retry_adapter)
        self.session.mount('http://', retry_adapter)

    def file_obj_count(self, meta_list):
        """Count the file objects in metadata and keep the count."""
        self.file_count = 0
        for meta in meta_list:
            if meta['destinationTable'] == 'Files':
                self.file_count += 1

    def read_meta(self, metafile, job_id):
        """Read the metadata from metafile and assume it's good."""
        self.transaction_id = job_id
        meta_list = json.loads(open(metafile).read())
        self.file_obj_count(meta_list)
        self.start_id = get_unique_id(self.file_count, 'file')
        self.files = {}
        # all we care about for now is the hash and the file path
        file_id = self.start_id
        for meta in meta_list:
            if meta['destinationTable'] == 'Files':
                meta['_id'] = file_id
                self.files[str(file_id)] = meta
                file_id += 1
        trans = {}
        trans['destinationTable'] = 'Transactions._id'
        trans['value'] = self.transaction_id
        meta_list.append(trans)
        self.meta_str = json.dumps(meta_list, sort_keys=True, indent=4)

    def load_meta(self, tar, job_id):
        """Load the metadata from a tar file into searchable structures."""
        # transaction id is the unique upload job id created by the ingest frontend
        self.transaction_id = job_id
        string = tar.extractfile('metadata.txt').read()
        uni_str = string if PY2 else string.decode('utf8')
        meta_list = json.loads(uni_str)
        # get the start index for the file
        self.file_count = file_count(tar)
        self.start_id = get_unique_id(self.file_count, 'file')
        self.files = {}
        # all we care about for now is the hash and the file path
        file_id = self.start_id
        for meta in meta_list:
            if meta['destinationTable'] == 'Files':
                meta['_id'] = file_id
                self.files[str(file_id)] = meta
                file_id += 1
        trans = {}
        trans['destinationTable'] = 'Transactions._id'
        trans['value'] = self.transaction_id
        meta_list.append(trans)
        self.meta_str = json.dumps(meta_list, sort_keys=True, indent=4)

    def get_hash(self, file_id):
        """Return the hash type and hash string for a file ID."""
        file_element = self.files[file_id]
        # strip the hash type prefix if there is one
        file_hash = file_element['hashsum'].replace('sha1:', '')
        file_hash_type = file_element['hashtype']
        return file_hash_type, file_hash

    def get_fname(self, file_id):
        """Get the file name from the file ID."""
        file_element = self.files[file_id]
        name = file_element['name']
        return name

    def get_subdir(self, file_id):
        """Get the subdirectory element from the file ID."""
        file_element = self.files[file_id]
        name = file_element['subdir']
        return name

    def clean_metadata(self):
        """Clean /data from file paths."""
        meta_list = json.loads(self.meta_str)
        for meta in meta_list:
            if meta['destinationTable'] == 'Files':
                meta['subdir'] = get_clipped(meta['subdir'])
        self.meta_str = json.dumps(meta_list, sort_keys=True, indent=4)

    def post_metadata(self):
        """Upload metadata to server."""
        try:
            self.clean_metadata()
            ingest_md_url = get_config().get('metadata', 'ingest_url')
            headers = {'content-type': 'application/json'}
            # pylint: disable=assignment-from-no-return
            req = self.session.put(
                ingest_md_url, headers=headers, data=self.meta_str)
            # pylint: enable=assignment-from-no-return
            if req.json()['status'] == 'success':
                return True, ''
        # pylint: disable=broad-except
        except Exception as ex:
            return False, ex
        # pylint: enable=broad-except
        return False, req.json()


def get_clipped(fname):
    """Return a file path with the data separator removed."""
    parts = fname.split('/')  # this is posix tar standard
    if parts[0] == 'data':
        del parts[0]
    parts = [x for x in parts if x]
    return '/'.join(parts)  # this is also posix tar standard

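# For example, get_clipped('data/proj/run_1/results.txt') returns
# 'proj/run_1/results.txt'; a path with no leading 'data' component only has
# empty segments dropped (e.g. 'proj//results.txt' becomes 'proj/results.txt').
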
# pylint: disable=too-few-public-methods
class TarIngester(object):
    """Class to read a tar file and upload it to the metadata and file archives."""

    tar = None
    meta = None

    def __init__(self, tar, meta):
        """Constructor for TarIngester class."""
        self.tar = tar
        self.meta = meta

    def ingest(self):
        """Ingest a tar file into the file archive."""
        for file_id in self.meta.files.keys():
            file_hash_type, file_hash = self.meta.get_hash(file_id)
            name = self.meta.get_fname(file_id)
            path = self.meta.get_subdir(file_id) + '/' + name
            # this is for posix tar standard
            path = '/'.join(['data', get_clipped(path)])
            uni_path = path.encode('utf-8') if PY2 else path
            info = self.tar.getmember(uni_path)
            print(info.name)
            ingest = FileIngester(file_hash_type, file_hash, file_id)
            ingest.upload_file_in_file(info, self.tar)

# pylint: enable=too-few-public-methods
def open_tar(fpath):
    """Open the tar file at fpath and return a TarFile object, or None on failure."""
    # check validity
    if not tarfile.is_tarfile(fpath):
        return None
    # open tar file
    try:
        tar = tarfile.open(fpath, 'r:')
    # not sure what exceptions would show up here and not be covered by is_tarfile
    except tarfile.TarError:  # pragma: no cover
        print('Error opening: ' + fpath)
        return None
    return tar


def patch_files(meta_obj):
    """Patch the files in the archive interface."""
    archive_url = get_config().get('archiveinterface', 'url')
    session = requests.session()
    retry_adapter = requests.adapters.HTTPAdapter(max_retries=5)
    session.mount('https://', retry_adapter)
    session.mount('http://', retry_adapter)
    for file_id in meta_obj.files.keys():
        data = {'path': meta_obj.files[file_id]['source']}
        req = session.patch(
            '{}/{}'.format(archive_url, file_id),
            headers={'content-type': 'application/json'},
            data=json.dumps(data)
        )
        if req.json().get('message') != 'File Moved Successfully':
            raise Exception(json.dumps(req.json()))


def file_count(tar):
    """
    Retrieve the file count for a tar file.

    Does not count metadata.txt as that is not uploaded to the file archive.
    """
    members = tar.getmembers()
    # don't count the metadata.txt file
    return len(members) - 1
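

if __name__ == '__main__':  # pragma: no cover
    # Minimal usage sketch wiring the pieces above together for one bundle.
    # The tar path and job id below are hypothetical, and the archive interface
    # and metadata services named in the configuration must be reachable.
    EXAMPLE_TAR = open_tar('upload.tar')
    if EXAMPLE_TAR:
        EXAMPLE_META = MetaParser()
        # metadata.txt inside the tar describes the files being ingested
        EXAMPLE_META.load_meta(EXAMPLE_TAR, 1234)
        # upload each file listed in the metadata to the archive interface
        TarIngester(EXAMPLE_TAR, EXAMPLE_META).ingest()
        EXAMPLE_TAR.close()
        # register the cleaned metadata with the metadata service
        SUCCESS, ERROR = EXAMPLE_META.post_metadata()
        print(SUCCESS, ERROR)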