vmdriver.py 16.4 KB
Newer Older
1
""" Driver for libvirt. """
tarokkk committed
2 3
import libvirt
import logging
4
import os
user committed
5
import sys
Guba Sándor committed
6 7
import socket
import json
8
from decorator import decorator
9
import lxml.etree as ET
10

11
from psutil import NUM_CPUS, virtual_memory, cpu_percent
12

13
from celery.contrib.abortable import AbortableTask
14

15
from vm import VMInstance, VMDisk, VMNetwork
Guba Sándor committed
16

17
from vmcelery import celery, lib_connection, to_bool
tarokkk committed
18

user committed
19 20
sys.path.append(os.path.dirname(os.path.basename(__file__)))

21
vm_xml_dump = None
tarokkk committed
22

Guba Sándor committed
23 24 25 26 27 28 29 30 31 32
state_dict = {0: 'NOSTATE',
              1: 'RUNNING',
              2: 'BLOCKED',
              3: 'PAUSED',
              4: 'SHUTDOWN',
              5: 'SHUTOFF',
              6: 'CRASHED',
              7: 'PMSUSPENDED'
              }

tarokkk committed
33

34
# class Singleton(type):
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
#
#    """ Singleton class."""
#
#    _instances = {}
#
#    def __call__(cls, *args, **kwargs):
#        if cls not in cls._instances:
#            cls._instances[cls] = super(Singleton, cls).__call__(*args,
#                                                                 **kwargs)
#        return cls._instances[cls]


class Connection(object):

    """ Singleton class to handle connection."""

#    __metaclass__ = Singleton
    connection = None

    @classmethod
    def get(cls):
        """ Return the libvirt connection."""

        return cls.connection

    @classmethod
    def set(cls, connection):
        """ Set the libvirt connection."""

        cls.connection = connection


67 68
@decorator
def req_connection(original_function, *args, **kw):
69 70 71 72 73 74
    """Connection checking decorator for libvirt.

    If envrionment variable LIBVIRT_KEEPALIVE is set
    it will use the connection from the celery worker.

    Return the decorateed function
75

76
    """
77
    logging.debug("Decorator running")
78
    if Connection.get() is None:
79 80 81 82 83 84 85 86 87 88 89 90 91
        connect()
        try:
            logging.debug("Decorator calling original function")
            return_value = original_function(*args, **kw)
        finally:
            logging.debug("Finally part of decorator")
            disconnect()
        return return_value
    else:
        logging.debug("Decorator calling original \
                        function with active connection")
        return_value = original_function(*args, **kw)
        return return_value
tarokkk committed
92 93


Guba Sándor committed
94 95
@decorator
def wrap_libvirtError(original_function, *args, **kw):
96 97 98 99 100
    """ Decorator to wrap libvirt error in simple Exception.

    Return decorated function

    """
Guba Sándor committed
101
    try:
102
        return original_function(*args, **kw)
Guba Sándor committed
103
    except libvirt.libvirtError as e:
104
        logging.error(e.get_error_message())
105 106 107 108
        e_msg = e.get_error_message()
        if vm_xml_dump is not None:
            e_msg += "\n"
            e_msg += vm_xml_dump
Guba Sándor committed
109 110 111 112 113 114
        new_e = Exception(e.get_error_message())
        new_e.libvirtError = True
        raise new_e


@wrap_libvirtError
tarokkk committed
115
def connect(connection_string='qemu:///system'):
116 117 118 119 120 121
    """ Connect to the libvirt daemon.

    String is specified in the connection_string parameter
    the default is the local root.

    """
122
    if not to_bool(os.getenv('LIBVIRT_KEEPALIVE', "False")):
123 124
        if Connection.get() is None:
            Connection.set(libvirt.open(connection_string))
125 126 127
            logging.debug("Connection estabilished to libvirt.")
        else:
            logging.debug("There is already an active connection to libvirt.")
tarokkk committed
128
    else:
129
        Connection.set(lib_connection)
Guba Sándor committed
130
        logging.debug("Using celery libvirt connection connection.")
tarokkk committed
131

tarokkk committed
132

Guba Sándor committed
133
@wrap_libvirtError
tarokkk committed
134
def disconnect():
135
    """ Disconnect from the active libvirt daemon connection."""
136
    if os.getenv('LIBVIRT_KEEPALIVE') is None:
137
        if Connection.get() is None:
138 139
            logging.debug('There is no available libvirt conection.')
        else:
