Commit 4da7b486 by Szeberényi Imre

Fured fix

parent 58ca65ad
......@@ -5,7 +5,7 @@ CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True
CELERY_ACCEPT_CONTENT = ['json', 'pickle_v2', 'pickle']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'pickle_v2'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True
......
......@@ -11,6 +11,7 @@
<dt>{% trans "Host online" %}:</dt><dd> {{ node.online|yesno }}</dd>
<dt>{% trans "Minion online" %}:</dt><dd> {{ node.minion_online|yesno }}</dd>
<dt>{% trans "Priority" %}:</dt><dd>{{ node.priority }}</dd>
<dt>{% trans "Interop group" %}:</dt><dd>{{ node.capability }}</dd>
<dt>{% trans "Driver Version:" %}</dt>
<dd>
{% if node.driver_version %}
......
......@@ -74,7 +74,7 @@ from vm.operations import DeployOperation, ShutdownOperation
from request.models import TemplateAccessType, LeaseType
from request.forms import LeaseRequestForm, TemplateRequestForm
from ..models import Favourite
from manager.scheduler import has_traits
from manager.scheduler import has_traits, common_select
logger = logging.getLogger(__name__)
......@@ -446,7 +446,7 @@ class VmMigrateView(FormOperationMixin, VmOperationView):
inst = self.get_object()
try:
if isinstance(inst, Instance):
default = inst.select_node()
default = inst.select_node(excl=inst.node)
except SchedulerError:
logger.exception("scheduler error:")
......@@ -459,10 +459,13 @@ class VmMigrateView(FormOperationMixin, VmOperationView):
inst = self.get_object()
if isinstance(inst, Instance):
nodes_w_traits = [
n.pk for n in Node.objects.filter(enabled=True)
if n.online and has_traits(inst.req_traits.all(), n)
]
nodes_w_traits = [
n.pk for n in common_select(inst, Node.objects.filter(enabled=True))
]
# nodes_w_traits = [
# n.pk for n in Node.objects.filter(enabled=True)
# if n.online and has_traits(inst.req_traits.all(), n)
# ]
ctx['nodes_w_traits'] = nodes_w_traits
return ctx
......
......@@ -62,6 +62,10 @@ def common_select(instance, nodes):
nodes = [n for n in nodes
if n.schedule_enabled and n.online and
has_traits(instance.req_traits.all(), n)]
logger.debug('capability group: %s, pre-filter nodes: %s', instance.capability_group, nodes)
if instance.capability_group:
nodes = [n for n in nodes if n.capability == instance.capability_group]
logger.debug('capability group: %s, selected nodes: %s', instance.capability_group, nodes)
if not nodes:
logger.warning('select_node: no usable node for %s', unicode(instance))
raise TraitsUnsatisfiableException()
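A minimal plain-Python sketch of the capability-group filter added to common_select above; the namedtuples stand in for the project's Django Node and Instance models and are illustrative only:
from collections import namedtuple

# Stand-ins for the Django models; only the fields used by the filter.
Node = namedtuple('Node', 'name capability')
Instance = namedtuple('Instance', 'name capability_group')

def filter_by_capability_group(instance, nodes):
    # Mirrors the new check in common_select: if the instance remembers a
    # capability group, only nodes of that group stay eligible.
    if instance.capability_group:
        nodes = [n for n in nodes if n.capability == instance.capability_group]
    return nodes

nodes = [Node('cloud-1', 'old'), Node('cloud-2', 'new')]
vm = Instance('vm42', capability_group='old')
assert [n.name for n in filter_by_capability_group(vm, nodes)] == ['cloud-1']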
......@@ -93,7 +97,7 @@ def common_random(instance, nodes):
def advanced_with_time_stamp(instance, nodes):
nodes = common_select(instance, nodes)
nodes.sort(key=sorting_key, reverse=True)
logger.info("SCHEDLOG: {}".format(json.dumps({
logger.debug("SCHEDLOG: {}".format(json.dumps({
"event": "after_sort",
"list": map(lambda node: unicode(node), nodes)})))
result = nodes[0]
......@@ -112,7 +116,7 @@ def select_node(instance, nodes):
else: # Default method is the random
result = common_random(instance, nodes)
logger.info("SCHEDLOG: {}".format(json.dumps(
logger.debug("SCHEDLOG: {}".format(json.dumps(
{"event": "select",
"node": unicode(result),
"vm": unicode(instance)})))
......@@ -131,7 +135,7 @@ def sorting_key(node):
key = free_cpu_time(node) * corr
else:
key = free_ram(node) * corr
logger.info("SCHEDLOG: {}".format(json.dumps({
logger.debug("SCHEDLOG: {}".format(json.dumps({
"event": "sort",
"node": unicode(node),
"sorting_key": unicode(key),
......@@ -169,7 +173,7 @@ def last_scheduled_correction_factor(node):
factor = 1
elif factor < 0:
factor = 1
logger.info('Scheduler set factor to %s', unicode(factor))
logger.debug('Scheduler set factor to %s', unicode(factor))
return factor
......
......@@ -28,7 +28,7 @@ def check_queue(storage, queue_id, priority):
if priority is not None:
queue_name = queue_name + "." + priority
inspect = celery.control.inspect()
inspect.timeout = 0.5
inspect.timeout = 3
active_queues = inspect.active_queues()
if active_queues is None:
return False
......
......@@ -261,6 +261,9 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin,
"destruction."))
objects = Manager()
active = QueryManager(destroyed_at=None)
capability_group = CharField(max_length=20, blank=True, null=True,
help_text=_("Capability group of the node where this instance was last running."),
)
class Meta:
app_label = 'vm'
......@@ -774,10 +777,13 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin,
timezone.now() + lease.suspend_interval,
timezone.now() + lease.delete_interval)
def select_node(self):
def select_node(self, excl=None):
"""Returns the node the VM should be deployed or migrated to.
"""
return scheduler.select_node(self, Node.objects.all())
nodes = Node.objects.all()
if excl is not None:
nodes = nodes.exclude(id=excl.id)
return scheduler.select_node(self, nodes)
def destroy_disks(self):
"""Destroy all associated disks.
......@@ -806,6 +812,8 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin,
def allocate_node(self):
if self.node is None:
self.node = self.select_node()
self.capability_group = self.node.capability
logger.info('allocate_node: %s', self.node)
self.save()
return self.node
......
......@@ -159,6 +159,10 @@ class Node(OperatedMixin, TimeStampedModel):
help_text=_("A timestamp for the node, used by the scheduler."),
verbose_name=_("Last Scheduled Time Stamp")
)
capability = CharField(max_length=20, blank=True, null=True,
help_text=_("Capability group of the node."),
verbose_name=_("Capability group")
)
class Meta:
app_label = 'vm'
......@@ -265,14 +269,16 @@ class Node(OperatedMixin, TimeStampedModel):
Throws Exception if there is no worker on the queue.
The result may include dead queues because of caching.
"""
queue_name = self.host.hostname + "." + queue_id
if priority is not None:
queue_name = queue_name + "." + priority
logger.info("get_remote_queue_name %s ", queue_name)
if vm_tasks.check_queue(self.host.hostname, queue_id, priority):
queue_name = self.host.hostname + "." + queue_id
if priority is not None:
queue_name = queue_name + "." + priority
self.node_online()
# logger.error("Node_online")
return queue_name
else:
logger.error("Node_offline %s", queue_name)
if self.enabled:
self.node_offline()
raise WorkerNotFound()
......
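The queue naming convention used by get_remote_queue_name above can be read off the diff: "<hostname>.<queue_id>" with an optional ".<priority>" suffix. A small illustrative helper, not part of the codebase:
def remote_queue_name(hostname, queue_id, priority=None):
    # "<hostname>.<queue_id>" plus an optional ".<priority>" suffix,
    # matching how Node.get_remote_queue_name composes the name.
    name = hostname + "." + queue_id
    if priority is not None:
        name = name + "." + priority
    return name

assert remote_queue_name("cloud-1", "vm", "fast") == "cloud-1.vm.fast"
assert remote_queue_name("cloud-1", "net") == "cloud-1.net"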
......@@ -57,6 +57,8 @@ from .tasks.local_tasks import (
abortable_async_instance_operation, abortable_async_node_operation,
)
from common.safe_celery_get import safe_celery_get
logger = getLogger(__name__)
......@@ -66,9 +68,10 @@ class RemoteOperationMixin(object):
def _operation(self, **kwargs):
args = self._get_remote_args(**kwargs)
logger.info("RemoteOpMixin %s %s %r", self.id, self._get_remote_queue(), args);
return self.task.apply_async(
ar = self.task.apply_async(
args=args, queue=self._get_remote_queue()
).get(timeout=self.remote_timeout)
)
return safe_celery_get(ar, timeout=self.remote_timeout)
def check_precond(self):
super(RemoteOperationMixin, self).check_precond()
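safe_celery_get is imported from common.safe_celery_get, but its body is not part of this diff. The sketch below is a guess at what such a wrapper typically does (wait on the AsyncResult with a timeout and log failures before re-raising), not the project's actual code:
import logging

logger = logging.getLogger(__name__)

def safe_celery_get(async_result, timeout=None):
    # Hypothetical wrapper: behaves like async_result.get(timeout=timeout),
    # but logs worker timeouts and broker errors before re-raising them.
    try:
        return async_result.get(timeout=timeout)
    except Exception:
        logger.exception('remote task failed or timed out')
        raise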
......@@ -425,6 +428,7 @@ class DeployOperation(InstanceOperation):
self.instance.allocate_vnc_port()
if node is not None:
self.instance.node = node
self.instance.capability_group = node.capability
self.instance.save()
else:
self.instance.allocate_node()
......@@ -597,7 +601,7 @@ class MigrateOperation(RemoteInstanceOperation):
with activity.sub_activity('scheduling',
readable_name=ugettext_noop(
"schedule")) as sa:
to_node = self.instance.select_node()
to_node = self.instance.select_node(excl=self.instance.node)
sa.result = to_node
try:
......@@ -989,8 +993,9 @@ class WakeUpOperation(InstanceOperation):
def _operation(self, activity):
# Schedule vm
self.instance.allocate_vnc_port()
if self.instance.capability_group is None:
self.instance.capability_group = "old" # we suppose that this was suspened on old node
self.instance.allocate_node()
# Resume vm
self.instance._wake_up_vm(parent_activity=activity)
......
......@@ -22,7 +22,7 @@ from manager.mancelery import celery
logger = getLogger(__name__)
@celery.task()
def check_queue(node_hostname, queue_id, priority=None):
"""True if the queue is alive.
......@@ -55,9 +55,9 @@ def get_queues():
result = cache.get(key)
if result is None:
inspect = celery.control.inspect()
inspect.timeout = 0.5
inspect.timeout = 3
result = inspect.active_queues()
logger.debug('Queue list of length %d cached.', result and len(result))
logger.info('Queue list of length %d cached.', len(result or []))
cache.set(key, result, 10)
return result
......
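The get_queues change above keeps the existing caching pattern while raising the broker inspect timeout from 0.5 s to 3 s. A plain-Python sketch of that pattern, with Django's cache and celery.control.inspect() replaced by stand-ins:
import time

_cache = {}  # stand-in for Django's cache backend

def cached(key, ttl, produce):
    # Return the cached value while it is still fresh; otherwise call the
    # slow producer (here a stand-in for inspect.active_queues()) and cache
    # its result for ttl seconds.
    hit = _cache.get(key)
    if hit is not None and hit[1] > time.time():
        return hit[0]
    value = produce()
    _cache[key] = (value, time.time() + ttl)
    return value

queues = cached('queues', 10, lambda: {'cloud-1.vm': []})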