local_tasks.py 3.92 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE.  If not, see <http://www.gnu.org/licenses/>.

18 19 20
from logging import getLogger
from socket import gethostname

21
import django.conf
22
from django.core.cache import cache
23
from celery.exceptions import TimeoutError
24

25
# from manager.mancelery import celery
26
from common.models import WorkerNotFound
27

28
# Firewall-specific configuration taken from the Django settings object.
# It is read with ``.get(key, default)`` further down in this module
# (e.g. the optional 'dns_queues' entry).
settings = django.conf.settings.FIREWALL_SETTINGS
# Module-level logger shared by every function in this file.
logger = getLogger(__name__)
30

31

32
def _apply_once(name, tasks, queues, task, data):
33 34
    """Reload given networking component if needed.
    """
35

36
    if name not in tasks:
37 38
        return

39
    data = data()
40
    for queue in queues:
41
        try:
42
            task.apply_async(args=data, queue=queue, expires=60).get(timeout=2)
43 44 45
            logger.info("%s configuration is reloaded. (queue: %s)",
                        name, queue)
        except TimeoutError as e:
46
            logger.critical('%s (queue: %s, task: %s)', e, queue, name)
47
        except:
48 49
            logger.critical('Unhandled exception: queue: %s data: %s task: %s',
                            queue, data, name, exc_info=True)
50 51 52 53 54 55 56 57 58 59 60


def get_firewall_queues():
    """Return the remote 'firewall' queue name of each known firewall.

    Every ``Firewall`` object is asked for its remote queue name.  Those
    that raise ``WorkerNotFound`` are reported as offline in the log and
    left out of the result.
    """
    # Imported here rather than at module level, as in the original code.
    from firewall.models import Firewall

    queue_names = []
    for firewall in Firewall.objects.all():
        try:
            queue_name = firewall.get_remote_queue_name('firewall')
        except WorkerNotFound:
            logger.critical('Firewall %s is offline', firewall.name)
        else:
            queue_names.append(queue_name)
    return queue_names
61

62

63
#@celery.task
64
def reloadtask_worker():
    """Reload every networking component that has a reload lock set.

    For each known component the cache key ``"<component>_lock"`` is read
    and then deleted; components whose key held a true value are reloaded
    through ``_apply_once``.  DNS reloads go to the ``"<host>.dns"`` queues
    (hosts come from the 'dns_queues' setting, defaulting to the local
    hostname); all other reloads go to the firewall queues.
    """
    from firewall.fw import BuildFirewall, dhcp, dns, ipset, vlan
    from remote_tasks import (reload_dns, reload_dhcp, reload_firewall,
                              reload_firewall_vlan, reload_blacklist)

    component_names = ('dns', 'dhcp', 'firewall', 'firewall_vlan',
                       'blacklist')

    # Gather the components flagged for reload, clearing each flag as we go.
    pending = []
    for component in component_names:
        lock_key = "%s_lock" % component
        if cache.get(lock_key):
            pending.append(component)
        cache.delete(lock_key)

    logger.info("reloadtask_worker: Reload %s", ", ".join(pending))

    firewall_queues = get_firewall_queues()
    dns_hosts = settings.get('dns_queues', [gethostname()])
    dns_queues = ["%s.dns" % host for host in dns_hosts]

    # (component, target queues, remote task, payload factory).  The payload
    # factories are only invoked by _apply_once for pending components.
    jobs = (
        ('dns', dns_queues, reload_dns,
         lambda: (dns(), )),
        ('dhcp', firewall_queues, reload_dhcp,
         lambda: (dhcp(), )),
        # Unlike the others, the result of build_ipt() is used directly as
        # the argument sequence; it is not wrapped in a one-element tuple.
        ('firewall', firewall_queues, reload_firewall,
         lambda: BuildFirewall().build_ipt()),
        ('firewall_vlan', firewall_queues, reload_firewall_vlan,
         lambda: (vlan(), )),
        ('blacklist', firewall_queues, reload_blacklist,
         lambda: (list(ipset()), )),
    )
    for component, queues, remote_task, payload in jobs:
        _apply_once(component, pending, queues, remote_task, payload)
92

93

94
#@celery.task
95
def reloadtask(type='Host', timeout=15, sync=False):
    """Flag the components affected by a changed object and schedule a reload.

    :param type: kind of object that changed; selects which components are
                 flagged.  An unknown value raises ``KeyError``.
    :param timeout: passed to ``get()`` on the scheduled task's result when
                    ``sync`` is true
    :param sync: when true, wait for the scheduled worker task to finish

    NOTE(review): ``reloadtask_worker.apply_async`` requires
    ``reloadtask_worker`` to be a Celery task, but its ``@celery.task``
    decorator is commented out in this file -- confirm how it is registered.
    """
    components_by_type = {
        'Host': ['dns', 'dhcp', 'firewall'],
        'Record': ['dns'],
        'Domain': ['dns'],
        'Vlan': ['dns', 'dhcp', 'firewall', 'firewall_vlan'],
        'Firewall': ['firewall'],
        'Rule': ['firewall'],
        'SwitchPort': ['firewall_vlan'],
        'EthernetDevice': ['firewall_vlan'],
        'BlacklistItem': ['blacklist'],
    }
    components = components_by_type[type]
    logger.info("Reload %s on next periodic iteration applying change to %s.",
                ", ".join(components), type)

    # The list is fully built before all() inspects it, so cache.add() runs
    # for every component; a generator would stop at the first false result
    # and leave the remaining locks unset.
    added = [cache.add("%s_lock" % name, 'true', 30) for name in components]
    if not all(added):
        # At least one add returned a false value, so no new worker run is
        # scheduled here.
        return

    res = reloadtask_worker.apply_async(queue='localhost.man', countdown=5)
    if sync:
        res.get(timeout)