models.py 19.1 KB
Newer Older
1 2
# -*- coding: utf-8 -*-

3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE.  If not, see <http://www.gnu.org/licenses/>.

20
from __future__ import unicode_literals
21 22

import logging
23
from os.path import join
24
import uuid
25
import re
26

Guba Sándor committed
27 28
from celery.contrib.abortable import AbortableAsyncResult
from django.db.models import (Model, BooleanField, CharField, DateTimeField,
29
                              ForeignKey)
30
from django.core.exceptions import ObjectDoesNotExist
31
from django.utils import timezone
32
from django.utils.translation import ugettext_lazy as _, ugettext_noop
33
from model_utils.models import TimeStampedModel
34
from sizefield.models import FileSizeField
35

Guba Sándor committed
36
from .tasks import local_tasks, storage_tasks
37
from celery.exceptions import TimeoutError
38
from common.models import (
39
    WorkerNotFound, HumanReadableException, humanize_exception, method_cache
40
)
41 42 43 44

logger = logging.getLogger(__name__)


45
class DataStore(Model):
Guba Sándor committed
46

47 48
    """Collection of virtual disks.
    """
49 50 51 52
    name = CharField(max_length=100, unique=True, verbose_name=_('name'))
    path = CharField(max_length=200, unique=True, verbose_name=_('path'))
    hostname = CharField(max_length=40, unique=True,
                         verbose_name=_('hostname'))
Guba Sándor committed
53

54 55 56 57 58 59 60 61
    class Meta:
        ordering = ['name']
        verbose_name = _('datastore')
        verbose_name_plural = _('datastores')

    def __unicode__(self):
        return u'%s (%s)' % (self.name, self.path)

62 63
    def get_remote_queue_name(self, queue_id, priority=None,
                              check_worker=True):
64 65
        logger.debug("Checking for storage queue %s.%s",
                     self.hostname, queue_id)
66
        if not check_worker or local_tasks.check_queue(self.hostname,
Guba Sándor committed
67 68 69 70 71 72
                                                       queue_id,
                                                       priority):
            queue_name = self.hostname + '.' + queue_id
            if priority is not None:
                queue_name = queue_name + '.' + priority
            return queue_name
73 74
        else:
            raise WorkerNotFound()
75

76 77 78
    def get_deletable_disks(self):
        return [disk.filename for disk in
                self.disk_set.filter(
79
                    destroyed__isnull=False) if disk.is_deletable]
80

81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
    @method_cache(30)
    def get_statistics(self, timeout=15):
        q = self.get_remote_queue_name("storage", priority="fast")
        return storage_tasks.get_storage_stat.apply_async(
            args=[self.path], queue=q).get(timeout=timeout)

    @method_cache(30)
    def get_orphan_disks(self, timeout=15):
        """Disk image files without Disk object in the database.
        """
        queue_name = self.get_remote_queue_name('storage', "slow")
        files = set(storage_tasks.list_files.apply_async(
            args=[self.path], queue=queue_name).get(timeout=timeout))
        disks = set([disk.filename for disk in self.disk_set.all()])

        orphans = []
        for i in files - disks:
            if not re.match('cloud-[0-9]*\.dump', i):
                orphans.append(i)
        return orphans

    @method_cache(30)
    def get_missing_disks(self, timeout=15):
        """Disk objects without disk image files.
        """
        queue_name = self.get_remote_queue_name('storage', "slow")
        files = set(storage_tasks.list_files.apply_async(
            args=[self.path], queue=queue_name).get(timeout=timeout))
        disks = Disk.objects.filter(destroyed__isnull=True, is_ready=True)
        return disks.exclude(filename__in=files)

112

Bach Dániel committed
113
class Disk(TimeStampedModel):
Guba Sándor committed
114

115 116 117 118
    """A virtual disk.
    """
    TYPES = [('qcow2-norm', 'qcow2 normal'), ('qcow2-snap', 'qcow2 snapshot'),
             ('iso', 'iso'), ('raw-ro', 'raw read-only'), ('raw-rw', 'raw')]
119
    BUS_TYPES = (('virtio', 'virtio'), ('ide', 'ide'), ('scsi', 'scsi'))
120
    name = CharField(blank=True, max_length=100, verbose_name=_("name"))
121 122
    filename = CharField(max_length=256, unique=True,
                         verbose_name=_("filename"))
123 124 125
    datastore = ForeignKey(DataStore, verbose_name=_("datastore"),
                           help_text=_("The datastore that holds the disk."))
    type = CharField(max_length=10, choices=TYPES)
126 127
    bus = CharField(max_length=10, choices=BUS_TYPES, null=True, blank=True,
                    default=None)
128
    size = FileSizeField(null=True, default=None)
