Source code for pacifica.ingest.tasks

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Module that contains all the amqp tasks that support the ingest infrastructure."""
from __future__ import absolute_import, print_function
import os
import traceback
import requests
from celery import Celery
from .tarutils import open_tar, MetaParser, TarIngester, patch_files
from .orm import update_state
from .config import get_config


# Celery application that the task functions in this module register with.
# The broker and result-backend URLs are read from the 'celery' section of
# the configuration returned by get_config().
INGEST_APP = Celery(
    'ingest',
    broker=get_config().get('celery', 'broker_url'),
    backend=get_config().get('celery', 'backend_url')
)


class IngestException(Exception):
    """Signal that an ingest step failed.

    The step helpers in this module raise it after recording a FAILED
    state, and the task entry points catch it to stop processing.
    """
def ingest_check_tarfile(job_id, filepath):
    """Open the tarfile at *filepath* and record the outcome for *job_id*.

    Returns the object produced by ``open_tar``. When ``open_tar`` returns
    ``None`` the job state is set to FAILED and IngestException is raised.
    """
    step = 'open tar'
    update_state(job_id, 'OK', step, 0)
    tar = open_tar(filepath)
    if tar is not None:
        update_state(job_id, 'OK', step, 100)
        return tar
    update_state(job_id, 'FAILED', step, 0, 'Failed to open tarfile.')
    raise IngestException()
def move_metadata_parser(job_id, metafile):
    """Read the metadata file for a move job and set the state appropriately.

    A ``MetaParser`` is created and asked to read *metafile*. Progress is
    recorded under the 'load metadata' step for *job_id*.

    Returns the populated ``MetaParser``.

    Raises IngestException if reading the metadata fails for any reason;
    the job state is set to FAILED with the error text before raising.
    """
    update_state(job_id, 'OK', 'load metadata', 0)
    meta = MetaParser()
    try:
        meta.read_meta(metafile, job_id)
    # pylint: disable=broad-except
    except Exception as ex:
        # Mirror ingest_metadata_parser: record the failure on the job
        # instead of letting the error escape with the state left at 'OK'.
        update_state(job_id, 'FAILED', 'load metadata', 0, str(ex))
        raise IngestException()
    update_state(job_id, 'OK', 'load metadata', 100)
    return meta
def ingest_metadata_parser(job_id, tar):
    """Load the metadata out of an opened tar and record the outcome.

    Returns the ``MetaParser`` that loaded the metadata. Any error raised
    while loading is recorded as a FAILED 'load metadata' state for
    *job_id* and re-signalled as IngestException.
    """
    step = 'load metadata'
    update_state(job_id, 'OK', step, 0)
    parser = MetaParser()
    try:
        parser.load_meta(tar, job_id)
    # pylint: disable=broad-except
    except Exception as err:
        update_state(job_id, 'FAILED', step, 0, str(err))
        raise IngestException()
    else:
        update_state(job_id, 'OK', step, 100)
        return parser
def ingest_policy_check(job_id, meta_str):
    """Validate the metadata against the policy service.

    *meta_str* is handed to ``validate_meta``. On success the
    'Policy Validation' step for *job_id* is marked complete.

    Raises IngestException when validation does not succeed; the job
    state is set to FAILED with the reported error text before raising.
    """
    success, exception = validate_meta(meta_str)
    if not success:
        # validate_meta may return an exception object rather than text;
        # stringify it the same way ingest_metadata does before storing.
        update_state(job_id, 'FAILED', 'Policy Validation', 0, str(exception))
        raise IngestException()
    update_state(job_id, 'OK', 'Policy Validation', 100)
def move_files(job_id, meta_obj):
    """Hand *meta_obj* to ``patch_files`` and record the outcome.

    Any error from ``patch_files`` is recorded as a FAILED 'move files'
    state for *job_id*, with the traceback followed by the error text, and
    re-signalled as IngestException.
    """
    step = 'move files'
    update_state(job_id, 'OK', step, 0)
    try:
        patch_files(meta_obj)
    # pylint: disable=broad-except
    except Exception as err:
        # The original marks this spot with "rollback files"; no rollback
        # is performed here, only the failure is recorded.
        trace = traceback.format_exc()
        detail = u'{}\n{}'.format(trace, str(err))
        update_state(job_id, 'FAILED', step, 0, detail)
        raise IngestException()
    else:
        update_state(job_id, 'OK', step, 100)
def ingest_files(job_id, ingest_obj):
    """Run ``ingest_obj.ingest()`` and record the outcome.

    Any error from the ingest call is recorded as a FAILED 'ingest files'
    state for *job_id*, with the traceback followed by the error text, and
    re-signalled as IngestException.
    """
    step = 'ingest files'
    update_state(job_id, 'OK', step, 0)
    try:
        ingest_obj.ingest()
    # pylint: disable=broad-except
    except Exception as err:
        # The original marks this spot with "rollback files"; no rollback
        # is performed here, only the failure is recorded.
        trace = traceback.format_exc()
        detail = u'{}\n{}'.format(trace, str(err))
        update_state(job_id, 'FAILED', step, 0, detail)
        raise IngestException()
    else:
        update_state(job_id, 'OK', step, 100)
def ingest_metadata(job_id, meta):
    """Post the metadata via ``meta.post_metadata()`` and record the outcome.

    ``post_metadata`` is expected to return a ``(success, error)`` pair.
    When it does not report success, the 'ingest metadata' step for
    *job_id* is marked FAILED with the error text and IngestException is
    raised.
    """
    step = 'ingest metadata'
    update_state(job_id, 'OK', step, 0)
    posted, error = meta.post_metadata()
    if posted:
        update_state(job_id, 'OK', step, 100)
        return
    # The original marks this spot with "rollback files"; no rollback is
    # performed here, only the failure is recorded.
    update_state(job_id, 'FAILED', step, 0, str(error))
    raise IngestException()
@INGEST_APP.task(ignore_result=False)
def move(job_id, filepath):
    """Move a MD bundle into the archive.

    Runs the move pipeline for *job_id*: read the metadata file at
    *filepath*, validate it against policy, move the files, then post the
    metadata. The metadata file is removed only after every step succeeds.

    Each step records its own state. An IngestException from any step
    stops the pipeline and the task returns None, leaving the file in place.
    """
    try:
        meta = move_metadata_parser(job_id, filepath)
        ingest_policy_check(job_id, meta.meta_str)
        move_files(job_id, meta)
        ingest_metadata(job_id, meta)
        os.unlink(filepath)
    except IngestException:
        return


@INGEST_APP.task(ignore_result=False)
def ingest(job_id, filepath):
    """Ingest a tar bundle into the archive.

    Runs the ingest pipeline for *job_id*: open the tar at *filepath*, load
    its metadata, validate it against policy, ingest the files, then post
    the metadata. The tar file is removed only after every step succeeds.

    Each step records its own state. An IngestException from any step
    stops the pipeline and the task returns None, leaving the file in place.
    """
    try:
        tar = ingest_check_tarfile(job_id, filepath)
        try:
            meta = ingest_metadata_parser(job_id, tar)
            ingest_obj = TarIngester(tar, meta)
            ingest_policy_check(job_id, meta.meta_str)
            ingest_files(job_id, ingest_obj)
            ingest_metadata(job_id, meta)
        finally:
            # Close the tar on failure too, so a failed step does not leak
            # the open handle in the worker process.
            tar.close()
        os.unlink(filepath)
    except IngestException:
        return
def validate_meta(meta_str):
    """Validate metadata by posting it to the policy ingest URL.

    *meta_str* is sent as the request body with a JSON content type to the
    URL configured under ('policy', 'ingest_url'). Both http and https are
    mounted with an adapter that retries up to 5 times.

    Returns a ``(success, error)`` pair:
      * ``(True, '')`` when the JSON response has status 'success';
      * ``(False, text)`` built from the response's 'message' and
        'traceback' fields for any other status;
      * ``(False, exc)`` with the caught exception object if anything
        raises (configuration lookup, request, JSON decoding, missing key).
    """
    try:
        ingest_policy_url = get_config().get('policy', 'ingest_url')
        headers = {'content-type': 'application/json'}
        # Use the session as a context manager so it is closed, and its
        # pooled connections released, on both success and error.
        with requests.session() as session:
            retry_adapter = requests.adapters.HTTPAdapter(max_retries=5)
            session.mount('http://', retry_adapter)
            session.mount('https://', retry_adapter)
            req = session.post(ingest_policy_url, headers=headers, data=meta_str)
            req_json = req.json()
        if req_json['status'] == 'success':
            return True, ''
        return False, '{} ({})'.format(req_json['message'], req_json['traceback'])
    # pylint: disable=broad-except
    except Exception as ex:
        return False, ex
# pylint: enable=broad-except