# -*- coding: utf-8 -*- # Copyright 2014 Budapest University of Technology and Economics (BME IK) # # This file is part of CIRCLE Cloud. # # CIRCLE is free software: you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) # any later version. # # CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along # with CIRCLE. If not, see <http://www.gnu.org/licenses/>. import logging import uuid import time import re from celery.contrib.abortable import AbortableAsyncResult from celery.result import allow_join_result from celery.exceptions import TimeoutError from django.core.exceptions import ObjectDoesNotExist from django.urls import reverse from django.db.models import (Model, BooleanField, CharField, DateTimeField, IntegerField, ForeignKey) from django.db import models from django.utils import timezone from django.utils.translation import ugettext_lazy as _, ugettext_noop from model_utils.models import TimeStampedModel from os.path import join from sizefield.models import FileSizeField from common.models import ( WorkerNotFound, HumanReadableException, humanize_exception, join_activity_code, method_cache ) from .tasks import local_tasks, storage_tasks logger = logging.getLogger(__name__) class DataStore(Model): """Collection of virtual disks. """ name = CharField(max_length=100, unique=True, verbose_name=_('name')) path = CharField(max_length=200, unique=True, verbose_name=_('path')) hostname = CharField(max_length=40, unique=False, verbose_name=_('hostname')) driver_cache = CharField(max_length=20, unique=False, verbose_name=_('cache-mode - qemu'), default='none') class Meta: ordering = ['name'] verbose_name = _('datastore') verbose_name_plural = _('datastores') def __str__(self): return '%s (%s)' % (self.name, self.path) def get_remote_queue_name(self, queue_id, priority=None, check_worker=True): logger.debug("Checking for storage queue %s.%s", self.hostname, queue_id) if not check_worker or local_tasks.check_queue(self.hostname, queue_id, priority): queue_name = self.hostname + '.' + queue_id if priority is not None: queue_name = queue_name + '.' + priority return queue_name else: raise WorkerNotFound() def get_deletable_disks(self): return [disk.filename for disk in self.disk_set.filter( destroyed__isnull=False) if disk.is_deletable] @method_cache(30) def get_statistics(self, timeout=15): q = self.get_remote_queue_name("storage", priority="fast") return storage_tasks.get_storage_stat.apply_async( args=[self.path], queue=q).get(timeout=timeout) @method_cache(30) def get_orphan_disks(self, timeout=25): """Disk image files without Disk object in the database. """ queue_name = self.get_remote_queue_name('storage', "slow") files = set(storage_tasks.list_files.apply_async( args=[self.path], queue=queue_name).get(timeout=timeout)) disks = set([disk.filename for disk in self.disk_set.all()]) orphans = [] for i in files - disks: if not re.match('cloud-[0-9]*\.dump', i): orphans.append(i) return orphans @method_cache(30) def get_missing_disks(self, timeout=25): """Disk objects without disk image files. """ queue_name = self.get_remote_queue_name('storage', "slow") files = set(storage_tasks.list_files.apply_async( args=[self.path], queue=queue_name).get(timeout=timeout)) disks = Disk.objects.filter(destroyed__isnull=True, is_ready=True, datastore=self) return disks.exclude(filename__in=files) @method_cache(120) def get_file_statistics(self, timeout=90): queue_name = self.get_remote_queue_name('storage', "slow") data = storage_tasks.get_file_statistics.apply_async( args=[self.path], queue=queue_name).get(timeout=timeout) return data class Disk(TimeStampedModel): """A virtual disk. """ TYPES = [('qcow2-norm', 'qcow2 normal'), ('qcow2-snap', 'qcow2 snapshot'), ('iso', 'iso'), ('raw-ro', 'raw read-only'), ('raw-rw', 'raw')] BUS_TYPES = (('virtio', 'virtio'), ('ide', 'ide'), ('scsi', 'scsi')) EXPORT_FORMATS = (('qcow2', _('QEMU disk image')), ('vmdk', _('VMware disk image')), ('vdi', _('VirtualBox disk image')), ('vpc', _('HyperV disk image'))) name = CharField(blank=True, max_length=100, verbose_name=_("name")) filename = CharField(max_length=256, unique=True, verbose_name=_("filename")) datastore = ForeignKey(DataStore, verbose_name=_("datastore"), help_text=_("The datastore that holds the disk."), on_delete=models.CASCADE) type = CharField(max_length=10, choices=TYPES) bus = CharField(max_length=10, choices=BUS_TYPES, null=True, blank=True, default=None) size = FileSizeField(null=True, default=None) base = ForeignKey('self', blank=True, null=True, related_name='derivatives',on_delete=models.CASCADE) dev_num = CharField(default='a', max_length=1, verbose_name=_("device number")) destroyed = DateTimeField(blank=True, default=None, null=True) ci_disk = BooleanField(default=False) is_ready = BooleanField(default=False) cache_size = IntegerField(default=1024, help_text=_("Disk metadata cache max size (Kbyte)"), verbose_name=_('cache size')) class Meta: ordering = ['name'] verbose_name = _('disk') verbose_name_plural = _('disks') permissions = ( ('create_empty_disk', _('Can create an empty disk.')), ('download_disk', _('Can download a disk.')), ('resize_disk', _('Can resize a disk.')), ('import_disk', _('Can import a disk.')), ('export_disk', _('Can export a disk.')) ) class DiskError(HumanReadableException): admin_message = None def __init__(self, disk, params=None, level=None, **kwargs): kwargs.update(params or {}) self.disc = kwargs["disk"] = disk super(Disk.DiskError, self).__init__( level, self.message, self.admin_message or self.message, kwargs) class WrongDiskTypeError(DiskError): message = ugettext_noop("Operation can't be invoked on disk " "'%(name)s' of type '%(type)s'.") admin_message = ugettext_noop( "Operation can't be invoked on disk " "'%(name)s' (%(pk)s) of type '%(type)s'.") def __init__(self, disk, params=None, **kwargs): super(Disk.WrongDiskTypeError, self).__init__( disk, params, type=disk.type, name=disk.name, pk=disk.pk) class DiskInUseError(DiskError): message = ugettext_noop( "The requested operation can't be performed on " "disk '%(name)s' because it is in use.") admin_message = ugettext_noop( "The requested operation can't be performed on " "disk '%(name)s' (%(pk)s) because it is in use.") def __init__(self, disk, params=None, **kwargs): super(Disk.DiskInUseError, self).__init__( disk, params, name=disk.name, pk=disk.pk) class DiskIsNotReady(DiskError): message = ugettext_noop( "The requested operation can't be performed on " "disk '%(name)s' because it has never been deployed.") admin_message = ugettext_noop( "The requested operation can't be performed on " "disk '%(name)s' (%(pk)s) [%(filename)s] because it has never been" "deployed.") def __init__(self, disk, params=None, **kwargs): super(Disk.DiskIsNotReady, self).__init__( disk, params, name=disk.name, pk=disk.pk, filename=disk.filename) class DiskBaseIsNotReady(DiskError): message = ugettext_noop( "The requested operation can't be performed on " "disk '%(name)s' because its base has never been deployed.") admin_message = ugettext_noop( "The requested operation can't be performed on " "disk '%(name)s' (%(pk)s) [%(filename)s] because its base " "'%(b_name)s' (%(b_pk)s) [%(b_filename)s] has never been" "deployed.") def __init__(self, disk, params=None, **kwargs): base = kwargs.get('base') super(Disk.DiskBaseIsNotReady, self).__init__( disk, params, name=disk.name, pk=disk.pk, filename=disk.filename, b_name=base.name, b_pk=base.pk, b_filename=base.filename) @property def path(self): """The path where the files are stored. """ return join(self.datastore.path, self.filename) @property def vm_format(self): """Returns the proper file format for different type of images. """ return { 'qcow2-norm': 'qcow2', 'qcow2-snap': 'qcow2', 'iso': 'raw', 'raw-ro': 'raw', 'raw-rw': 'raw', }[self.type] @property def format(self): """Returns the proper file format for different types of images. """ return { 'qcow2-norm': 'qcow2', 'qcow2-snap': 'qcow2', 'iso': 'iso', 'raw-ro': 'raw', 'raw-rw': 'raw', }[self.type] @property def device_type(self): """Returns the proper device prefix for different types of images. """ return { 'qcow2-norm': 'vd', 'qcow2-snap': 'vd', 'iso': 'sd', 'raw-ro': 'vd', 'raw-rw': 'vd', }[self.type] @property def device_bus(self): """Returns the proper device prefix for different types of images. """ if self.bus: return self.bus return { 'qcow2-norm': 'virtio', 'qcow2-snap': 'virtio', 'iso': 'ide', 'raw-ro': 'virtio', 'raw-rw': 'virtio', }[self.type] @property def is_deletable(self): """True if the associated file can be deleted. """ # Check if all children and the disk itself is destroyed. return (self.destroyed is not None) and self.children_deletable @property def children_deletable(self): """True if all children of the disk are deletable. """ return all(i.is_deletable for i in self.derivatives.all()) @property def is_in_use(self): """True if disk is attached to an active VM. 'In use' means the disk is attached to a VM which is not STOPPED, as any other VMs leave the disk in an inconsistent state. """ return any(i.state != 'STOPPED' for i in self.instance_set.all()) def get_appliance(self): """Return the Instance or InstanceTemplate object where the disk is used """ try: app = self.template_set.all() or self.instance_set.all() return app.get() except ObjectDoesNotExist: return None def get_exclusive(self): """Get an instance of the disk for exclusive usage. This method manipulates the database only. """ type_mapping = { 'qcow2-norm': 'qcow2-snap', 'iso': 'iso', 'raw-ro': 'raw-rw', } if self.type not in list(type_mapping.keys()): raise self.WrongDiskTypeError(self) new_type = type_mapping[self.type] return Disk.create(base=self, datastore=self.datastore, name=self.name, size=self.size, type=new_type, dev_num=self.dev_num) def get_vmdisk_desc(self): """Serialize disk object to the vmdriver. """ return { 'name': self.name, 'source': self.path, 'driver_type': self.vm_format, 'driver_cache': self.datastore.driver_cache, 'target_device': self.device_type + self.dev_num, 'target_bus': self.device_bus, 'disk_device': 'cdrom' if self.type == 'iso' else 'disk', 'cache_size': self.cache_size } def get_disk_desc(self): """Serialize disk object to the storage driver. """ return { 'name': self.filename, 'dir': self.datastore.path, 'format': self.format, 'size': self.size, 'base_name': self.base.filename if self.base else None, 'type': 'snapshot' if self.base else 'normal', 'cache_size': self.cache_size } def get_remote_queue_name(self, queue_id='storage', priority=None, check_worker=True): """Returns the proper queue name based on the datastore. """ if self.datastore: return self.datastore.get_remote_queue_name(queue_id, priority, check_worker) else: return None def __str__(self): return "%s (#%d)" % (self.name, self.id or 0) def clean(self, *args, **kwargs): if (self.size is None or "") and self.base: self.size = self.base.size super(Disk, self).clean(*args, **kwargs) def deploy(self, user=None, task_uuid=None, timeout=15): """Reify the disk model on the associated data store. :param self: the disk model to reify :type self: storage.models.Disk :param user: The user who's issuing the command. :type user: django.contrib.auth.models.User :param task_uuid: The task's UUID, if the command is being executed asynchronously. :type task_uuid: str :return: True if a new reification of the disk has been created; otherwise, False. :rtype: bool """ if self.destroyed: self.destroyed = None self.save() if self.is_ready: return True if self.base and not self.base.is_ready: raise self.DiskBaseIsNotReady(self, base=self.base) queue_name = self.get_remote_queue_name('storage', priority="fast") disk_desc = self.get_disk_desc() if self.base is not None: storage_tasks.snapshot.apply_async(args=[disk_desc], queue=queue_name ).get(timeout=timeout) else: storage_tasks.create.apply_async(args=[disk_desc], queue=queue_name ).get(timeout=timeout) self.is_ready = True self.save() return True @classmethod def create(cls, user=None, datastore='default', **params): disk = cls.__create(user, datastore, params) disk.clean() disk.save() logger.debug("Disk created from: %s", str(params.get("base", "nobase"))) return disk @classmethod def __create(cls, user, datastore, params): if isinstance(datastore, str): datastore = DataStore.objects.filter(name=datastore).get() datastore = params.pop('datastore', datastore) filename = params.pop('filename', str(uuid.uuid4())) disk = cls(filename=filename, datastore=datastore, **params) return disk @classmethod def _run_abortable_task(cls, remote, task): while not remote.ready(): time.sleep(5) logger.debug("Waiting for abortable task. Status: %s", AbortableAsyncResult(remote.id).status) if task is not None and task.is_aborted(): AbortableAsyncResult(remote.id).abort() raise humanize_exception(ugettext_noop("Operation aborted by user."), TimeoutError("Abort")) with allow_join_result(): result = remote.get() return result @classmethod def create_ci_disk(cls, meta_data, user_data, network_data, user = None, **params): params.setdefault('name', 'ci-disk') params.setdefault('type', 'raw-ro') params.setdefault('size', None) disk = cls.__create(params=params, user=user, datastore='default') queue_name = disk.get_remote_queue_name('storage', priority="fast") disk_desc = disk.get_disk_desc() result = storage_tasks.create_ci_disk.apply_async(args=[disk_desc, meta_data, user_data, network_data], queue=queue_name ).get(timeout=15) disk.size = result['size'] disk.type = result['type'] disk.checksum = result['checksum'] disk.dev_num = 'c' # disk.bus = 'virtio' disk.is_ready = True disk.ci_disk = True disk.save() return disk @classmethod def download(cls, url, task, user=None, resize = None, datastore = None, **params): """Create disk object and download data from url synchronusly. :param url: image url to download. :type url: url :param instance: Instance or template attach the Disk to. :type instance: vm.models.Instance or InstanceTemplate or NoneType :param user: owner of the disk :type user: django.contrib.auth.User :param task_uuid: UUID of the local task :param abortable_task: UUID of the remote running abortable task. :return: The created Disk object :rtype: Disk """ params.setdefault('name', url.split('/')[-1]) params.setdefault('type', 'iso') params.setdefault('size', None) if not datastore: datastore = DataStore.objects.filter(name='default').get().name disk = cls.__create(params=params, user=user, datastore=datastore) queue_name = disk.get_remote_queue_name('storage', priority='slow') remote = storage_tasks.download.apply_async( kwargs={'url': url, 'parent_id': task.request.id, 'disk': disk.get_disk_desc(), 'resize': resize }, queue=queue_name) result = cls._run_abortable_task(remote, task) disk.size = result['size'] disk.type = result['type'] disk.checksum = result['checksum'] disk.is_ready = True disk.ci_disk = False disk.save() return disk @classmethod def import_disk(cls, user, name, download_link, port, task): params = {'name': name, 'type': 'qcow2-norm'} disk = cls.__create(user=user, params=params) queue_name = disk.get_remote_queue_name('storage', priority='slow') remote = storage_tasks.import_disk.apply_async( kwargs={ "disk_desc": disk.get_disk_desc(), "url": download_link, "port": port }, queue=queue_name ) result = cls._run_abortable_task(remote, task) disk.size = result["size"] disk.checksum = result["checksum"] disk.is_ready = True disk.save() return disk def export(self, disk_format, upload_link, port, task): queue_name = self.get_remote_queue_name('storage', priority='slow') remote = storage_tasks.export_disk.apply_async( kwargs={ "disk_desc": self.get_disk_desc(), "disk_format": disk_format, "upload_link": upload_link, "port": port }, queue=queue_name) return self._run_abortable_task(remote, task) def destroy(self, user=None, task_uuid=None): if self.destroyed: return False self.destroyed = timezone.now() self.save() return True def restore(self, user=None, task_uuid=None, timeout=15): """Recover destroyed disk from trash if possible. """ queue_name = self.datastore.get_remote_queue_name( 'storage', priority='slow') logger.info("Image: %s at Datastore: %s recovered from trash." % (self.filename, self.datastore.path)) storage_tasks.recover_from_trash.apply_async( args=[self.datastore.path, self.filename], queue=queue_name).get(timeout=timeout) def save_as(self, task=None, user=None, task_uuid=None, datastore=None, timeout=300): """Save VM as template. Based on disk type: qcow2-norm, qcow2-snap --> qcow2-norm iso --> iso (with base) VM must be in STOPPED state to perform this action. The timeout parameter is not used now. """ mapping = { 'qcow2-snap': ('qcow2-norm', None), 'qcow2-norm': ('qcow2-norm', None), 'iso': ("iso", self), } if self.type not in list(mapping.keys()): raise self.WrongDiskTypeError(self) if self.is_in_use: raise self.DiskInUseError(self) if not self.is_ready: raise self.DiskIsNotReady(self) # from this point on, the caller has to guarantee that the disk is not # going to be used until the operation is complete new_type, new_base = mapping[self.type] if not datastore: datastore = self.datastore disk = Disk.create(datastore=datastore, base=new_base, name=self.name, size=self.size, type=new_type, dev_num=self.dev_num) queue_name = self.get_remote_queue_name("storage", priority="slow") remote = storage_tasks.merge.apply_async(kwargs={ "old_json": self.get_disk_desc(), "new_json": disk.get_disk_desc(), "parent_id": task.request.id}, queue=queue_name ) # Timeout while True: try: remote.get(timeout=5) break except TimeoutError as e: if task is not None and task.is_aborted(): AbortableAsyncResult(remote.id).abort() disk.destroy() raise humanize_exception(ugettext_noop( "Operation aborted by user."), e) except: disk.destroy() raise disk.is_ready = True disk.save() return disk def get_absolute_url(self): return reverse('dashboard.views.disk-detail', kwargs={'pk': self.pk}) @property def is_resizable(self): return self.type in ('qcow2-norm', 'raw-rw', 'qcow2-snap',) @property def is_exportable(self): return self.type in ('qcow2-norm', 'qcow2-snap', 'raw-rw', 'raw-ro') from common.models import ActivityModel class StorageActivity(ActivityModel): disk = ForeignKey('storage.Disk', blank=True, null=True, verbose_name=_('disk'), on_delete=models.CASCADE, help_text=_('Disks which are to be mounted.')) ACTIVITY_CODE_BASE = join_activity_code('st', 'Storage') class Meta: db_table = 'st_act' ordering = ['-finished', '-started', '-id'] @classmethod def create(cls, code_suffix, task_uuid=None, user=None): activity_code = cls.construct_activity_code(code_suffix) act = cls(activity_code=activity_code, parent=None, started=timezone.now(), task_uuid=task_uuid, user=user) act.save() return act