140
            Connection.get().close()
141
            logging.debug('Connection closed to libvirt.')
142
            Connection.set(None)
tarokkk committed
143
    else:
144
        logging.debug('Keepalive connection should not close.')
tarokkk committed
145 146


Guba Sándor committed
147
@celery.task
tarokkk committed
148
@req_connection
Guba Sándor committed
149
@wrap_libvirtError
tarokkk committed
150
def define(vm):
151 152
    """ Define permanent virtual machine from xml. """
    Connection.get().defineXML(vm.dump_xml())
tarokkk committed
153 154 155
    logging.info("Virtual machine %s is defined from xml", vm.name)


Guba Sándor committed
156
@celery.task
tarokkk committed
157
@req_connection
Guba Sándor committed
158
@wrap_libvirtError
159
def create(vm_desc):
160 161 162
    """ Create and start non-permanent virtual machine from xml.

    Return the domain info dict.
tarokkk committed
163 164 165 166 167 168
    flags can be:
        VIR_DOMAIN_NONE = 0
        VIR_DOMAIN_START_PAUSED = 1
        VIR_DOMAIN_START_AUTODESTROY = 2
        VIR_DOMAIN_START_BYPASS_CACHE = 4
        VIR_DOMAIN_START_FORCE_BOOT = 8
169 170

    """
171 172
    vm = VMInstance.deserialize(vm_desc)
    # Setting proper hypervisor
173
    vm.vm_type = os.getenv("HYPERVISOR_TYPE", "test")
174 175
    if vm.vm_type == "test":
        vm.arch = "i686"
176 177
    vm_xml_dump = vm.dump_xml()
    logging.info(vm_xml_dump)
178 179
    # Emulating DOMAIN_START_PAUSED FLAG behaviour on test driver
    if vm.vm_type == "test":
180
        Connection.get().createXML(
181
            vm_xml_dump, libvirt.VIR_DOMAIN_NONE)
182
        domain = lookupByName(vm.name)
183
        domain.suspend()
Guba Sándor committed
184
    # Real driver create
185
    else:
186
        Connection.get().createXML(
187
            vm_xml_dump, libvirt.VIR_DOMAIN_START_PAUSED)
188
        logging.info("Virtual machine %s is created from xml", vm.name)
Guba Sándor committed
189 190 191 192 193 194 195 196 197
    # context
    try:
        sock = socket.create_connection(('127.0.0.1', 1235), 3)
        data = {'boot_token': vm.boot_token,
                'socket': '/var/lib/libvirt/serial/%s' % vm.name}
        sock.sendall(json.dumps(data))
        sock.close()
    except socket.error:
        logging.error('Unable to connect to context server')
198
    return vm_xml_dump
tarokkk committed
199 200


201
class shutdown(AbortableTask):
202
    """ Shutdown virtual machine (need ACPI support).
203 204 205
    Return When domain is missiing.
    This job is abortable:
        AbortableAsyncResult(id="<<jobid>>").abort()
206
    """
207 208 209
    time_limit = 120

    @req_connection
210
    def run(self, args):
211
        from time import sleep
212
        name, = args
213 214 215 216 217 218 219 220 221 222 223
        try:
            domain = lookupByName(name)
            domain.shutdown()
            while True:
                try:
                    Connection.get().lookupByName(name)
                except libvirt.libvirtError as e:
                    if e.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN:
                        return
                    else:
                        raise
224
                else:
225
                    if self.is_aborted():
226 227 228 229 230 231 232
                        logging.info("Shutdown aborted on vm: %s", name)
                        return
                    sleep(5)
        except libvirt.libvirtError as e:
            new_e = Exception(e.get_error_message())
            new_e.libvirtError = True
            raise new_e
233 234 235 236


@celery.task
@req_connection
Guba Sándor committed
237
@wrap_libvirtError
tarokkk committed
238
def delete(name):
239
    """ Destroy the running called 'name' virtual machine. """
tarokkk committed
240 241 242 243
    domain = lookupByName(name)
    domain.destroy()


Guba Sándor committed
244
@celery.task
tarokkk committed
245
@req_connection
Guba Sándor committed
246
@wrap_libvirtError
tarokkk committed
247
def list_domains():
248 249 250 251 252
    """ List the running domains.

    :return list: List of domains name in host.

    """
253
    domain_list = []
254 255
    for i in Connection.get().listDomainsID():
        dom = Connection.get().lookupByID(i)
