diff --git a/circle/vm/models/node.py b/circle/vm/models/node.py index 07abebf..e44f21a 100644 --- a/circle/vm/models/node.py +++ b/circle/vm/models/node.py @@ -16,10 +16,11 @@ from firewall.models import Host from ..tasks import vm_tasks from .common import Trait -from .activity import node_activity +from .activity import node_activity, NodeActivity from monitor.calvin.calvin import Query from monitor.calvin.calvin import GraphiteHandler +from django.utils import timezone logger = getLogger(__name__) @@ -56,8 +57,14 @@ class Node(TimeStampedModel): @method_cache(10, 5) def get_online(self): + """Check if the node is online. - return self.remote_query(vm_tasks.ping, timeout=1, default=False) + Runs a remote ping task if the worker is running. + """ + try: + return self.remote_query(vm_tasks.ping, timeout=1, default=False) + except WorkerNotFound: + return False online = property(get_online) @@ -66,15 +73,14 @@ class Node(TimeStampedModel): """Number of CPU threads available to the virtual machines. """ - return self.remote_query(vm_tasks.get_core_num) + return self.remote_query(vm_tasks.get_core_num, default=0) num_cores = property(get_num_cores) @property def state(self): - """Node state. + """The state combined of online and enabled attributes. """ - if self.enabled and self.online: return 'ONLINE' elif self.enabled and not self.online: @@ -86,22 +92,25 @@ class Node(TimeStampedModel): def disable(self, user=None): ''' Disable the node.''' - with node_activity(code_suffix='disable', node=self, user=user): - self.enabled = False - self.save() + if self.enabled is True: + with node_activity(code_suffix='disable', node=self, user=user): + self.enabled = False + self.save() def enable(self, user=None): ''' Enable the node. ''' - with node_activity(code_suffix='enable', node=self, user=user): - self.enabled = True - self.save() + if self.enabled is not True: + with node_activity(code_suffix='enable', node=self, user=user): + self.enabled = True + self.save() + self.get_num_cores(invalidate_cache=True) + self.get_ram_size(invalidate_cache=True) @method_cache(300) def get_ram_size(self): """Bytes of total memory in the node. """ - - return self.remote_query(vm_tasks.get_ram_size) + return self.remote_query(vm_tasks.get_ram_size, default=0) ram_size = property(get_ram_size) @@ -113,25 +122,77 @@ class Node(TimeStampedModel): @method_cache(30) def get_remote_queue_name(self, queue_id): - """ Return the remote queue name + """Return the name of the remote celery queue for this node. + throws Exception if there is no worker on the queue. - Until the cache provide reult there can be dead quques. + Until the cache provide reult there can be dead queues. """ + if vm_tasks.check_queue(self.host.hostname, queue_id): + self.node_online() return self.host.hostname + "." + queue_id else: + if self.enabled is True: + self.node_offline() raise WorkerNotFound() + def node_online(self): + """Create activity and log entry when node reappears. + """ + + try: + act = self.activity_log.order_by('-pk')[0] + except IndexError: + pass # no monitoring activity at all + else: + logger.debug("The last activity was %s" % act) + if act.activity_code.endswith("offline"): + act = NodeActivity.create(code_suffix='monitor_succes_online', + node=self, user=None) + act.started = timezone.now() + act.finished = timezone.now() + act.succeeded = True + act.save() + logger.info("Node %s is ONLINE." % self.name) + self.get_num_cores(invalidate_cache=True) + self.get_ram_size(invalidate_cache=True) + + def node_offline(self): + """Called when a node disappears. + + If the node is not already offline, record an activity and a log entry. + """ + + try: + act = self.activity_log.order_by('-pk')[0] + except IndexError: + pass # no activity at all + else: + logger.debug("The last activity was %s" % act) + if act.activity_code.endswith("offline"): + return + act = NodeActivity.create(code_suffix='monitor_failed_offline', + node=self, user=None) + act.started = timezone.now() + act.finished = timezone.now() + act.succeeded = False + act.save() + logger.critical("Node %s is OFFLINE%s.", self.name, + ", but enabled" if self.enabled else "") + # TODO: check if we should reschedule any VMs? + def remote_query(self, task, timeout=30, raise_=False, default=None): """Query the given task, and get the result. - If the result is not ready in timeout secs, return default value or - raise a TimeoutError.""" - r = task.apply_async( - queue=self.get_remote_queue_name('vm'), expires=timeout + 60) + If the result is not ready or worker not reachable + in timeout secs, return default value or raise a + TimeoutError or WorkerNotFound exception. + """ try: + r = task.apply_async( + queue=self.get_remote_queue_name('vm'), expires=timeout + 60) return r.get(timeout=timeout) - except TimeoutError: + except (TimeoutError, WorkerNotFound): if raise_: raise else: @@ -175,6 +236,11 @@ class Node(TimeStampedModel): return float(self.get_monitor_info()["memory.usage"]) / 100 def update_vm_states(self): + """Update state of Instances running on this Node. + + Query state of all libvirt domains, and notify Instances by their + vm_state_changed hook. + """ domains = {} domain_list = self.remote_query(vm_tasks.list_domains_info, timeout=5) if domain_list is None: