#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Module that contains all the amqp tasks that support the ingest infrastructure."""
from __future__ import absolute_import, print_function
import os
import traceback
import requests
from celery import Celery
from .tarutils import open_tar, MetaParser, TarIngester, patch_files
from .orm import update_state
from .config import get_config
INGEST_APP = Celery(
'ingest',
broker=get_config().get('celery', 'broker_url'),
backend=get_config().get('celery', 'backend_url')
)
[docs]class IngestException(Exception):
"""Ingest class exception."""
pass
[docs]def ingest_check_tarfile(job_id, filepath):
"""Check the ingest tarfile and return state or set it properly."""
update_state(job_id, 'OK', 'open tar', 0)
tar = open_tar(filepath)
if tar is None:
update_state(job_id, 'FAILED', 'open tar',
0, 'Failed to open tarfile.')
raise IngestException()
update_state(job_id, 'OK', 'open tar', 100)
return tar
[docs]def ingest_policy_check(job_id, meta_str):
"""Ingest check to validate metadata at policy."""
success, exception = validate_meta(meta_str)
if not success:
update_state(job_id, 'FAILED', 'Policy Validation', 0, exception)
raise IngestException()
update_state(job_id, 'OK', 'Policy Validation', 100)
[docs]def move_files(job_id, meta_obj):
"""Move the files to the archive interface."""
update_state(job_id, 'OK', 'move files', 0)
try:
patch_files(meta_obj)
# pylint: disable=broad-except
except Exception as ex:
# rollback files
stack_dump = traceback.format_exc()
update_state(job_id, 'FAILED', 'move files', 0,
u'{}\n{}'.format(stack_dump, str(ex)))
raise IngestException()
update_state(job_id, 'OK', 'move files', 100)
[docs]def ingest_files(job_id, ingest_obj):
"""Ingest the files to the archive interface."""
update_state(job_id, 'OK', 'ingest files', 0)
try:
ingest_obj.ingest()
# pylint: disable=broad-except
except Exception as ex:
# rollback files
stack_dump = traceback.format_exc()
update_state(job_id, 'FAILED', 'ingest files', 0,
u'{}\n{}'.format(stack_dump, str(ex)))
raise IngestException()
update_state(job_id, 'OK', 'ingest files', 100)
@INGEST_APP.task(ignore_result=False)
def move(job_id, filepath):
"""Move a MD bundle into the archive."""
try:
meta = move_metadata_parser(job_id, filepath)
ingest_policy_check(job_id, meta.meta_str)
move_files(job_id, meta)
ingest_metadata(job_id, meta)
os.unlink(filepath)
except IngestException:
return
@INGEST_APP.task(ignore_result=False)
def ingest(job_id, filepath):
"""Ingest a tar bundle into the archive."""
try:
tar = ingest_check_tarfile(job_id, filepath)
meta = ingest_metadata_parser(job_id, tar)
ingest_obj = TarIngester(tar, meta)
ingest_policy_check(job_id, meta.meta_str)
ingest_files(job_id, ingest_obj)
ingest_metadata(job_id, meta)
tar.close()
os.unlink(filepath)
except IngestException:
return
# pylint: enable=broad-except