models.py 17.1 KB
Newer Older
1 2
# coding=utf-8

3
from contextlib import contextmanager
4
import logging
5
from os.path import join
6 7
import uuid

8
from django.db.models import (Model, BooleanField, CharField, DateTimeField,
9
                              ForeignKey)
10
from django.utils import timezone
11
from django.utils.translation import ugettext_lazy as _
12
from model_utils.models import TimeStampedModel
13
from sizefield.models import FileSizeField
14
from datetime import timedelta
15

16
from acl.models import AclBase
17
from .tasks import local_tasks, remote_tasks
18
from celery.exceptions import TimeoutError
19
from manager.mancelery import celery
20
from common.models import ActivityModel, activitycontextimpl, WorkerNotFound
21 22 23 24

logger = logging.getLogger(__name__)


25
class DataStore(Model):
Guba Sándor committed
26

27 28
    """Collection of virtual disks.
    """
29 30 31 32
    name = CharField(max_length=100, unique=True, verbose_name=_('name'))
    path = CharField(max_length=200, unique=True, verbose_name=_('path'))
    hostname = CharField(max_length=40, unique=True,
                         verbose_name=_('hostname'))
Guba Sándor committed
33

34 35 36 37 38 39 40 41
    class Meta:
        ordering = ['name']
        verbose_name = _('datastore')
        verbose_name_plural = _('datastores')

    def __unicode__(self):
        return u'%s (%s)' % (self.name, self.path)

42
    def get_remote_queue_name(self, queue_id, check_worker=True):
43 44
        logger.debug("Checking for storage queue %s.%s",
                     self.hostname, queue_id)
45 46
        if not check_worker or local_tasks.check_queue(self.hostname,
                                                       queue_id):
47 48 49
            return self.hostname + '.' + queue_id
        else:
            raise WorkerNotFound()
50

51 52 53
    def get_deletable_disks(self):
        return [disk.filename for disk in
                self.disk_set.filter(
54
                    destroyed__isnull=False) if disk.is_deletable]
55

56

57
class Disk(AclBase, TimeStampedModel):
Guba Sándor committed
58

59 60
    """A virtual disk.
    """
61 62 63 64 65
    ACL_LEVELS = (
        ('user', _('user')),          # see all details
        ('operator', _('operator')),
        ('owner', _('owner')),        # superuser, can delete, delegate perms
    )
66 67
    TYPES = [('qcow2-norm', 'qcow2 normal'), ('qcow2-snap', 'qcow2 snapshot'),
             ('iso', 'iso'), ('raw-ro', 'raw read-only'), ('raw-rw', 'raw')]
68
    name = CharField(blank=True, max_length=100, verbose_name=_("name"))
69 70
    filename = CharField(max_length=256, unique=True,
                         verbose_name=_("filename"))
71 72 73
    datastore = ForeignKey(DataStore, verbose_name=_("datastore"),
                           help_text=_("The datastore that holds the disk."))
    type = CharField(max_length=10, choices=TYPES)
74
    size = FileSizeField()
75 76
    base = ForeignKey('self', blank=True, null=True,
                      related_name='derivatives')
77 78
    ready = BooleanField(default=False,
                         help_text=_("The associated resource is ready."))
79
    dev_num = CharField(default='a', max_length=1,
80
                        verbose_name=_("device number"))
81
    destroyed = DateTimeField(blank=True, default=None, null=True)
82 83 84 85 86 87

    class Meta:
        ordering = ['name']
        verbose_name = _('disk')
        verbose_name_plural = _('disks')

88 89
    class WrongDiskTypeError(Exception):

90 91 92 93 94 95 96 97
        def __init__(self, type, message=None):
            if message is None:
                message = ("Operation can't be invoked on a disk of type '%s'."
                           % type)

            Exception.__init__(self, message)

            self.type = type
98

99 100
    class DiskInUseError(Exception):

101 102 103 104
        def __init__(self, disk, message=None):
            if message is None:
                message = ("The requested operation can't be performed on "
                           "disk '%s (%s)' because it is in use." %
Dudás Ádám committed
105
                           (disk.name, disk.filename))
106 107 108 109

            Exception.__init__(self, message)

            self.disk = disk
110

111 112
    @property
    def path(self):
113 114
        """The path where the files are stored.
        """
115
        return join(self.datastore.path, self.filename)
116 117

    @property
