From ac1467650d2bb2cee01207f1555ea4726916c187 Mon Sep 17 00:00:00 2001
From: Dudás Ádám <dudas.adam@cloud.bme.hu>
Date: Wed, 2 Apr 2014 15:18:55 +0200
Subject: [PATCH] vm: make flush into operation

---
 circle/vm/models/node.py       | 33 ++++++++-------------------------
 circle/vm/operations.py        | 38 ++++++++++++++++++++++++++++++++++++--
 circle/vm/tasks/local_tasks.py | 13 +++++++++++--
 circle/vm/tests/test_models.py | 30 ++++++++++++++++--------------
 4 files changed, 71 insertions(+), 43 deletions(-)

diff --git a/circle/vm/models/node.py b/circle/vm/models/node.py
index c317eaf..95a208f 100644
--- a/circle/vm/models/node.py
+++ b/circle/vm/models/node.py
@@ -1,28 +1,27 @@
 from __future__ import absolute_import, unicode_literals
 from logging import getLogger
+from warnings import warn
 
 from django.db.models import (
     CharField, IntegerField, ForeignKey, BooleanField, ManyToManyField,
     FloatField, permalink,
 )
+from django.utils import timezone
 from django.utils.translation import ugettext_lazy as _
 
-from warnings import warn
-
 from celery.exceptions import TimeoutError
 from model_utils.models import TimeStampedModel
 from taggit.managers import TaggableManager
 
 from common.models import method_cache, WorkerNotFound, HumanSortField
+from common.operations import OperatedMixin
 from firewall.models import Host
-from ..tasks import vm_tasks, local_tasks
-from .common import Trait
-
-from .activity import node_activity, NodeActivity
-
 from monitor.calvin.calvin import Query
 from monitor.calvin.calvin import GraphiteHandler
-from django.utils import timezone
+from ..tasks import vm_tasks
+from .activity import node_activity, NodeActivity
+from .common import Trait
+
 
 logger = getLogger(__name__)
 
@@ -38,7 +37,7 @@ def node_available(function):
     return decorate
 
 
-class Node(TimeStampedModel):
+class Node(OperatedMixin, TimeStampedModel):
 
     """A VM host machine, a hypervisor.
     """
@@ -131,22 +130,6 @@ class Node(TimeStampedModel):
                 self.enabled = False
                 self.save()
 
-    def flush(self, user=None, task_uuid=None):
-        """Disable node and move all instances to other ones.
-        """
-        with node_activity('flush', node=self, user=user,
-                           task_uuid=task_uuid) as act:
-            self.disable(user, act)
-            for i in self.instance_set.all():
-                with act.sub_activity('migrate_instance_%d' % i.pk):
-                    i.migrate()
-
-    def flush_async(self, user=None):
-        """Execute flush asynchronously.
-        """
-        return local_tasks.flush.apply_async(args=[self, user],
-                                             queue="localhost.man")
-
     def enable(self, user=None):
         ''' Enable the node. '''
         if self.enabled is not True:
diff --git a/circle/vm/operations.py b/circle/vm/operations.py
index 59c15d6..ecc2e67 100644
--- a/circle/vm/operations.py
+++ b/circle/vm/operations.py
@@ -11,8 +11,10 @@ from celery.exceptions import TimeLimitExceeded
 from common.operations import Operation, register_operation
 from storage.models import Disk
 from .tasks import vm_tasks
-from .tasks.local_tasks import async_instance_operation
-from .models import Instance, InstanceActivity, InstanceTemplate
+from .tasks.local_tasks import async_instance_operation, async_node_operation
+from .models import (
+    Instance, InstanceActivity, InstanceTemplate, Node, NodeActivity,
+)
 
 
 logger = getLogger(__name__)
@@ -409,3 +411,35 @@ class WakeUpOperation(InstanceOperation):
 
 
 register_instance_operation(WakeUpOperation)