129 130 131
    base = ForeignKey('self', blank=True, null=True,
                      related_name='derivatives')
    dev_num = CharField(default='a', max_length=1,
132
                        verbose_name=_("device number"))
133
    destroyed = DateTimeField(blank=True, default=None, null=True)
134

Guba Sándor committed
135 136
    is_ready = BooleanField(default=False)

137 138 139 140
    class Meta:
        ordering = ['name']
        verbose_name = _('disk')
        verbose_name_plural = _('disks')
141 142
        permissions = (
            ('create_empty_disk', _('Can create an empty disk.')),
143 144 145
            ('download_disk', _('Can download a disk.')),
            ('resize_disk', _('Can resize a disk.'))
        )
146

147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
    class DiskError(HumanReadableException):
        admin_message = None

        def __init__(self, disk, params=None, level=None, **kwargs):
            kwargs.update(params or {})
            self.disc = kwargs["disk"] = disk
            super(Disk.DiskError, self).__init__(
                level, self.message, self.admin_message or self.message,
                kwargs)

    class WrongDiskTypeError(DiskError):
        message = ugettext_noop("Operation can't be invoked on disk "
                                "'%(name)s' of type '%(type)s'.")

        admin_message = ugettext_noop(
            "Operation can't be invoked on disk "
            "'%(name)s' (%(pk)s) of type '%(type)s'.")

        def __init__(self, disk, params=None, **kwargs):
            super(Disk.WrongDiskTypeError, self).__init__(
                disk, params, type=disk.type, name=disk.name, pk=disk.pk)

    class DiskInUseError(DiskError):
        message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' because it is in use.")

        admin_message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' (%(pk)s) because it is in use.")

        def __init__(self, disk, params=None, **kwargs):
Guba Sándor committed
179
            super(Disk.DiskInUseError, self).__init__(
180 181 182 183 184 185 186 187 188 189 190 191 192
                disk, params, name=disk.name, pk=disk.pk)

    class DiskIsNotReady(DiskError):
        message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' because it has never been deployed.")

        admin_message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' (%(pk)s) [%(filename)s] because it has never been"
            "deployed.")

        def __init__(self, disk, params=None, **kwargs):
Guba Sándor committed
193
            super(Disk.DiskIsNotReady, self).__init__(
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
                disk, params, name=disk.name, pk=disk.pk,
                filename=disk.filename)

    class DiskBaseIsNotReady(DiskError):
        message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' because its base has never been deployed.")

        admin_message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' (%(pk)s) [%(filename)s] because its base "
            "'%(b_name)s' (%(b_pk)s) [%(b_filename)s] has never been"
            "deployed.")

        def __init__(self, disk, params=None, **kwargs):
209
            base = kwargs.get('base')
Guba Sándor committed
210
            super(Disk.DiskBaseIsNotReady, self).__init__(
211 212 213
                disk, params, name=disk.name, pk=disk.pk,
                filename=disk.filename, b_name=base.name,
                b_pk=base.pk, b_filename=base.filename)
Guba Sándor committed
214

215 216
    @property
    def path(self):
217 218
        """The path where the files are stored.
        """
219
        return join(self.datastore.path, self.filename)
220 221

    @property
222
    def vm_format(self):
223 224
        """Returns the proper file format for different type of images.
        """
225 226 227
        return {
            'qcow2-norm': 'qcow2',
            'qcow2-snap': 'qcow2',
228
            'iso': 'raw',
229 230 231 232
            'raw-ro': 'raw',
            'raw-rw': 'raw',
        }[self.type]

233
    @property
234
    def format(self):
235 236
        """Returns the proper file format for different types of images.
        """
237 238 239 240 241 242 243 244 245
        return {
            'qcow2-norm': 'qcow2',
            'qcow2-snap': 'qcow2',
            'iso': 'iso',
            'raw-ro': 'raw',
            'raw-rw': 'raw',
        }[self.type]

    @property
246
    def device_type(self):
247 248
        """Returns the proper device prefix for different types of images.
        """
249
        return {
250 251
            'qcow2-norm': 'vd',
            'qcow2-snap': 'vd',
252
            'iso': 'sd',
253 254 255
            'raw-ro': 'vd',
            'raw-rw': 'vd',
        }[self.type]
256

257
    @property
258 259 260
    def device_bus(self):
        """Returns the proper device prefix for different types of images.
        """
261 262
        if self.bus:
            return self.bus
263 264 265
        return {
            'qcow2-norm': 'virtio',
            'qcow2-snap': 'virtio',
266
            'iso': 'ide',
267 268 269 270 271
            'raw-ro': 'virtio',
            'raw-rw': 'virtio',
        }[self.type]

    @property
272
    def is_deletable(self):
273
        """True if the associated file can be deleted.
274
        """
275
        # Check if all children and the disk itself is destroyed.
