#!/usr/bin/python
# -*- coding: utf-8 -*-
"""ORM for index server."""
from time import sleep
from datetime import datetime
from peewee import Model, OperationalError
from peewee import CharField, IntegerField, BigIntegerField, TextField, DateTimeField, DecimalField
from playhouse.db_url import connect
from pacifica.ingest.config import get_config
SCHEMA_MAJOR = 1
SCHEMA_MINOR = 0
DB = connect(get_config().get('database', 'peewee_url'))
[docs]class OrmSync(object):
"""
Special module for syncing the orm.
This module should incorporate a schema migration strategy.
The supported versions migrating forward must be in a versions array
containing tuples for major and minor versions.
The version tuples are directly translated to method names in the
orm_update class for the update between those versions.
Example Version Control::
class orm_update:
versions = [
(0, 1),
(0, 2),
(1, 0),
(1, 1)
]
def update_0_1_to_0_2():
pass
def update_0_2_to_1_0():
pass
The body of the update should follow peewee migration practices.
http://docs.peewee-orm.com/en/latest/peewee/playhouse.html#migrate
"""
versions = [
(0, 0),
(1, 0)
]
[docs] @staticmethod
def dbconn_blocking():
"""Wait for the db connection."""
dbcon_attempts = get_config().getint('database', 'connect_attempts')
dbcon_wait = get_config().getint('database', 'connect_wait')
while dbcon_attempts:
try:
IngestState.database_connect()
return
except OperationalError:
# couldnt connect, potentially wait and try again
sleep(dbcon_wait)
dbcon_attempts -= 1
raise OperationalError('Failed database connect retry.')
[docs] @classmethod
def update_0_0_to_1_0(cls):
"""Update by adding the boolean column."""
if not IngestState.table_exists():
IngestState.create_table()
[docs] @classmethod
def update_tables(cls):
"""Update the database to the current version."""
verlist = cls.versions
db_ver = IngestStateSystem.get_version()
if verlist.index(verlist[-1]) == verlist.index(db_ver):
# we have the current version don't update
return
with IngestState.atomic():
for db_ver in verlist[verlist.index(db_ver):-1]:
next_db_ver = verlist[verlist.index(db_ver)+1]
method_name = 'update_{}_to_{}'.format(
'{}_{}'.format(*db_ver),
'{}_{}'.format(*next_db_ver)
)
getattr(cls, method_name)()
IngestStateSystem.drop_table()
IngestStateSystem.create_table()
IngestStateSystem.get_or_create_version()
# pylint: disable=too-few-public-methods
[docs]class BaseModel(Model):
"""Auto-generated by pwiz."""
class Meta(object):
"""Map to the database connected above."""
database = DB
[docs]class IngestStateSystem(BaseModel):
"""Ingest State Schema Version Model."""
part = CharField(primary_key=True)
value = IntegerField(default=-1)
[docs] @classmethod
def get_or_create_version(cls):
"""Set or create the current version of the schema."""
if not cls.table_exists():
return (0, 0)
major = cls.get_or_create(part='major', value=SCHEMA_MAJOR)
minor = cls.get_or_create(part='minor', value=SCHEMA_MINOR)
return (major, minor)
[docs] @classmethod
def get_version(cls):
"""Get the current version as a tuple."""
if not cls.table_exists():
return (0, 0)
return (cls.get(part='major').value, cls.get(part='minor').value)
[docs] @classmethod
def is_equal(cls):
"""Check to see if schema version matches code version."""
major, minor = cls.get_version()
return major == SCHEMA_MAJOR and minor == SCHEMA_MINOR
[docs] @classmethod
def is_safe(cls):
"""Check to see if the schema version is safe for the code."""
major, _minor = cls.get_version()
return major == SCHEMA_MAJOR
[docs]class IngestState(BaseModel):
"""Map a python record to a mysql table."""
job_id = BigIntegerField(primary_key=True, column_name='id')
state = CharField()
task = CharField()
task_percent = DecimalField()
exception = TextField(default='')
created = DateTimeField(default=datetime.utcnow)
updated = DateTimeField(default=datetime.utcnow)
[docs] @classmethod
def atomic(cls):
"""Get the atomic context or decorator."""
# pylint: disable=no-member
return cls._meta.database.atomic()
# pylint: enable=no-member
[docs] @classmethod
def database_connect(cls):
"""
Make sure database is connected.
Trying to connect a second
time *does* cause problems.
"""
# pylint: disable=no-member
if not cls._meta.database.is_closed():
cls._meta.database.close()
cls._meta.database.connect()
# pylint: enable=no-member
[docs] @classmethod
def database_close(cls):
"""
Close the database connection.
Closing already closed database
is not a problem, so continue on.
"""
# pylint: disable=no-member
if not cls._meta.database.is_closed():
cls._meta.database.close()
# pylint: enable=no-member
class Meta(object):
"""Map to uniqueindex table."""
table_name = 'ingeststate'
# pylint: enable=too-few-public-methods
[docs]def update_state(job_id, state, task, task_percent, exception=''):
"""Update the state of an ingest job."""
if job_id and int(job_id) >= 0:
IngestState.database_connect()
record = IngestState.get_or_create(job_id=job_id,
defaults={'task': '', 'task_percent': 0, 'state': ''})[0]
record.state = state
record.task = task
record.task_percent = task_percent
record.exception = exception
record.updated = datetime.utcnow()
record.save()
IngestState.database_close()
[docs]def read_state(job_id):
"""Return the state of an ingest job as a json object."""
IngestState.database_connect()
if job_id and job_id >= 0:
record = IngestState.get(job_id=job_id)
else:
record = IngestState()
record.state = 'DATA_ACCESS_ERROR'
record.task = 'read_state'
record.task_percent = 0
IngestState.database_close()
return record