vmcelery.py 1.42 KB
Newer Older
Guba Sándor committed
1
""" Celery module for libvirt RPC calls. """
Guba Sándor committed
2 3
from celery import Celery
from kombu import Queue, Exchange
4
from os import getenv
Guba Sándor committed
5

6 7 8 9 10 11 12 13 14 15 16 17
from argparse import ArgumentParser

# The queue this worker consumes from is supplied on the command line.
# parse_known_args() is used so that options this module does not define
# are collected in ``unknwon_args`` instead of aborting the parse
# (presumably the remaining options belong to the celery worker itself --
# TODO confirm).  The misspelled name is kept because it is module-level.
parser = ArgumentParser()
parser.add_argument("-n", "--hostname", dest="hostname",
                    help="Define the full queue name with priority",
                    metavar="hostname.queue.priority")
(args, unknwon_args) = parser.parse_known_args()
# pop() reads the value and also removes ``hostname`` from ``args``.
HOSTNAME = vars(args).pop("hostname")
if HOSTNAME is None:
    # Without a queue name the worker cannot be configured; fail at import.
    raise Exception("You must define hostname as -n <hostname> or "
                    "--hostname=<hostname>.\n"
                    "Hostname format must be hostname.module.priority.")
Guba Sándor committed
18

19
# Broker and result-cache locations come from the environment; each one is
# None when its variable is not set.
AMQP_URI, CACHE_URI = getenv('AMQP_URI'), getenv('CACHE_URI')
21

Guba Sándor committed
22

23 24 25
def to_bool(value):
    """Interpret a configuration value as a boolean.

    The strings "true", "yes", "y" and "t" count as true, ignoring case
    and surrounding whitespace.  ``None`` is false, a real ``bool`` is
    returned unchanged, and any other value is converted with ``str()``
    before the comparison, so everything not listed above is false.

    :param value: the raw setting, typically an environment variable string
    :returns: ``True`` or ``False``
    """
    if value is None:
        # An unset setting is treated as "off" instead of raising.
        return False
    if isinstance(value, bool):
        return value
    return str(value).strip().lower() in ("true", "yes", "y", "t")

26
# Global configuration parameters declaration
tarokkk committed
27
# When LIBVIRT_KEEPALIVE is enabled, a libvirt connection to LIBVIRT_URI is
# opened once at import time and kept in ``lib_connection``; otherwise it
# stays None.  libvirt is imported only in that case.
lib_connection = None
if to_bool(getenv('LIBVIRT_KEEPALIVE', "False")):
    import libvirt
    lib_connection = libvirt.open(getenv('LIBVIRT_URI'))

# True only when NATIVE_OVS is set to one of the values to_bool() accepts.
native_ovs = to_bool(getenv('NATIVE_OVS', "False"))
tarokkk committed
35

36
# Celery application object for this worker.  The broker address is
# AMQP_URI and the ``vmdriver`` module is listed in ``include`` so that
# Celery imports it.
celery = Celery('vmcelery',
                include=['vmdriver'],
                broker=AMQP_URI)
Guba Sándor committed
39 40

# Results go to the cache backend located at CACHE_URI and expire after
# 300 (seconds, per Celery's CELERY_TASK_RESULT_EXPIRES setting).  A single
# queue named after HOSTNAME is declared, bound to the direct 'vmdriver'
# exchange with the routing key "vmdriver".
celery.conf.update(
    CELERY_RESULT_BACKEND='cache',
    CELERY_CACHE_BACKEND=CACHE_URI,
    CELERY_TASK_RESULT_EXPIRES=300,
    CELERY_QUEUES=(
        Queue(HOSTNAME,
              exchange=Exchange('vmdriver', type='direct'),
              routing_key="vmdriver"),
    ),
)