diff --git a/ceph.py b/ceph.py index ea90e6a..99b744e 100644 --- a/ceph.py +++ b/ceph.py @@ -6,30 +6,62 @@ import libvirt import lxml.etree as ET from base64 import b64decode import logging +import re +import json from util import req_connection, wrap_libvirtError, Connection + logger = logging.getLogger(__name__) DUMP_SIZE_LIMIT = int(os.getenv("DUMP_SIZE_LIMIT", 20 * 1024 ** 3)) # 20GB +mon_regex = re.compile(r"^\[(?P<address>.+)\]\:(?P<port>\d+).*$") + + +class CephConfig: + + def __init__(self, user=None, config_path=None, keyring_path=None): + + self.user = user + self.config_path = config_path + self.keyring_path = keyring_path + + if user is None: + self.user = "admin" + if config_path is None: + self.config_path = os.getenv("CEPH_CONFIG", + "/etc/ceph/ceph.conf") + if keyring_path is None: + default_keyring = "/etc/ceph/ceph.client.%s.keyring" % self.user + self.keyring_path = os.getenv("CEPH_KEYRING", default_keyring) + + def cmd_args(self): + return ["--keyring", self.keyring_path, + "--id", self.user, + "--conf", self.config_path] + class CephConnection: - def __init__(self, pool_name, ceph_config=None): + def __init__(self, pool_name, conf=None): self.pool_name = pool_name - self.ceph_config = ceph_config + self.conf = conf + + if conf is None: + self.conf = CephConfig() + self.cluster = None self.ioctx = None def __enter__(self): try: - if self.ceph_config is None: - self.ceph_config = os.getenv("CEPH_CONFIG", - "/etc/ceph/ceph.conf") - self.cluster = rados.Rados(conffile=self.ceph_config) - self.cluster.connect(timeout=2) + self.cluster = rados.Rados( + conffile=self.conf.config_path, + conf=dict(keyring=self.conf.keyring_path)) + timeout = os.getenv("CEPH_TIMEOUT", 2) + self.cluster.connect(timeout=timeout) self.ioctx = self.cluster.open_ioctx(self.pool_name) except rados.InterruptedOrTimeoutError as e: raise Exception(e) @@ -46,12 +78,41 @@ def sudo(*args): subprocess.check_output(["/bin/sudo"] + list(args)) -def map_rbd(ceph_path, local_path): +def unmap_rbd(conf, local_path): + sudo("/bin/rbd", "unmap", local_path, *conf.cmd_args()) + + +def map_rbd(conf, ceph_path, local_path): try: - sudo("/bin/rbd", "map", ceph_path) + sudo("/bin/rbd", "map", ceph_path, *conf.cmd_args()) except: - sudo("/bin/rbd", "unmap", local_path) - sudo("/bin/rbd", "map", ceph_path) + unmap_rbd(conf, local_path) + sudo("/bin/rbd", "map", ceph_path, *conf.cmd_args()) + + +def get_secret_key(conf, user): + return subprocess.check_output((["/bin/ceph", + "auth", "print-key", "client.%s" % user] + + conf.cmd_args())) + + +def parse_endpoint(mon): + m = mon_regex.match(mon["addr"]) + return (m.group("address"), m.group("port")) + + +def _get_endpoints(conf): + output = subprocess.check_output((["/bin/ceph", + "mon", "dump", "--format=json"] + + conf.cmd_args())) + mon_data = json.loads(output) + mons = mon_data["mons"] + return map(parse_endpoint, mons) + + +def get_endpoints(user): + conf = CephConfig(user=user) + return _get_endpoints(conf) def save(domain, poolname, diskname): @@ -69,13 +130,13 @@ def save(domain, poolname, diskname): rbd_inst.remove(conn.ioctx, diskname) rbd_inst.create(conn.ioctx, diskname, disk_size) try: - map_rbd(ceph_path, local_path) + map_rbd(conn.conf, ceph_path, local_path) domain.save(local_path) except: rbd_inst.remove(conn.ioctx, diskname) raise finally: - sudo("/bin/rbd", "unmap", local_path) + unmap_rbd(conn.conf, local_path) def restore(connection, poolname, diskname): @@ -84,9 +145,9 @@ def restore(connection, poolname, diskname): ceph_path = os.path.join(poolname, diskname) local_path = os.path.join("/dev/rbd", ceph_path) - map_rbd(ceph_path, local_path) + map_rbd(connection.conf, ceph_path, local_path) connection.restore(local_path) - sudo("/bin/rbd", "unmap", local_path) + unmap_rbd(connection.conf, local_path) with CephConnection(poolname) as conn: rbd_inst = rbd.RBD() rbd_inst.remove(conn.ioctx, diskname) @@ -123,7 +184,10 @@ def find_secret(user): @req_connection @wrap_libvirtError -def create_secret(user, secretkey): +def create_secret(user): + conf = CephConfig() + secretkey = get_secret_key(conf, user) + xml = generate_secret_xml(user) conn = Connection.get() secret = conn.secretDefineXML(xml) @@ -141,9 +205,9 @@ def delete_secret(user): logger.info("Secret with uuid: '%s' deleted", secret.UUIDString()) -def check_secret(user, secretkey): +def check_secret(user): secret = find_secret(user) if secret is None: - secret = create_secret(user, secretkey) + secret = create_secret(user) return secret.UUIDString() diff --git a/vm.py b/vm.py index c05c146..c87e509 100644 --- a/vm.py +++ b/vm.py @@ -1,7 +1,7 @@ import lxml.etree as ET from vmcelery import native_ovs -from ceph import check_secret +from ceph import check_secret, get_endpoints # VM Instance class @@ -246,7 +246,6 @@ class CephVMDisk(VMDisk): def __init__(self, source, - endpoints, disk_device="disk", driver_name="qemu", driver_type="raw", @@ -254,8 +253,7 @@ class CephVMDisk(VMDisk): target_device="vda", target_bus="virtio", protocol="rbd", - ceph_user=None, - secret=None): + ceph_user=None): super(CephVMDisk, self).__init__( source=source, @@ -267,12 +265,11 @@ class CephVMDisk(VMDisk): target_device=target_device, target_bus=target_bus) - self.endpoints = endpoints self.protocol = protocol self.ceph_user = ceph_user - self.secret = secret - if ceph_user is not None and secret is not None: - check_secret(ceph_user, secret) + if ceph_user is not None: + check_secret(ceph_user) + self.endpoints = get_endpoints(ceph_user) @classmethod def deserialize(cls, desc): @@ -291,7 +288,7 @@ class CephVMDisk(VMDisk): ET.SubElement(source, "host", attrib={"name": name, "port": unicode(port)}) - if self.ceph_user is not None and self.secret is not None: + if self.ceph_user is not None: auth = ET.SubElement( xml_top, "auth", diff --git a/vmdriver.py b/vmdriver.py index 0c60340..16729f9 100644 --- a/vmdriver.py +++ b/vmdriver.py @@ -586,6 +586,6 @@ def get_node_metrics(): @celery.task -def refresh_secret(user, secret): +def refresh_secret(user): ceph.delete_secret(user) - ceph.check_secret(user, secret) + ceph.check_secret(user)