118
    def vm_format(self):
119 120
        """Returns the proper file format for different type of images.
        """
121 122 123
        return {
            'qcow2-norm': 'qcow2',
            'qcow2-snap': 'qcow2',
124
            'iso': 'raw',
125 126 127 128
            'raw-ro': 'raw',
            'raw-rw': 'raw',
        }[self.type]

129
    @property
130
    def format(self):
131 132
        """Returns the proper file format for different types of images.
        """
133 134 135 136 137 138 139 140 141
        return {
            'qcow2-norm': 'qcow2',
            'qcow2-snap': 'qcow2',
            'iso': 'iso',
            'raw-ro': 'raw',
            'raw-rw': 'raw',
        }[self.type]

    @property
142
    def device_type(self):
143 144
        """Returns the proper device prefix for different types of images.
        """
145
        return {
146 147
            'qcow2-norm': 'vd',
            'qcow2-snap': 'vd',
148
            'iso': 'hd',
149 150 151
            'raw-ro': 'vd',
            'raw-rw': 'vd',
        }[self.type]
152

153 154 155 156 157 158 159 160 161 162 163 164 165
    def is_downloading(self):
        da = DiskActivity.objects.filter(disk=self).latest("created")
        return (da.activity_code == "storage.Disk.download"
                and da.succeeded is None)

    def get_download_percentage(self):
        if not self.is_downloading():
            return None

        task = DiskActivity.objects.latest("created").task_uuid
        result = celery.AsyncResult(id=task)
        return result.info.get("percent")

166
    @property
167
    def is_deletable(self):
168
        """True if the associated file can be deleted.
169
        """
170
        # Check if all children and the disk itself is destroyed.
171 172
        yesterday = timezone.now() - timedelta(days=1)
        return (self.destroyed is not None
173
                and self.destroyed < yesterday) and self.children_deletable
174

175 176 177
    @property
    def children_deletable(self):
        """True if all children of the disk are deletable.
178
        """
179
        return all(i.is_deletable for i in self.derivatives.all())
180

181
    @property
182
    def is_in_use(self):
183
        """True if disk is attached to an active VM.
184 185 186 187

        'In use' means the disk is attached to a VM which is not STOPPED, as
        any other VMs leave the disk in an inconsistent state.
        """
188
        return any(i.state != 'STOPPED' for i in self.instance_set.all())
189

190 191
    def get_exclusive(self):
        """Get an instance of the disk for exclusive usage.
192

193 194 195
        This method manipulates the database only.
        """
        type_mapping = {
196 197 198
            'qcow2-norm': 'qcow2-snap',
            'iso': 'iso',
            'raw-ro': 'raw-rw',
199 200 201 202 203 204
        }

        if self.type not in type_mapping.keys():
            raise self.WrongDiskTypeError(self.type)

        new_type = type_mapping[self.type]
205

206 207 208
        return Disk.create(base=self, datastore=self.datastore,
                           name=self.name, size=self.size,
                           type=new_type)
209 210

    def get_vmdisk_desc(self):
211 212
        """Serialize disk object to the vmdriver.
        """
213
        return {
214
            'source': self.path,
215
            'driver_type': self.vm_format,
216
            'driver_cache': 'none',
217
            'target_device': self.device_type + self.dev_num,
218
            'disk_device': 'cdrom' if self.type == 'iso' else 'disk'
219 220
        }

221
    def get_disk_desc(self):
222 223
        """Serialize disk object to the storage driver.
        """
224 225 226 227 228 229
        return {
            'name': self.filename,
            'dir': self.datastore.path,
            'format': self.format,
            'size': self.size,
            'base_name': self.base.filename if self.base else None,
230
            'type': 'snapshot' if self.base else 'normal'
231 232
        }

233
    def get_remote_queue_name(self, queue_id='storage', check_worker=True):
234 235
        """Returns the proper queue name based on the datastore.
        """
236
        if self.datastore:
237
            return self.datastore.get_remote_queue_name(queue_id, check_worker)
238 239 240
        else:
            return None

241
    def __unicode__(self):
242
        return u"%s (#%d)" % (self.name, self.id or 0)
243

244 245 246 247 248
    def clean(self, *args, **kwargs):
        if self.size == "" and self.base:
            self.size = self.base.size
        super(Disk, self).clean(*args, **kwargs)

249
    def deploy(self, user=None, task_uuid=None, timeout=15):