256
        domain_list.append(dom.name())
257
    return domain_list
tarokkk committed
258 259


Guba Sándor committed
260
@celery.task
tarokkk committed
261
@req_connection
Guba Sándor committed
262
@wrap_libvirtError
263 264 265 266 267 268 269 270 271
def list_domains_info():
    """ List the running domains.

    :return list: List of domains info dict.

    """
    domain_list = []
    for i in Connection.get().listDomainsID():
        dom = Connection.get().lookupByID(i)
272 273 274
        domain_dict = _parse_info(dom.info())
        domain_dict['name'] = dom.name()
        domain_list.append(domain_dict)
275 276 277 278 279 280
    return domain_list


@celery.task
@req_connection
@wrap_libvirtError
tarokkk committed
281
def lookupByName(name):
282 283
    """ Return with the requested Domain. """
    return Connection.get().lookupByName(name)
tarokkk committed
284 285


Guba Sándor committed
286
@celery.task
tarokkk committed
287
@req_connection
Guba Sándor committed
288
@wrap_libvirtError
tarokkk committed
289
def undefine(name):
290 291 292 293 294
    """ Undefine an already defined virtual machine.

    If it's running it becomes transient (lost on reboot)

    """
tarokkk committed
295 296 297 298
    domain = lookupByName(name)
    domain.undefine()


Guba Sándor committed
299
@celery.task
tarokkk committed
300
@req_connection
Guba Sándor committed
301
@wrap_libvirtError
302
def start(name):
303 304
    """ Start an already defined virtual machine."""

305
    domain = lookupByName(name)
tarokkk committed
306 307 308
    domain.create()


Guba Sándor committed
309
@celery.task
tarokkk committed
310
@req_connection
Guba Sándor committed
311
@wrap_libvirtError
312
def suspend(name):
313 314 315 316 317 318
    """ Stop virtual machine and keep memory in RAM.

    Return the domain info dict.

    """

319 320
    domain = lookupByName(name)
    domain.suspend()
321
    return _parse_info(domain.info())
322 323 324 325


@celery.task
@req_connection
Guba Sándor committed
326
@wrap_libvirtError
327
def save(name, path):
328 329
    """ Stop virtual machine and save its memory to path. """

330
    domain = lookupByName(name)
tarokkk committed
331 332 333
    domain.save(path)


Guba Sándor committed
334
@celery.task
tarokkk committed
335
@req_connection
Guba Sándor committed
336
@wrap_libvirtError
337
def restore(name, path):
338 339 340 341 342 343 344 345
    """ Restore a saved virtual machine.

    Restores the virtual machine from the memory image
    stored at path.
    Return the domain info dict.

    """
    Connection.get().restore(path)
346
    return domain_info(name)
Guba Sándor committed
347 348


Guba Sándor committed
349
@celery.task
Guba Sándor committed
350
@req_connection
Guba Sándor committed
351
@wrap_libvirtError
352
def resume(name):
353 354 355 356 357 358
    """ Resume stopped virtual machines.

    Return the domain info dict.

    """

359
    domain = lookupByName(name)
tarokkk committed
360
    domain.resume()
361
    return _parse_info(domain.info())
tarokkk committed
362 363


Guba Sándor committed
364
@celery.task
tarokkk committed
365
@req_connection
Guba Sándor committed
366
@wrap_libvirtError
367
def reset(name):
368 369 370 371 372 373
    """ Reset (power reset) virtual machine.

    Return the domain info dict.

    """

374
    domain = lookupByName(name)
375
    domain.reset(0)
376
    return _parse_info(domain.info())
tarokkk committed
377 378


Guba Sándor committed
379
@celery.task
tarokkk committed
380
@req_connection
Guba Sándor committed
381
@wrap_libvirtError
382
def reboot(name):
383 384 385 386 387
    """ Reboot (with guest acpi support) virtual machine.

    Return the domain info dict.

    """
388
    domain = lookupByName(name)
389
    domain.reboot(0)
390
    return _parse_info(domain.info())
391 392