276
        return (self.destroyed is not None) and self.children_deletable
277

278 279 280
    @property
    def children_deletable(self):
        """True if all children of the disk are deletable.
281
        """
282
        return all(i.is_deletable for i in self.derivatives.all())
283

284
    @property
285
    def is_in_use(self):
286
        """True if disk is attached to an active VM.
287 288 289 290

        'In use' means the disk is attached to a VM which is not STOPPED, as
        any other VMs leave the disk in an inconsistent state.
        """
291
        return any(i.state != 'STOPPED' for i in self.instance_set.all())
292

293
    def get_appliance(self):
Bach Dániel committed
294 295
        """Return the Instance or InstanceTemplate object where the disk
        is used
296
        """
Bach Dániel committed
297
        try:
298 299 300 301
            app = self.template_set.all() or self.instance_set.all()
            return app.get()
        except ObjectDoesNotExist:
            return None
302

303 304
    def get_exclusive(self):
        """Get an instance of the disk for exclusive usage.
305

306 307 308
        This method manipulates the database only.
        """
        type_mapping = {
309 310 311
            'qcow2-norm': 'qcow2-snap',
            'iso': 'iso',
            'raw-ro': 'raw-rw',
312 313 314
        }

        if self.type not in type_mapping.keys():
315
            raise self.WrongDiskTypeError(self)
316 317

        new_type = type_mapping[self.type]
318

319 320
        return Disk.create(base=self, datastore=self.datastore,
                           name=self.name, size=self.size,
321
                           type=new_type, dev_num=self.dev_num)
322 323

    def get_vmdisk_desc(self):
324 325
        """Serialize disk object to the vmdriver.
        """
326
        return {
327
            'source': self.path,
328
            'driver_type': self.vm_format,
329
            'driver_cache': 'none',
330
            'target_device': self.device_type + self.dev_num,
331
            'target_bus': self.device_bus,
332
            'disk_device': 'cdrom' if self.type == 'iso' else 'disk'
333 334
        }

335
    def get_disk_desc(self):
336 337
        """Serialize disk object to the storage driver.
        """
338 339 340 341 342 343
        return {
            'name': self.filename,
            'dir': self.datastore.path,
            'format': self.format,
            'size': self.size,
            'base_name': self.base.filename if self.base else None,
344
            'type': 'snapshot' if self.base else 'normal'
345 346
        }

347 348
    def get_remote_queue_name(self, queue_id='storage', priority=None,
                              check_worker=True):
349 350
        """Returns the proper queue name based on the datastore.
        """
351
        if self.datastore:
352 353
            return self.datastore.get_remote_queue_name(queue_id, priority,
                                                        check_worker)
354 355 356
        else:
            return None

357
    def __unicode__(self):
358
        return u"%s (#%d)" % (self.name, self.id or 0)
359

360
    def clean(self, *args, **kwargs):
Guba Sándor committed
361
        if (self.size is None or "") and self.base:
362 363 364
            self.size = self.base.size
        super(Disk, self).clean(*args, **kwargs)

365
    def deploy(self, user=None, task_uuid=None, timeout=15):
366 367 368 369 370
        """Reify the disk model on the associated data store.

        :param self: the disk model to reify
        :type self: storage.models.Disk

371 372 373 374 375 376 377
        :param user: The user who's issuing the command.
        :type user: django.contrib.auth.models.User

        :param task_uuid: The task's UUID, if the command is being executed
                          asynchronously.
        :type task_uuid: str

378 379 380 381
        :return: True if a new reification of the disk has been created;
                 otherwise, False.
        :rtype: bool
        """
382 383 384 385
        if self.destroyed:
            self.destroyed = None
            self.save()

386
        if self.is_ready:
387
            return True
388
        if self.base and not self.base.is_ready:
389
            raise self.DiskBaseIsNotReady(self, base=self.base)
Guba Sándor committed
390 391 392 393 394 395 396 397 398 399
        queue_name = self.get_remote_queue_name('storage', priority="fast")
        disk_desc = self.get_disk_desc()
        if self.base is not None:
            storage_tasks.snapshot.apply_async(args=[disk_desc],
                                               queue=queue_name
                                               ).get(timeout=timeout)
        else:
            storage_tasks.create.apply_async(args=[disk_desc],
                                             queue=queue_name
                                             ).get(timeout=timeout)
400

401 402
        self.is_ready = True
        self.save()
Guba Sándor committed
403
        return True
404

405
    @classmethod
Guba Sándor committed
406 407
    def create(cls, user=None, **params):
        disk = cls.__create(user, params)
Guba Sándor committed
408
        disk.clean()
409
        disk.save()
410 411
        logger.debug(u"Disk created from: %s",
                     unicode(params.get("base", "nobase")))
412
        return disk
413

414
    @classmethod