250 251 252 253 254
        """Reify the disk model on the associated data store.

        :param self: the disk model to reify
        :type self: storage.models.Disk

255 256 257 258 259 260 261
        :param user: The user who's issuing the command.
        :type user: django.contrib.auth.models.User

        :param task_uuid: The task's UUID, if the command is being executed
                          asynchronously.
        :type task_uuid: str

262 263 264 265
        :return: True if a new reification of the disk has been created;
                 otherwise, False.
        :rtype: bool
        """
266 267 268 269
        if self.destroyed:
            self.destroyed = None
            self.save()

270
        if self.ready:
271
            return False
272

273 274 275 276
        with disk_activity(code_suffix='deploy', disk=self,
                           task_uuid=task_uuid, user=user) as act:

            # Delegate create / snapshot jobs
277
            queue_name = self.get_remote_queue_name('storage')
278
            disk_desc = self.get_disk_desc()
279
            if self.base is not None:
280 281
                with act.sub_activity('creating_snapshot'):
                    remote_tasks.snapshot.apply_async(args=[disk_desc],
282 283
                                                      queue=queue_name
                                                      ).get(timeout=timeout)
284 285 286
            else:
                with act.sub_activity('creating_disk'):
                    remote_tasks.create.apply_async(args=[disk_desc],
287 288
                                                    queue=queue_name
                                                    ).get(timeout=timeout)
289 290 291

            self.ready = True
            self.save()
292

293
            return True
294

295
    def deploy_async(self, user=None):
296 297
        """Execute deploy asynchronously.
        """
298 299
        return local_tasks.deploy.apply_async(args=[self, user],
                                              queue="localhost.man")
300

301 302
    @classmethod
    def create(cls, **params):
303 304
        datastore = params.pop('datastore', DataStore.objects.get())
        disk = cls(filename=str(uuid.uuid4()), datastore=datastore, **params)
305 306
        disk.save()
        return disk
307

308
    @classmethod
309
    def create_empty(cls, instance=None, user=None, **kwargs):
310 311
        """Create empty Disk object.

312 313
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
314
        :param user: Creator of the disk.
315
        :type user: django.contrib.auth.User
316 317

        :return: Disk object without a real image, to be .deploy()ed later.
318
        """
319 320
        disk = cls.create(**kwargs)
        with disk_activity(code_suffix="create", user=user, disk=disk):
321 322 323
            if instance:
                instance.disks.add(disk)
            return disk
324 325

    @classmethod
326
    def create_from_url_async(cls, url, instance=None, user=None, **kwargs):
327
        """Create disk object and download data from url asynchrnously.
328

329 330
        :param url: URL of image to download.
        :type url: string
331 332
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
333 334 335 336 337 338
        :param user: owner of the disk
        :type user: django.contrib.auth.User

        :return: Task
        :rtype: AsyncResult
        """
339 340
        kwargs.update({'cls': cls, 'url': url,
                       'instance': instance, 'user': user})
cloud committed
341 342
        return local_tasks.create_from_url.apply_async(
            kwargs=kwargs, queue='localhost.man')
343

344
    @classmethod
345 346
    def create_from_url(cls, url, instance=None, user=None,
                        task_uuid=None, abortable_task=None, **kwargs):
347 348 349 350
        """Create disk object and download data from url synchronusly.

        :param url: image url to download.
        :type url: url
351 352
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
353 354
        :param user: owner of the disk
        :type user: django.contrib.auth.User
355 356
        :param task_uuid: TODO
        :param abortable_task: TODO
357

358 359
        :return: The created Disk object
        :rtype: Disk
360
        """
361
        kwargs.setdefault('name', url.split('/')[-1])
362
        disk = Disk.create(type="iso", size=1, **kwargs)
363
        # TODO get proper datastore
364
        disk.datastore = DataStore.objects.get()
365 366
        if instance:
            instance.disks.add(disk)
367 368
        queue_name = disk.get_remote_queue_name('storage')

369 370 371 372 373 374 375 376 377
        def __on_abort(activity, error):
            activity.disk.destroyed = timezone.now()
            activity.disk.save()

        if abortable_task:
            from celery.contrib.abortable import AbortableAsyncResult

            class AbortException(Exception):
                pass
