From 437844f11c85e47f25184edf22956d22b116b58a Mon Sep 17 00:00:00 2001 From: Guba Sándor <guba.sandor@cloud.bme.hu> Date: Tue, 18 Feb 2014 09:05:34 +0100 Subject: [PATCH] storage: added queue checkig mechanism --- circle/storage/models.py | 9 +++++++-- circle/storage/tasks/local_tasks.py | 19 +++++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/circle/storage/models.py b/circle/storage/models.py index ee4b339..066f0a0 100644 --- a/circle/storage/models.py +++ b/circle/storage/models.py @@ -13,7 +13,7 @@ from sizefield.models import FileSizeField from acl.models import AclBase from .tasks import local_tasks, remote_tasks -from common.models import ActivityModel, activitycontextimpl +from common.models import ActivityModel, activitycontextimpl, WorkerNotFound logger = logging.getLogger(__name__) @@ -36,7 +36,12 @@ class DataStore(Model): return u'%s (%s)' % (self.name, self.path) def get_remote_queue_name(self, queue_id): - return self.hostname + '.' + queue_id + logger.debug("Checking for storage queue %s.%s", + self.hostname, queue_id) + if local_tasks.check_queue(self.hostname, queue_id): + return self.hostname + '.' + queue_id + else: + raise WorkerNotFound() class Disk(AclBase, TimeStampedModel): diff --git a/circle/storage/tasks/local_tasks.py b/circle/storage/tasks/local_tasks.py index 3ac9d53..59b5bda 100644 --- a/circle/storage/tasks/local_tasks.py +++ b/circle/storage/tasks/local_tasks.py @@ -2,6 +2,25 @@ from manager.mancelery import celery @celery.task +def check_queue(storage, queue_id): + ''' Celery inspect job to check for active workers at queue_id + return True/False + ''' + drivers = ['storage', 'download'] + worker_list = [storage + "." + d for d in drivers] + queue_name = storage + "." + queue_id + # v is List of List of queues dict + active_queues = celery.control.inspect(worker_list).active_queues() + if active_queues is not None: + node_workers = [v for k, v in active_queues.iteritems()] + for worker in node_workers: + for queue in worker: + if queue['name'] == queue_name: + return True + return False + + +@celery.task def deploy(disk, user): disk.deploy(task_uuid=deploy.request.id, user=user) -- libgit2 0.26.0