Guba Sándor committed
415 416 417 418
    def __create(cls, user, params):
        datastore = params.pop('datastore', DataStore.objects.get())
        filename = params.pop('filename', str(uuid.uuid4()))
        disk = cls(filename=filename, datastore=datastore, **params)
419
        return disk
420 421

    @classmethod
Guba Sándor committed
422
    def download(cls, url, task, user=None, **params):
423 424 425 426
        """Create disk object and download data from url synchronusly.

        :param url: image url to download.
        :type url: url
427 428
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
429 430
        :param user: owner of the disk
        :type user: django.contrib.auth.User
431 432
        :param task_uuid: UUID of the local task
        :param abortable_task: UUID of the remote running abortable task.
433

434 435
        :return: The created Disk object
        :rtype: Disk
436
        """
Guba Sándor committed
437
        params.setdefault('name', url.split('/')[-1])
438 439 440
        params.setdefault('type', 'iso')
        params.setdefault('size', None)
        disk = cls.__create(params=params, user=user)
Guba Sándor committed
441 442
        queue_name = disk.get_remote_queue_name('storage', priority='slow')
        remote = storage_tasks.download.apply_async(
443
            kwargs={'url': url, 'parent_id': task.request.id,
Guba Sándor committed
444 445 446 447
                    'disk': disk.get_disk_desc()},
            queue=queue_name)
        while True:
            try:
448
                result = remote.get(timeout=5)
Guba Sándor committed
449
                break
450
            except TimeoutError as e:
Guba Sándor committed
451 452
                if task is not None and task.is_aborted():
                    AbortableAsyncResult(remote.id).abort()
453 454
                    raise humanize_exception(ugettext_noop(
                        "Operation aborted by user."), e)
455 456
        disk.size = result['size']
        disk.type = result['type']
457
        disk.checksum = result.get('checksum', None)
458
        disk.is_ready = True
Guba Sándor committed
459
        disk.save()
460
        return disk
461

462
    def destroy(self, user=None, task_uuid=None):
463 464 465
        if self.destroyed:
            return False

Guba Sándor committed
466 467 468
        self.destroyed = timezone.now()
        self.save()
        return True
469

470
    def restore(self, user=None, task_uuid=None, timeout=15):
471
        """Recover destroyed disk from trash if possible.
472
        """
473 474 475 476 477 478 479
        queue_name = self.datastore.get_remote_queue_name(
            'storage', priority='slow')
        logger.info("Image: %s at Datastore: %s recovered from trash." %
                    (self.filename, self.datastore.path))
        storage_tasks.recover_from_trash.apply_async(
            args=[self.datastore.path, self.filename],
            queue=queue_name).get(timeout=timeout)
480

481
    def save_as(self, task=None, user=None, task_uuid=None, timeout=300):
482 483
        """Save VM as template.

484 485 486 487
        Based on disk type:
        qcow2-norm, qcow2-snap --> qcow2-norm
        iso                    --> iso (with base)

488 489 490
        VM must be in STOPPED state to perform this action.
        The timeout parameter is not used now.
        """
491
        mapping = {
492 493 494
            'qcow2-snap': ('qcow2-norm', None),
            'qcow2-norm': ('qcow2-norm', None),
            'iso': ("iso", self),
495 496
        }
        if self.type not in mapping.keys():
497
            raise self.WrongDiskTypeError(self)
498

499
        if self.is_in_use:
500 501
            raise self.DiskInUseError(self)

502
        if not self.is_ready:
Guba Sándor committed
503 504
            raise self.DiskIsNotReady(self)

505 506 507
        # from this point on, the caller has to guarantee that the disk is not
        # going to be used until the operation is complete

508
        new_type, new_base = mapping[self.type]
509

510 511
        disk = Disk.create(datastore=self.datastore,
                           base=new_base,
512
                           name=self.name, size=self.size,
513
                           type=new_type, dev_num=self.dev_num)
514

Guba Sándor committed
515
        queue_name = self.get_remote_queue_name("storage", priority="slow")
516 517
        remote = storage_tasks.merge.apply_async(kwargs={
            "old_json": self.get_disk_desc(),
518 519
            "new_json": disk.get_disk_desc(),
            "parent_id": task.request.id},
520 521 522 523 524 525
            queue=queue_name
        )  # Timeout
        while True:
            try:
                remote.get(timeout=5)
                break
526
            except TimeoutError as e:
527 528 529
                if task is not None and task.is_aborted():
                    AbortableAsyncResult(remote.id).abort()
                    disk.destroy()
530 531
                    raise humanize_exception(ugettext_noop(
                        "Operation aborted by user."), e)
532 533 534
            except:
                disk.destroy()
                raise
535 536
        disk.is_ready = True
        disk.save()
Guba Sándor committed
537
        return disk