diff --git a/circle/common/models.py b/circle/common/models.py index d63beb3..0a2153f 100644 --- a/circle/common/models.py +++ b/circle/common/models.py @@ -17,6 +17,7 @@ from collections import deque from contextlib import contextmanager +from functools import update_wrapper from hashlib import sha224 from itertools import chain, imap from logging import getLogger @@ -36,6 +37,7 @@ from django.utils.functional import Promise from django.utils.translation import ugettext_lazy as _, ugettext_noop from jsonfield import JSONField +from manager.mancelery import celery from model_utils.models import TimeStampedModel logger = getLogger(__name__) @@ -212,6 +214,38 @@ class ActivityModel(TimeStampedModel): self.result_data = None if value is None else value.to_dict() +@celery.task() +def compute_cached(method, instance, memcached_seconds, + key, start, *args, **kwargs): + """Compute and store actual value of cached method.""" + if isinstance(method, basestring): + model, id = instance + instance = model.objects.get(id=id) + try: + method = getattr(model, method) + while hasattr(method, '_original') or hasattr(method, 'fget'): + try: + method = method._original + except AttributeError: + method = method.fget + except AttributeError: + logger.exception("Couldnt get original method of %s", + unicode(method)) + raise + + # call the actual method + result = method(instance, *args, **kwargs) + # save to memcache + cache.set(key, result, memcached_seconds) + elapsed = time() - start + cache.set("%s.cached" % key, 2, max(memcached_seconds * 0.5, + memcached_seconds * 0.75 - elapsed)) + logger.debug('Value of <%s>.%s(%s)=<%s> saved to cache (%s elapsed).', + unicode(instance), method.__name__, unicode(args), + unicode(result), elapsed) + return result + + def method_cache(memcached_seconds=60, instance_seconds=5): # noqa """Cache return value of decorated method to memcached and memory. @@ -233,9 +267,11 @@ def method_cache(memcached_seconds=60, instance_seconds=5): # noqa def inner_cache(method): + method_name = method.__name__ + def get_key(instance, *args, **kwargs): return sha224(unicode(method.__module__) + - unicode(method.__name__) + + method_name + unicode(instance.id) + unicode(args) + unicode(kwargs)).hexdigest() @@ -254,21 +290,31 @@ def method_cache(memcached_seconds=60, instance_seconds=5): # noqa if vals['time'] + instance_seconds > now: # has valid on class cache, return that result = vals['value'] + setattr(instance, key, {'time': now, 'value': result}) if result is None: result = cache.get(key) if invalidate or (result is None): - # all caches failed, call the actual method - result = method(instance, *args, **kwargs) - # save to memcache and class attr - cache.set(key, result, memcached_seconds) + logger.debug("all caches failed, compute now") + result = compute_cached(method, instance, memcached_seconds, + key, time(), *args, **kwargs) setattr(instance, key, {'time': now, 'value': result}) - logger.debug('Value of <%s>.%s(%s)=<%s> saved to cache.', - unicode(instance), method.__name__, - unicode(args), unicode(result)) + elif not cache.get("%s.cached" % key): + logger.debug("caches expiring, compute async") + cache.set("%s.cached" % key, 1, memcached_seconds * 0.5) + try: + compute_cached.apply_async( + queue='localhost.man', kwargs=kwargs, args=[ + method_name, (instance.__class__, instance.id), + memcached_seconds, key, time()] + list(args)) + except: + logger.exception("Couldnt compute async %s", method_name) return result + + update_wrapper(x, method) + x._original = method return x return inner_cache diff --git a/circle/storage/tasks/local_tasks.py b/circle/storage/tasks/local_tasks.py index 3cec7f3..032bb2e 100644 --- a/circle/storage/tasks/local_tasks.py +++ b/circle/storage/tasks/local_tasks.py @@ -28,7 +28,7 @@ def check_queue(storage, queue_id, priority): if priority is not None: queue_name = queue_name + "." + priority inspect = celery.control.inspect() - inspect.timeout = 0.1 + inspect.timeout = 0.5 active_queues = inspect.active_queues() if active_queues is None: return False diff --git a/circle/vm/models/node.py b/circle/vm/models/node.py index ffbf358..1aa6edb 100644 --- a/circle/vm/models/node.py +++ b/circle/vm/models/node.py @@ -16,6 +16,7 @@ # with CIRCLE. If not, see <http://www.gnu.org/licenses/>. from __future__ import absolute_import, unicode_literals +from functools import update_wrapper from logging import getLogger from warnings import warn import requests @@ -51,6 +52,8 @@ def node_available(function): return function(self, *args, **kwargs) else: return None + update_wrapper(decorate, function) + decorate._original = function return decorate diff --git a/circle/vm/tasks/vm_tasks.py b/circle/vm/tasks/vm_tasks.py index 9046c84..fcedc56 100644 --- a/circle/vm/tasks/vm_tasks.py +++ b/circle/vm/tasks/vm_tasks.py @@ -55,7 +55,7 @@ def get_queues(): result = cache.get(key) if result is None: inspect = celery.control.inspect() - inspect.timeout = 0.1 + inspect.timeout = 0.5 result = inspect.active_queues() logger.debug('Queue list of length %d cached.', len(result)) cache.set(key, result, 10)