Guba Sándor committed
393
@celery.task
394
@req_connection
Guba Sándor committed
395
@wrap_libvirtError
396
def node_info():
397 398 399 400
    """ Get info from Host as dict.

    Return dict:

401 402 403 404 405 406 407 408 409 410 411 412
    model   string indicating the CPU model
    memory  memory size in kilobytes
    cpus    the number of active CPUs
    mhz     expected CPU frequency
    nodes    the number of NUMA cell, 1 for unusual NUMA
             topologies or uniform memory access;
             check capabilities XML for the actual NUMA topology
    sockets  number of CPU sockets per node if nodes > 1,
             1 in case of unusual NUMA topology
    cores    number of cores per socket, total number of
             processors in case of unusual NUMA topolog
    threads  number of threads per core, 1 in case of unusual numa topology
413 414 415

    """

416 417
    keys = ['model', 'memory', 'cpus', 'mhz',
            'nodes', 'sockets', 'cores', 'threads']
418
    values = Connection.get().getInfo()
419 420 421
    return dict(zip(keys, values))


422
def _parse_info(values):
423 424 425 426 427 428
    """ Parse libvirt domain info into dict.

    Return the info dict.

    """

429 430 431 432 433 434 435
    keys = ['state', 'maxmem', 'memory', 'virtcpunum', 'cputime']
    info = dict(zip(keys, values))
    # Change state to proper ENUM
    info['state'] = state_dict[info['state']]
    return info


Guba Sándor committed
436
@celery.task
437
@req_connection
Guba Sándor committed
438
@wrap_libvirtError
439
def domain_info(name):
440 441 442
    """ Get the domain info from libvirt.

    Return the domain info dict:
443 444 445 446 447
    state   the running state, one of virDomainState
    maxmem  the maximum memory in KBytes allowed
    memory  the memory in KBytes used by the domain
    virtcpunum    the number of virtual CPUs for the domain
    cputime    the CPU time used in nanoseconds
448 449

    """
450
    dom = lookupByName(name)
451
    return _parse_info(dom.info())
452 453


Guba Sándor committed
454
@celery.task
455
@req_connection
Guba Sándor committed
456
@wrap_libvirtError
457
def network_info(name, network):
458 459
    """ Return the network info dict.

460 461 462 463 464 465 466 467
    rx_bytes
    rx_packets
    rx_errs
    rx_drop
    tx_bytes
    tx_packets
    tx_errs
    tx_drop
468 469

    """
470 471 472 473 474 475 476 477
    keys = ['rx_bytes', 'rx_packets', 'rx_errs', 'rx_drop',
            'tx_bytes', 'tx_packets', 'tx_errs', 'tx_drop']
    dom = lookupByName(name)
    values = dom.interfaceStats(network)
    info = dict(zip(keys, values))
    return info


Guba Sándor committed
478
@celery.task
479
@req_connection
Guba Sándor committed
480
@wrap_libvirtError
481
def send_key(name, key_code):
482 483 484 485 486 487
    """ Sending linux key_code to the name vm.

    key_code can be optained from linux_keys.py
    e.x: linuxkeys.KEY_RIGHTCTRL

    """
488 489 490 491 492
    domain = lookupByName(name)
    domain.sendKey(libvirt.VIR_KEYCODE_SET_LINUX, 100, [key_code], 1, 0)


def _stream_handler(stream, buf, opaque):
493
    opaque.write(buf)
494 495


Guba Sándor committed
496
@celery.task
497
@req_connection
Guba Sándor committed
498
@wrap_libvirtError
499
def screenshot(name):
500
    """Save screenshot of virtual machine.
501
    Returns a ByteIO object that contains the screenshot in png format.
502
    """
503 504
    from io import BytesIO
    from PIL import Image
505 506 507 508 509 510 511 512
    # Import linuxkeys to get defines
    import linuxkeys
    # Connection need for the stream object
    domain = lookupByName(name)
    # Send key to wake up console
    domain.sendKey(libvirt.VIR_KEYCODE_SET_LINUX,
                   100, [linuxkeys.KEY_RIGHTCTRL], 1, 0)
    # Create Stream to get data
513
    stream = Connection.get().newStream(0)
514 515
    # Take screenshot accessible by stream (return mimetype)
    domain.screenshot(stream, 0, 0)
516
    # Get file to save data (send on AMQP?)
517
    fd = BytesIO()
518 519 520 521 522
    try:
        # Save data with handler
        stream.recvAll(_stream_handler, fd)
    finally:
        stream.finish()
523 524
    # Convert ppm to png
    # Seek to the beginning of the stream
525 526 527 528 529 530
    fd.seek(0)
    # Get the image
    image = BytesIO()
    ppm = Image.open(fd)
    ppm.save(image, format='PNG')
    return image
531 532


