From 6d9a0b97582badd5662a577c8d655a558d2027ad Mon Sep 17 00:00:00 2001 From: Guba Sándor <guba.sandor@cloud.bme.hu> Date: Mon, 6 Oct 2014 10:14:24 +0200 Subject: [PATCH] vm: rework cleanup to handle different queues --- circle/vm/models/activity.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/circle/vm/models/activity.py b/circle/vm/models/activity.py index e2b75ca..3bde8c7 100644 --- a/circle/vm/models/activity.py +++ b/circle/vm/models/activity.py @@ -20,7 +20,6 @@ from contextlib import contextmanager from logging import getLogger from warnings import warn -from celery.signals import worker_ready from celery.contrib.abortable import AbortableAsyncResult from django.core.urlresolvers import reverse @@ -278,17 +277,17 @@ def node_activity(code_suffix, node, task_uuid=None, user=None, return activitycontextimpl(act) -@worker_ready.connect() def cleanup(conf=None, **kwargs): # TODO check if other manager workers are running - from celery.task.control import discard_all - discard_all() msg_txt = ugettext_noop("Manager is restarted, activity is cleaned up. " "You can try again now.") message = create_readable(msg_txt, msg_txt) + queue_name = kwargs.get('queue_name', None) for i in InstanceActivity.objects.filter(finished__isnull=True): - i.finish(False, result=message) - logger.error('Forced finishing stale activity %s', i) + op = i.get_operation() + if op and op.async_queue == queue_name: + i.finish(False, result=message) + logger.error('Forced finishing stale activity %s', i) for i in NodeActivity.objects.filter(finished__isnull=True): i.finish(False, result=message) logger.error('Forced finishing stale activity %s', i) -- libgit2 0.26.0