378 379

        with disk_activity(code_suffix='download', disk=disk,
380 381 382 383 384 385 386 387 388 389 390 391 392 393
                           task_uuid=task_uuid, user=user,
                           on_abort=__on_abort):
            result = remote_tasks.download.apply_async(
                kwargs={'url': url, 'parent_id': task_uuid,
                        'disk': disk.get_disk_desc()},
                queue=queue_name)
            while True:
                try:
                    size = result.get(timeout=5)
                    break
                except TimeoutError:
                    if abortable_task and abortable_task.is_aborted():
                        AbortableAsyncResult(result.id).abort()
                        raise AbortException("Download aborted by user.")
394
            disk.size = size
395
            disk.ready = True
396
            disk.save()
397
        return disk
398

399
    def destroy(self, user=None, task_uuid=None):
400 401 402
        if self.destroyed:
            return False

403 404 405 406
        with disk_activity(code_suffix='destroy', disk=self,
                           task_uuid=task_uuid, user=user):
            self.destroyed = timezone.now()
            self.save()
407

408
            return True
409

410
    def destroy_async(self, user=None):
411 412
        """Execute destroy asynchronously.
        """
413 414
        return local_tasks.destroy.apply_async(args=[self, user],
                                               queue='localhost.man')
415

416
    def restore(self, user=None, task_uuid=None):
417
        """Recover destroyed disk from trash if possible.
418 419 420 421 422 423 424 425
        """
        # TODO
        pass

    def restore_async(self, user=None):
        local_tasks.restore.apply_async(args=[self, user],
                                        queue='localhost.man')

426 427 428 429 430
    def save_as_async(self, disk, task_uuid=None, timeout=300, user=None):
        return local_tasks.save_as.apply_async(args=[disk, timeout, user],
                                               queue="localhost.man")

    def save_as(self, user=None, task_uuid=None, timeout=300):
431 432 433 434 435
        """Save VM as template.

        VM must be in STOPPED state to perform this action.
        The timeout parameter is not used now.
        """
436 437 438 439 440 441
        mapping = {
            'qcow2-snap': ('qcow2-norm', self.base),
        }
        if self.type not in mapping.keys():
            raise self.WrongDiskTypeError(self.type)

442
        if self.is_in_use:
443 444 445 446 447
            raise self.DiskInUseError(self)

        # from this point on, the caller has to guarantee that the disk is not
        # going to be used until the operation is complete

448
        new_type, new_base = mapping[self.type]
449

450 451 452
        disk = Disk.create(base=new_base, datastore=self.datastore,
                           name=self.name, size=self.size,
                           type=new_type)
453

454 455 456
        disk.save()
        with disk_activity(code_suffix="save_as", disk=self,
                           user=user, task_uuid=None):
457
            queue_name = self.get_remote_queue_name('storage')
458 459
            remote_tasks.merge.apply_async(args=[self.get_disk_desc(),
                                                 disk.get_disk_desc()],
460
                                           queue=queue_name
461
                                           ).get()  # Timeout
462 463 464
            disk.ready = True
            disk.save()

465
        return disk
466 467 468 469 470 471 472 473


class DiskActivity(ActivityModel):
    disk = ForeignKey(Disk, related_name='activity_log',
                      help_text=_('Disk this activity works on.'),
                      verbose_name=_('disk'))

    @classmethod
474
    def create(cls, code_suffix, disk, task_uuid=None, user=None):
475
        act = cls(activity_code='storage.Disk.' + code_suffix,
476
                  disk=disk, parent=None, started=timezone.now(),
477
                  task_uuid=task_uuid, user=user)
478
        act.save()
479
        return act
480

481 482 483
    def create_sub(self, code_suffix, task_uuid=None):
        act = DiskActivity(
            activity_code=self.activity_code + '.' + code_suffix,
484
            disk=self.disk, parent=self, started=timezone.now(),
485 486 487
            task_uuid=task_uuid, user=self.user)
        act.save()
        return act
488

489 490 491
    @contextmanager
    def sub_activity(self, code_suffix, task_uuid=None):
        act = self.create_sub(code_suffix, task_uuid)
492
        return activitycontextimpl(act)
493

494

495
@contextmanager
496 497
def disk_activity(code_suffix, disk, task_uuid=None, user=None,
                  on_abort=None, on_commit=None):
498
    act = DiskActivity.create(code_suffix, disk, task_uuid, user)
499
    return activitycontextimpl(act, on_abort=on_abort, on_commit=on_commit)