Source code for tests.install_vm

#!/usr/bin/python3

import argparse
import os
import shlex
import subprocess
import sys
import time


KNOWN_DISTROS = [
    "fedora",
    "centos7",
    "centos8",
    "centos9",
    "rhel7",
    "rhel8",
    "rhel9",
]

DISTRO_URL = {
    "fedora":
        "https://download.fedoraproject.org/pub/fedora/linux/releases/39/Everything/x86_64/os",
    "centos7": "http://mirror.centos.org/centos/7/os/x86_64",
    "centos8": "http://mirror.centos.org/centos/8-stream/BaseOS/x86_64/os/",
    "centos9": "http://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/",
}
DISTRO_EXTRA_REPO = {
    "centos8": "http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/",
    "centos9": "http://mirror.stream.centos.org/9-stream/AppStream/x86_64/os/",
}


[docs] def path_from_tests(path): return os.path.relpath(os.path.join(os.path.dirname(__file__), path))
[docs] def parse_args(): parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument( "--libvirt", dest="libvirt", default="qemu:///session", help="What hypervisor should be used when installing VM.", ) parser.add_argument( "--kickstart", dest="kickstart", default=path_from_tests("kickstarts/test_suite.cfg"), help="Path to a kickstart file for installation of a VM.", ) parser.add_argument( "--distro", dest="distro", required=True, choices=KNOWN_DISTROS, help="What distribution to install.", ) parser.add_argument( "--domain", dest="domain", required=True, help="What name should the new domain have.", ) parser.add_argument( "--disk-dir", dest="disk_dir", default=None, help="Location of the VM qcow2 disk file (ignored when --disk is specified).", ) parser.add_argument( "--disk-size", dest="disk_size", default=20, help="Size of the VM qcow2 disk, default is 20 GiB (ignored when --disk is specified).", ) parser.add_argument( "--disk", dest="disk", help="Full disk type/spec, ie. pool=MyPool,bus=sata,cache=unsafe.", ) parser.add_argument( "--ram", dest="ram", default=3072, type=int, help="Amount of RAM configured for the VM.", ) parser.add_argument( "--cpu", dest="cpu", default=2, type=int, help="Number of CPU cores configured for the VM.", ) parser.add_argument( "--network", dest="network", help="Network type/spec, ie. bridge=br0 or network=name.", ) parser.add_argument( "--url", dest="url", default=None, help="URL to an installation tree on a remote server.", ) parser.add_argument( "--extra-repo", dest="extra_repo", default=None, help="URL to an extra repository to be used during installation (e.g. AppStream).", ) parser.add_argument( "--dry", dest="dry", action="store_true", help="Print command line instead of triggering command.", ) parser.add_argument( "--ssh-pubkey", dest="ssh_pubkey", default=None, help="Path to an SSH public key which will be used to access the VM.", ) parser.add_argument( "--uefi", dest="uefi", choices=[ "secureboot", "normal", ], help="Perform UEFI based installation, optionally with secure boot support.", ) parser.add_argument( "--install-gui", dest="install_gui", action="store_true", help="Perform a GUI installation (default is installation without GUI).", ) parser.add_argument( "--console", dest="console", action="store_true", help="Connect to a serial console of the VM (to monitor installation progress).", ) parser.add_argument( "--disk-unsafe", dest="disk_unsafe", action="store_true", help="Set cache unsafe.", ) return parser.parse_args()
[docs] def wait_vm_not_running(domain): timeout = 300 print("Waiting for {0} VM to shutdown (max. {1}s)".format(domain, timeout)) end_time = time.time() + timeout try: while True: time.sleep(5) cmd = ["virsh", "domstate", domain] if subprocess.getoutput(cmd).rstrip() != "running": return if time.time() < end_time: continue print("Timeout reached: {0} VM failed to shutdown, cancelling wait." .format(domain)) return except KeyboardInterrupt: print("Interrupted, cancelling wait.") return
[docs] def err(rc, msg): print(msg, file=sys.stderr) sys.exit(rc)
[docs] def handle_url(data): if not data.url: data.url = DISTRO_URL.get(data.distro, None) data.extra_repo = DISTRO_EXTRA_REPO.get(data.distro, None) if data.url: return err(1, "For the '{0}' distro the `--url` option needs to be provided.".format(data.distro))
[docs] def handle_ssh_pubkey(data): data.ssh_pubkey_used = bool(data.ssh_pubkey) if not data.ssh_pubkey: username = os.environ.get("SUDO_USER", "") home_dir = os.path.expanduser("~" + username) data.ssh_pubkey = home_dir + "/.ssh/id_rsa.pub" if not os.path.isfile(data.ssh_pubkey): err(1, """Error: SSH public key not found at {0} You can use the `--ssh-pubkey` to specify which key should be used.""".format(data.ssh_pubkey)) with open(data.ssh_pubkey) as f: data.pub_key_content = f.readline().rstrip()
[docs] def handle_disk(data): disk_spec = [ "size={0}".format(data.disk_size), "format=qcow2", ] if data.disk: disk_spec.extend(data.disk.split(",")) elif data.disk_dir: disk_path = os.path.join(data.disk_dir, data.domain) + ".qcow2" print("Location of VM disk: {0}".format(disk_path)) disk_spec.append("path={0}".format(disk_path)) if data.disk_unsafe: disk_spec.append("cache=unsafe") data.disk_spec = ",".join(disk_spec)
[docs] def handle_kickstart(data): data.ks_basename = os.path.basename(data.kickstart) tmp_kickstart = "/tmp/" + data.ks_basename with open(data.kickstart) as infile, open(tmp_kickstart, "w") as outfile: content = infile.read() content = content.replace("&&HOST_PUBLIC_KEY&&", data.pub_key_content) if data.distro != "fedora": content = content.replace("&&YUM_REPO_URL&&", data.url) repo_cmd = "" if data.extra_repo: # extra repository repo_cmd = "repo --name=extra-repository --baseurl={0}".format(data.extra_repo) content = content.replace("&&YUM_EXTRA_REPO_URL&&", data.extra_repo) content = content.replace("&&YUM_EXTRA_REPO&&", repo_cmd) if data.uefi: content = content.replace( "part /boot --fstype=xfs --size=512", "part /boot --fstype=xfs --size=312\npart /boot/efi --fstype=efi --size=200", ).replace( "part biosboot ", "# part biosboot ", ) if data.install_gui: gui_group = "\n%packages\n@^graphical-server-environment\n" if data.distro == "fedora": gui_group = "\n%packages\n@^Fedora Workstation\n" content = content.replace("\n%packages\n", gui_group) outfile.write(content) data.kickstart = tmp_kickstart
[docs] def handle_rest(data): if not data.network: if data.libvirt == "qemu:///system": data.network = "network=default" else: data.network = "bridge=virbr0" if data.console: data.wait_opt = 0 else: data.wait_opt = -1
[docs] def join_extented_opt(opt_name, delim, opts): if opts: return ["{0}={1}".format(opt_name, delim.join(opts))] return []
[docs] def get_virt_install_command(data): command = [ "virt-install", "--connect={0}".format(data.libvirt), "--name={0}".format(data.domain), "--memory={0}".format(data.ram), "--vcpus={0}".format(data.cpu), "--network={0}".format(data.network), "--disk={0}".format(data.disk_spec), "--initrd-inject={0}".format(data.kickstart), "--serial=pty", "--noautoconsole", "--rng=/dev/random", "--wait={0}".format(data.wait_opt), "--location={0}".format(data.url), ] boot_opts = [] extra_args_opts = [ "inst.ks=file:/{0}".format(data.ks_basename), "inst.ks.device=eth0", # The kernel option "net.ifnames=0" is used to disable predictable network # interface names, for more details see: # https://www.freedesktop.org/wiki/Software/systemd/PredictableNetworkInterfaceNames/ "net.ifnames=0", "console=ttyS0,115200", ] features_opts = [] if data.install_gui: command.append("--graphics=vnc") extra_args_opts.append("inst.graphical") else: command.append("--graphics=none") extra_args_opts.append("inst.cmdline") if data.uefi: boot_opts.append("uefi") if data.uefi == "secureboot": boot_opts.extend([ "loader.secure=yes", ]) features_opts.append("smm=on") else: boot_opts.append("loader.secure=no") command.extend(join_extented_opt("--boot", ",", boot_opts)) command.extend(join_extented_opt("--extra-args", " ", extra_args_opts)) command.extend(join_extented_opt("--features", ",", features_opts)) return command
[docs] def run_virt_install(data, command): if data.dry: print("\nThe following command would be used for the VM installation:") print(shlex.join(command)) return subprocess.call(command) if data.console: subprocess.call(["unbuffer", "virsh", "console", data.domain]) wait_vm_not_running(data.domain) subprocess.call(["virsh", "start", data.domain])
[docs] def give_info(data): if data.libvirt == "qemu:///system": ip_cmd = "sudo virsh domifaddr {0}".format(data.domain) else: # command evaluation in fish shell is simply surrounded by # parenthesis for example: (echo foo). In other shells you # need to prepend the $ symbol as: $(echo foo) from os import environ cmd_eval = "" if environ["SHELL"][-4:] == "fish" else "$" ip_cmd = "arp -n | grep {0}(virsh -q domiflist {1} | awk '{{print $5}}')".format( cmd_eval, data.domain) print(""" To determine the IP address of the {domain} VM use: {ip_cmd} To connect to the {domain} VM use: ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null root@IP To connect to the VM serial console, use: virsh console {domain}""".format(** data.__dict__, ip_cmd=ip_cmd)) if data.ssh_pubkey_used: print(""" Add: -o IdentityFile={ssh_pubkey} option to your ssh command and export the: export SSH_ADDITIONAL_OPTIONS='-o IdentityFile={ssh_pubkey}' before running the Automatus.""".format(** data.__dict__)) if data.libvirt == "qemu:///system": print(""" IMPORTANT: When running Automatus use: sudo -E to make sure that your SSH key is used.""")
[docs] def main(): data = parse_args() handle_url(data) handle_ssh_pubkey(data) print("Using SSH public key from file: {0}".format(data.ssh_pubkey)) print("Using hypervisor: {0}".format(data.libvirt)) handle_disk(data) handle_kickstart(data) print("Using kickstart file: {0}".format(data.kickstart)) handle_rest(data) command = get_virt_install_command(data) run_virt_install(data, command) give_info(data)
if __name__ == "__main__": main()