# node.py
from __future__ import absolute_import, unicode_literals

from functools import wraps
from logging import getLogger

from django.db.models import (
    CharField, IntegerField, ForeignKey, BooleanField, ManyToManyField,
    FloatField, permalink,
)
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _

from celery.exceptions import TimeoutError
from model_utils.models import TimeStampedModel
from taggit.managers import TaggableManager

from common.models import method_cache, WorkerNotFound
from firewall.models import Host
from monitor.calvin.calvin import Query
from monitor.calvin.calvin import GraphiteHandler
from ..tasks import vm_tasks
from .common import Trait
from .activity import node_activity, NodeActivity

# Module-level logger, named after this module (``__name__``).
logger = getLogger(__name__)


def node_available(function):
    """Decorate methods to ignore disabled or offline Nodes.

    The decorated method is only called when both ``self.enabled is True``
    and ``self.online is True``; otherwise ``None`` is returned and the
    method is not called. ``functools.wraps`` preserves the decorated
    method's name, docstring and other metadata.
    """
    @wraps(function)
    def decorate(self, *args, **kwargs):
        # ``enabled`` is checked first so ``online`` (which may trigger a
        # remote query) is only evaluated for enabled nodes. The strict
        # ``is True`` checks are kept from the original implementation.
        if self.enabled is True and self.online is True:
            return function(self, *args, **kwargs)
        return None
    return decorate


