From 4b88e9f40a4d53365391bde41eb46f741656c65c Mon Sep 17 00:00:00 2001 From: Bálint Máhonfai <mbalint314@gmail.com> Date: Mon, 11 Nov 2019 15:00:32 +0100 Subject: [PATCH] Implement uploading exported disk to storeserver, remove disk from the datastore after upload --- disk.py | 302 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------------------------------------------------------------------------------------------------------- requirements/base.txt | 1 + storagedriver.py | 4 ++-- 3 files changed, 166 insertions(+), 141 deletions(-) diff --git a/disk.py b/disk.py index b616d51..867c0ad 100644 --- a/disk.py +++ b/disk.py @@ -11,6 +11,7 @@ from time import sleep from hashlib import md5 import re +from requests_toolbelt import MultipartEncoder import requests logger = logging.getLogger(__name__) @@ -265,89 +266,152 @@ class Disk(object): raise Exception("Invalid file format. Only qcow and " "iso files are allowed. Image from: %s" % url) - def export(self, format): + def export(self, format, exported_name, upload_link): format_dict = { 'vmdk': 'vmdk', 'qcow2': 'qcow2', 'vdi': 'vdi', 'vpc': 'vhd', } + exported_path = self.get_path() + '.' + format_dict[format] cmdline = ['qemu-img', 'convert', '-O', format, self.get_path(), - self.get_path() + '.' + format_dict[format]] + exported_path] subprocess.check_output(cmdline) - def extract_iso_from_zip(self, disk_path): - with ZipFile(disk_path, 'r') as z: - isos = z.namelist() - if len(isos) != 1: - isos = [i for i in isos - if i.lower().endswith('.iso')] - if len(isos) == 1: - logger.info('Unzipping %s started.', disk_path) - f = open(disk_path + '~', 'wb') - zf = z.open(isos[0]) - with zf, f: - copyfileobj(zf, f) - f.flush() - move(disk_path + '~', disk_path) - else: - logger.info("Extracting %s failed, keeping original.", - disk_path) - - def snapshot(self): - """ Creating qcow2 snapshot with base image. - """ - # Check if snapshot type and qcow2 format matchmatch - if self.type != 'snapshot': - raise Exception('Invalid type: %s' % self.type) - # Check if file already exists - if os.path.isfile(self.get_path()): - raise Exception('File already exists: %s' % self.get_path()) - # Check if base file exist - if not os.path.isfile(self.get_base()): - raise Exception('Image Base does not exists: %s' % self.get_base()) - # Build list of Strings as command parameters - if self.format == 'iso': - os.symlink(self.get_base(), self.get_path()) - elif self.format == 'raw': - raise NotImplemented() + with open(exported_path, 'rb') as exported_disk: + try: + m = MultipartEncoder( + {'data': (exported_name + '.' + format_dict[format], exported_disk)} + ) + response = requests.post(upload_link, + data=m, + headers={'Content-Type': m.content_type}, + params={'no_redirect': ''}) + if response.status_code != 200: + raise Exception("Invalid response status code: %s" % + response.status_code) + finally: + os.unlink(exported_path) + + +def extract_iso_from_zip(self, disk_path): + with ZipFile(disk_path, 'r') as z: + isos = z.namelist() + if len(isos) != 1: + isos = [i for i in isos + if i.lower().endswith('.iso')] + if len(isos) == 1: + logger.info('Unzipping %s started.', disk_path) + f = open(disk_path + '~', 'wb') + zf = z.open(isos[0]) + with zf, f: + copyfileobj(zf, f) + f.flush() + move(disk_path + '~', disk_path) else: - cmdline = ['qemu-img', - 'create', - '-b', self.get_base(), - '-f', self.format, - self.get_path()] - # Call subprocess - subprocess.check_output(cmdline) - - def merge_disk_with_base(self, task, new_disk, parent_id=None): - proc = None - try: - cmdline = [ - 'qemu-img', 'convert', self.get_path(), - '-O', new_disk.format, new_disk.get_path()] - # Call subprocess - logger.debug( - "Merging %s into %s.", self.get_path(), - new_disk.get_path()) - percent = 0 - diff_disk = Disk.get(self.dir, self.name) - base_disk = Disk.get(self.dir, self.base_name) - clen = min(base_disk.actual_size + diff_disk.actual_size, - diff_disk.size) - output = new_disk.get_path() - proc = subprocess.Popen(cmdline) + logger.info("Extracting %s failed, keeping original.", + disk_path) + + +def snapshot(self): + """ Creating qcow2 snapshot with base image. + """ + # Check if snapshot type and qcow2 format matchmatch + if self.type != 'snapshot': + raise Exception('Invalid type: %s' % self.type) + # Check if file already exists + if os.path.isfile(self.get_path()): + raise Exception('File already exists: %s' % self.get_path()) + # Check if base file exist + if not os.path.isfile(self.get_base()): + raise Exception('Image Base does not exists: %s' % self.get_base()) + # Build list of Strings as command parameters + if self.format == 'iso': + os.symlink(self.get_base(), self.get_path()) + elif self.format == 'raw': + raise NotImplemented() + else: + cmdline = ['qemu-img', + 'create', + '-b', self.get_base(), + '-f', self.format, + self.get_path()] + # Call subprocess + subprocess.check_output(cmdline) + + +def merge_disk_with_base(self, task, new_disk, parent_id=None): + proc = None + try: + cmdline = [ + 'qemu-img', 'convert', self.get_path(), + '-O', new_disk.format, new_disk.get_path()] + # Call subprocess + logger.debug( + "Merging %s into %s.", self.get_path(), + new_disk.get_path()) + percent = 0 + diff_disk = Disk.get(self.dir, self.name) + base_disk = Disk.get(self.dir, self.base_name) + clen = min(base_disk.actual_size + diff_disk.actual_size, + diff_disk.size) + output = new_disk.get_path() + proc = subprocess.Popen(cmdline) + while True: + if proc.poll() is not None: + break + try: + actsize = os.path.getsize(output) + except OSError: + actsize = 0 + new_percent = min(100, round(actsize * 100.0 / clen)) + if new_percent > percent: + percent = new_percent + if not task.is_aborted(): + task.update_state( + task_id=parent_id, + state=task.AsyncResult(parent_id).state, + meta={'size': actsize, 'percent': percent}) + else: + logger.warning( + "Merging new disk %s is aborted by user.", + new_disk.get_path()) + raise AbortException() + sleep(1) + except AbortException: + proc.terminate() + logger.warning("Aborted merge job, removing %s", + new_disk.get_path()) + os.unlink(new_disk.get_path()) + + except: + if proc: + proc.terminate() + logger.exception("Unknown error occured, removing %s ", + new_disk.get_path()) + os.unlink(new_disk.get_path()) + raise + + +def merge_disk_without_base(self, task, new_disk, parent_id=None, + length=1024 * 1024): + try: + fsrc = open(self.get_path(), 'rb') + fdst = open(new_disk.get_path(), 'wb') + clen = self.size + actsize = 0 + percent = 0 + with fsrc, fdst: while True: - if proc.poll() is not None: + buf = fsrc.read(length) + if not buf: break - try: - actsize = os.path.getsize(output) - except OSError: - actsize = 0 + fdst.write(buf) + actsize += len(buf) new_percent = min(100, round(actsize * 100.0 / clen)) if new_percent > percent: percent = new_percent @@ -361,82 +425,42 @@ class Disk(object): "Merging new disk %s is aborted by user.", new_disk.get_path()) raise AbortException() - sleep(1) - except AbortException: - proc.terminate() - logger.warning("Aborted merge job, removing %s", - new_disk.get_path()) - os.unlink(new_disk.get_path()) - - except: - if proc: - proc.terminate() - logger.exception("Unknown error occured, removing %s ", - new_disk.get_path()) - os.unlink(new_disk.get_path()) - raise + except AbortException: + logger.warning("Aborted remove %s", new_disk.get_path()) + os.unlink(new_disk.get_path()) + except: + logger.exception("Unknown error occured removing %s ", + new_disk.get_path()) + os.unlink(new_disk.get_path()) + raise + + +def merge(self, task, new_disk, parent_id=None): + """ Merging a new_disk from the actual disk and its base. + """ - def merge_disk_without_base(self, task, new_disk, parent_id=None, - length=1024 * 1024): - try: - fsrc = open(self.get_path(), 'rb') - fdst = open(new_disk.get_path(), 'wb') - clen = self.size - actsize = 0 - percent = 0 - with fsrc, fdst: - while True: - buf = fsrc.read(length) - if not buf: - break - fdst.write(buf) - actsize += len(buf) - new_percent = min(100, round(actsize * 100.0 / clen)) - if new_percent > percent: - percent = new_percent - if not task.is_aborted(): - task.update_state( - task_id=parent_id, - state=task.AsyncResult(parent_id).state, - meta={'size': actsize, 'percent': percent}) - else: - logger.warning( - "Merging new disk %s is aborted by user.", - new_disk.get_path()) - raise AbortException() - except AbortException: - logger.warning("Aborted remove %s", new_disk.get_path()) - os.unlink(new_disk.get_path()) - except: - logger.exception("Unknown error occured removing %s ", - new_disk.get_path()) - os.unlink(new_disk.get_path()) - raise + if task.is_aborted(): + raise AbortException() - def merge(self, task, new_disk, parent_id=None): - """ Merging a new_disk from the actual disk and its base. - """ + # Check if file already exists + if os.path.isfile(new_disk.get_path()): + raise Exception('File already exists: %s' % self.get_path()) - if task.is_aborted(): - raise AbortException() + if self.format == "iso": + os.symlink(self.get_path(), new_disk.get_path()) + elif self.base_name: + self.merge_disk_with_base(task, new_disk, parent_id) + else: + self.merge_disk_without_base(task, new_disk, parent_id) - # Check if file already exists - if os.path.isfile(new_disk.get_path()): - raise Exception('File already exists: %s' % self.get_path()) - if self.format == "iso": - os.symlink(self.get_path(), new_disk.get_path()) - elif self.base_name: - self.merge_disk_with_base(task, new_disk, parent_id) - else: - self.merge_disk_without_base(task, new_disk, parent_id) +def delete(self): + """ Delete file. """ + if os.path.isfile(self.get_path()): + os.unlink(self.get_path()) - def delete(self): - """ Delete file. """ - if os.path.isfile(self.get_path()): - os.unlink(self.get_path()) - @classmethod - def list(cls, dir): - """ List all files in <dir> directory.""" - return [cls.get(dir, file) for file in os.listdir(dir)] +@classmethod +def list(cls, dir): + """ List all files in <dir> directory.""" + return [cls.get(dir, file) for file in os.listdir(dir)] diff --git a/requirements/base.txt b/requirements/base.txt index a36135c..6f9b296 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -1,3 +1,4 @@ celery==3.1.17 requests==2.5.3 +requests-toolbelt==0.9.1 filemagic==1.6 diff --git a/storagedriver.py b/storagedriver.py index e6ce525..225cc67 100644 --- a/storagedriver.py +++ b/storagedriver.py @@ -42,9 +42,9 @@ class download(AbortableTask): @celery.task() -def export(disk_desc, format): +def export(disk_desc, format, exported_name, upload_link): disk = Disk.deserialize(disk_desc) - disk.export(format) + disk.export(format, exported_name, upload_link) @celery.task() -- libgit2 0.26.0