Guba Sándor committed
533
@celery.task
534
@req_connection
Guba Sándor committed
535
@wrap_libvirtError
536
def migrate(name, host, live=False):
537
    """ Migrate domain to host. """
538 539 540 541 542 543 544 545 546
    flags = libvirt.VIR_MIGRATE_PEER2PEER
    if live:
        flags = flags | libvirt.VIR_MIGRATE_LIVE
    domain = lookupByName(name)
    domain.migrateToURI(
        duri="qemu+tcp://" + host + "/system",
        flags=flags,
        dname=name,
        bandwidth=0)
547
    # return _parse_info(domain.info())
Őry Máté committed
548

549

Őry Máté committed
550
@celery.task
551 552 553
@req_connection
@wrap_libvirtError
def attach_disk(name, disk):
Guba Sándor committed
554
    """ Attach Disk to a running virtual machine. """
555 556 557 558 559 560 561 562 563
    domain = lookupByName(name)
    disk = VMDisk.deserialize(disk)
    domain.attachDevice(disk.dump_xml())


@celery.task
@req_connection
@wrap_libvirtError
def detach_disk(name, disk):
Guba Sándor committed
564
    """ Detach disk from a running virtual machine. """
565 566 567
    domain = lookupByName(name)
    disk = VMDisk.deserialize(disk)
    domain.detachDevice(disk.dump_xml())
Guba Sándor committed
568 569 570 571 572 573 574 575 576 577 578 579 580
    # Libvirt does NOT report failed detach so test it.
    __check_detach(domain, disk.source)


def __check_detach(domain, disk):
    """ Test if detach was successfull by searching
    for disk in the XML"""
    xml = domain.XMLDesc()
    root = ET.fromstring(xml)
    devices = root.find('devices')
    for d in devices.findall("disk"):
        if disk in d.find('source').attrib.values()[0]:
            raise Exception("Disk could not been detached. "
Guba Sándor committed
581 582
                            "Check if hot plug support is "
                            "enabled (acpiphp module on Linux).")
583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604


@celery.task
@req_connection
@wrap_libvirtError
def attach_network(name, net):
    domain = lookupByName(name)
    net = VMNetwork.deserialize(net)
    logging.error(net.dump_xml())
    domain.attachDevice(net.dump_xml())


@celery.task
@req_connection
@wrap_libvirtError
def detach_network(name, net):
    domain = lookupByName(name)
    net = VMNetwork.deserialize(net)
    domain.detachDevice(net.dump_xml())


@celery.task
Guba Sándor committed
605 606 607 608
@req_connection
@wrap_libvirtError
def resize_disk(name, path, size):
    domain = lookupByName(name)
609 610 611
    # domain.blockResize(path, int(size),
    #                    flags=libvirt.VIR_DOMAIN_BLOCK_RESIZE_BYTES)
    # To be compatible with libvirt < 0.9.11
612
    domain.blockResize(path, int(size)/1024, 0)
Guba Sándor committed
613 614 615


@celery.task
Őry Máté committed
616 617
def ping():
    return True
618 619 620


@celery.task
621 622 623 624 625 626 627 628 629
@req_connection
@wrap_libvirtError
def get_architecture():
    xml = Connection.get().getCapabilities()
    return ET.fromstring(xml).getchildren()[0].getchildren(
    )[1].getchildren()[0].text


@celery.task
630 631 632
def get_core_num():
    return NUM_CPUS

Őry Máté committed
633

634 635 636
@celery.task
def get_ram_size():
    return virtual_memory().total
637 638 639


@celery.task
640 641
def get_driver_version():
    from git import Repo
642 643
    try:
        repo = Repo(path=os.getcwd())
644 645 646
        lc = repo.head.commit
        return {'branch': repo.active_branch.name,
                'commit': lc.hexsha,
647
                'commit_text': lc.summary,
648 649 650
                'is_dirty': repo.is_dirty()}
    except Exception as e:
        logging.exception("Unhandled exception: %s", e)
651
        return None
652 653 654


@celery.task
655 656 657
def get_info():
    return {'core_num': get_core_num(),
            'ram_size': get_ram_size(),
658 659
            'architecture': get_architecture(),
            'driver_version': get_driver_version()}
660 661 662


@celery.task
663 664 665
def get_node_metrics():
    result = {}
    result['cpu.usage'] = cpu_percent(0)
666
    result['memory.usage'] = virtual_memory().percent
667
    return result