class Node(TimeStampedModel):

    """A VM host machine, a hypervisor.
    """
    name = CharField(max_length=50, unique=True,
                     verbose_name=_('name'),
                     help_text=_('Human readable name of node.'))
    priority = IntegerField(verbose_name=_('priority'),
                            help_text=_('Node usage priority.'))
    host = ForeignKey(Host, verbose_name=_('host'),
                      help_text=_('Host in firewall.'))
    enabled = BooleanField(verbose_name=_('enabled'), default=False,
                           help_text=_('Indicates whether the node can '
                                       'be used for hosting.'))
    traits = ManyToManyField(Trait, blank=True,
                             help_text=_("Declared traits."),
                             verbose_name=_('traits'))
    tags = TaggableManager(blank=True, verbose_name=_("tags"))
    overcommit = FloatField(default=1.0, verbose_name=_("overcommit ratio"),
                            help_text=_("The ratio of total memory with "
                                        "to without overcommit."))

    class Meta:
        app_label = 'vm'
        db_table = 'vm_node'
        permissions = ()

    def __unicode__(self):
        return self.name

    @method_cache(10, 5)
    def get_online(self):
        """Check if the node is online.

        Runs a remote ping task (1 second timeout) if the worker is running.
        Returns False when the ping fails, times out or no worker is found.
        The result is cached by ``method_cache(10, 5)``.
        """
        try:
            return self.remote_query(vm_tasks.ping, timeout=1, default=False)
        except WorkerNotFound:
            return False

    online = property(get_online)

    @node_available
    @method_cache(300)
    def get_num_cores(self):
        """Number of CPU threads available to the virtual machines.

        Returns 0 if the remote query fails; None if the node is
        unavailable (see ``node_available``).
        """
        return self.remote_query(vm_tasks.get_core_num, default=0)

    num_cores = property(get_num_cores)

    @property
    def state(self):
        """The state combined of online and enabled attributes.

        Returns 'ONLINE' (enabled, online), 'MISSING' (enabled, offline),
        'DISABLED' (disabled, online) or 'OFFLINE' (disabled, offline).
        """
        # Read ``online`` once: it is a cached remote query, and reading it
        # twice could give two different answers if the cache expires in
        # between, yielding an inconsistent state.
        online = self.online
        if self.enabled:
            return 'ONLINE' if online else 'MISSING'
        return 'DISABLED' if online else 'OFFLINE'

    def disable(self, user=None):
        ''' Disable the node.

        Does nothing if the node is not enabled; otherwise the change is
        saved inside a ``disable`` node activity.
        '''
        if self.enabled is True:
            with node_activity(code_suffix='disable', node=self, user=user):
                self.enabled = False
                self.save()

    def enable(self, user=None):
        ''' Enable the node.

        Does nothing if the node is already enabled; otherwise the change
        is saved inside an ``enable`` node activity, and the cached core
        count and RAM size are invalidated.
        '''
        if self.enabled is not True:
            with node_activity(code_suffix='enable', node=self, user=user):
                self.enabled = True
                self.save()
            self.get_num_cores(invalidate_cache=True)
            self.get_ram_size(invalidate_cache=True)

    @node_available
    @method_cache(300)
    def get_ram_size(self):
        """Bytes of total memory in the node.

        Returns 0 if the remote query fails; None if the node is
        unavailable (see ``node_available``).
        """
        return self.remote_query(vm_tasks.get_ram_size, default=0)

    ram_size = property(get_ram_size)

    @property
    @node_available
    def ram_size_with_overcommit(self):
        """Bytes of total memory including overcommit margin.
        """
        return self.ram_size * self.overcommit

    @method_cache(30)
    def get_remote_queue_name(self, queue_id):
        """Return the name of the remote celery queue for this node.

        The name is ``<hostname>.<queue_id>``. If a worker is found on the
        queue, ``node_online()`` is called; otherwise ``node_offline()`` is
        called (only for enabled nodes) and WorkerNotFound is raised.

        The result is cached by ``method_cache(30)``, so until the cached
        value expires a queue whose worker has died can still be returned.
        """
        if vm_tasks.check_queue(self.host.hostname, queue_id):
            self.node_online()
            return self.host.hostname + "." + queue_id
        else:
            if self.enabled is True:
                self.node_offline()
            raise WorkerNotFound()

    def _get_last_activity(self):
        """Return the most recent activity (highest pk) of the node or None.
        """
        try:
            act = self.activity_log.order_by('-pk')[0]
        except IndexError:
            return None  # no activity at all
        logger.debug("The last activity was %s", act)
        return act

    def _record_monitor_activity(self, code_suffix, succeeded):
        """Create and save an already finished activity for this node.

        ``started`` and ``finished`` are both set to the current time.
        """
        act = NodeActivity.create(code_suffix=code_suffix,
                                  node=self, user=None)
        now = timezone.now()
        act.started = now
        act.finished = now
        act.succeeded = succeeded
        act.save()
        return act

    def node_online(self):
        """Create activity and log entry when node reappears.

        Only acts when the last activity's code ends with "offline"; then a
        successful online activity is recorded and the cached core count
        and RAM size are invalidated.
        """
        act = self._get_last_activity()
        if act is not None and act.activity_code.endswith("offline"):
            # NOTE(review): 'succes' is misspelled, but the suffix is
            # presumably persisted in activity codes, so it is kept as-is.
            self._record_monitor_activity('monitor_succes_online', True)
            logger.info("Node %s is ONLINE.", self.name)
            self.get_num_cores(invalidate_cache=True)
            self.get_ram_size(invalidate_cache=True)

    def node_offline(self):
        """Called when a node disappears.

        If the node is not already offline, record an activity and a log entry.
        """
        act = self._get_last_activity()
        if act is not None and act.activity_code.endswith("offline"):
            return  # already recorded as offline
        self._record_monitor_activity('monitor_failed_offline', False)
        logger.critical("Node %s is OFFLINE%s.", self.name,
                        ", but enabled" if self.enabled else "")
        # TODO: check if we should reschedule any VMs?

    def remote_query(self, task, timeout=30, raise_=False, default=None):
        """Query the given task, and get the result.

        The task is sent to this node's 'vm' queue (expiring after
        ``timeout + 60`` seconds). If the result is not ready or worker not
        reachable in ``timeout`` secs, return ``default``, or, when
        ``raise_`` is true, re-raise the TimeoutError or WorkerNotFound.
        """
        try:
            r = task.apply_async(
                queue=self.get_remote_queue_name('vm'), expires=timeout + 60)
            return r.get(timeout=timeout)
        except (TimeoutError, WorkerNotFound):
            if raise_:
                raise
            else:
                return default

    @node_available
    def get_monitor_info(self):
        """Collect the latest CPU and memory usage metrics of the node.

        Queries Graphite for ``cpu.usage`` and ``memory.usage`` of
        ``<hostname>.circle`` over the last 5 minutes and returns a dict
        mapping each metric name to the first element of its last datapoint
        (presumably the value of a [value, timestamp] pair). Missing or null
        data is reported as 0.

        If the Graphite handler cannot be created (RuntimeError), the
        ``get_node_metrics`` task is queried on the node instead; that
        returns None if the node's worker is unreachable or times out.
        """
        try:
            handler = GraphiteHandler()
        except RuntimeError:
            return self.remote_query(vm_tasks.get_node_metrics, timeout=30)

        query = Query()
        query.set_target(self.host.hostname + ".circle")
        query.set_format("json")
        query.set_relative_start(5, "minutes")

        metrics = ["cpu.usage", "memory.usage"]
        for metric in metrics:
            query.set_metric(metric)
            query.generate()
            handler.put(query)
            handler.send()

        collected = {}
        # Assumes responses are popped in the order the queries were sent.
        for metric in metrics:
            response = handler.pop()
            try:
                value = response[0]["datapoints"][-1][0]
            except (IndexError, KeyError, TypeError):
                # TypeError covers a missing/non-indexable response.
                value = None
            collected[metric] = 0 if value is None else value
        return collected

    def _get_usage_ratio(self, metric):
        """Return ``metric`` from get_monitor_info() divided by 100.

        The metric is presumably a percentage. Returns None when no
        monitoring info is available.
        """
        info = self.get_monitor_info()
        if info is None:
            return None
        return float(info[metric]) / 100

    @property
    @node_available
    def cpu_usage(self):
        """CPU usage ratio of the node, or None if unavailable."""
        return self._get_usage_ratio("cpu.usage")

    @property
    @node_available
    def ram_usage(self):
        """Memory usage ratio of the node, or None if unavailable."""
        return self._get_usage_ratio("memory.usage")

    @property
    @node_available
    def byte_ram_usage(self):
        """Used memory in bytes (ram_usage * ram_size), or None if unknown."""
        usage = self.ram_usage
        size = self.ram_size
        if usage is None or size is None:
            return None
        return usage * size

    @node_available
    def update_vm_states(self):
        """Update state of Instances running on this Node.

        Query state of all libvirt domains, and notify Instances by their
        vm_state_changed hook. Instances missing from libvirt are reported
        as 'STOPPED'; libvirt domains without a matching instance are only
        logged.
        """
        domain_list = self.remote_query(vm_tasks.list_domains_info, timeout=5)
        if domain_list is None:
            logger.info("Monitoring failed at: %s", self.name)
            return

        # Map instance id -> libvirt state.
        domains = {}
        for domain in domain_list:
            # [{'name': 'cloud-1234', 'state': 'RUNNING', ...}, ...]
            try:
                vm_id = int(domain['name'].split('-')[1])
            except (KeyError, IndexError, ValueError, TypeError,
                    AttributeError):
                continue  # name format doesn't match
            domains[vm_id] = domain['state']

        # Snapshot id/state before notifying instances.
        instances = [(inst.id, inst.state)
                     for inst in self.instance_set.order_by('id')]
        for vm_id, db_state in instances:
            try:
                libvirt_state = domains.pop(vm_id)
            except KeyError:
                logger.info('Node %s update: instance %s missing from '
                            'libvirt', self, vm_id)
                # Set state to STOPPED when instance is missing
                self.instance_set.get(id=vm_id).vm_state_changed('STOPPED')
                continue
            if libvirt_state != db_state:
                logger.info('Node %s update: instance %s state changed '
                            '(libvirt: %s, db: %s)',
                            self, vm_id, libvirt_state, db_state)
                self.instance_set.get(id=vm_id).vm_state_changed(
                    libvirt_state)

        # Whatever is left has no matching instance in the database.
        for vm_id in domains:
            logger.info('Node %s update: domain %s in libvirt but not in db.',
                        self, vm_id)

    @classmethod
    def get_state_count(cls, online, enabled):
        """Count nodes with the given ``enabled`` flag whose ``online``
        value equals ``online``."""
        return sum(1 for node in cls.objects.filter(enabled=enabled)
                   if node.online == online)

    @permalink
    def get_absolute_url(self):
        return ('dashboard.views.node-detail', None, {'pk': self.id})