+
+
+class NodeOperation(Operation):
+    async_operation = async_node_operation
+
+    def __init__(self, node):
+        super(NodeOperation, self).__init__(subject=node)
+        self.node = node
+
+    def create_activity(self, user):
+        return NodeActivity.create(code_suffix=self.activity_code_suffix,
+                                   node=self.node, user=user)
+
+
+def register_node_operation(op_cls, op_id=None):
+    return register_operation(Node, op_cls, op_id)
+
+
+class FlushOperation(NodeOperation):
+    activity_code_suffix = 'flush'
+    id = 'flush'
+    name = _("flush")
+    description = _("""Disable node and move all instances to other ones.""")
+
+    def _operation(self, activity, user, system):
+        self.node.disable(user, activity)
+        for i in self.node.instance_set.all():
+            with activity.sub_activity('migrate_instance_%d' % i.pk):
+                i.migrate()
+
+
+register_node_operation(FlushOperation)
diff --git a/circle/vm/tasks/local_tasks.py b/circle/vm/tasks/local_tasks.py
index a31c869..d8abcb9 100644
--- a/circle/vm/tasks/local_tasks.py
+++ b/circle/vm/tasks/local_tasks.py
@@ -16,5 +16,14 @@ def async_instance_operation(operation_id, instance_pk, activity_pk, **kwargs):
 
 
 @celery.task
-def flush(node, user):
-    node.flush(task_uuid=flush.request.id, user=user)
+def async_node_operation(operation_id, node_pk, activity_pk, **kwargs):
+    from vm.models import Node, NodeActivity
+    node = Node.objects.get(pk=node_pk)
+    operation = getattr(node, operation_id)
+    activity = NodeActivity.objects.get(pk=activity_pk)
+
+    # save async task UUID to activity
+    activity.task_uuid = async_node_operation.request.id
+    activity.save()
+
+    return operation._exec_op(activity=activity, **kwargs)
diff --git a/circle/vm/tests/test_models.py b/circle/vm/tests/test_models.py
index ef5fc14..e059f5c 100644
--- a/circle/vm/tests/test_models.py
+++ b/circle/vm/tests/test_models.py
@@ -10,7 +10,7 @@ from ..models import (
 )
 from ..models.instance import find_unused_port, ActivityInProgressError
 from ..operations import (
-    DeployOperation, DestroyOperation, MigrateOperation
+    DeployOperation, DestroyOperation, FlushOperation, MigrateOperation,
 )
 
 
@@ -244,35 +244,37 @@ class InstanceActivityTestCase(TestCase):
         subact.__enter__.assert_called()
 
     def test_flush(self):
-        node = MagicMock(spec=Node, enabled=True)
-        user = MagicMock(spec=User)
         insts = [MagicMock(spec=Instance, migrate=MagicMock()),
                  MagicMock(spec=Instance, migrate=MagicMock())]
+        node = MagicMock(spec=Node, enabled=True)
+        node.instance_set.all.return_value = insts
+        user = MagicMock(spec=User)
+        flush_op = FlushOperation(node)
 
-        with patch('vm.models.node.node_activity') as na:
-            act = na.return_value.__enter__.return_value = MagicMock()
-            node.instance_set.all.return_value = insts
+        with patch.object(FlushOperation, 'create_activity') as create_act:
+            act = create_act.return_value = MagicMock()
 
-            Node.flush(node, user)
+            flush_op(user=user)
 
-            na.__enter__.assert_called()
+            create_act.assert_called()
             node.disable.assert_called_with(user, act)
             for i in insts:
                 i.migrate.assert_called()
 
     def test_flush_disabled_wo_user(self):
-        node = MagicMock(spec=Node, enabled=False)
         insts = [MagicMock(spec=Instance, migrate=MagicMock()),
                  MagicMock(spec=Instance, migrate=MagicMock())]
+        node = MagicMock(spec=Node, enabled=False)
+        node.instance_set.all.return_value = insts
+        flush_op = FlushOperation(node)
 
-        with patch('vm.models.node.node_activity') as na:
-            act = na.return_value.__enter__.return_value = MagicMock()
-            node.instance_set.all.return_value = insts
+        with patch.object(FlushOperation, 'create_activity') as create_act:
+            act = create_act.return_value = MagicMock()
 
-            Node.flush(node)
+            flush_op(system=True)
 
+            create_act.assert_called()
             node.disable.assert_called_with(None, act)
             # ^ should be called, but real method no-ops if disabled
-            na.__enter__.assert_called()
             for i in insts:
                 i.migrate.assert_called()
--
libgit2 0.26.0