From 41323d004fe00e3afeba4f37883fe88f440de1ed Mon Sep 17 00:00:00 2001 From: WillyPillow Date: Sun, 12 Jul 2020 23:02:07 +0800 Subject: [PATCH] Support for {reinstall,downgrade,upgrade} operations. Requires QubesOS/qubes-issues#5946 to be resolved. --- qubesadmin/tools/qvm_template.py | 100 ++++++++++++++++++++++++++----- 1 file changed, 85 insertions(+), 15 deletions(-) diff --git a/qubesadmin/tools/qvm_template.py b/qubesadmin/tools/qvm_template.py index ab73213a..9a686001 100644 --- a/qubesadmin/tools/qvm_template.py +++ b/qubesadmin/tools/qvm_template.py @@ -74,6 +74,12 @@ class TemplateState(enum.Enum): EXTRA = 'extra' UPGRADABLE = 'upgradable' +class VersionSelector(enum.Enum): + LATEST = enum.auto() + REINSTALL = enum.auto() + LATEST_LOWER = enum.auto() + LATEST_HIGHER = enum.auto() + # NOTE: Verifying RPMs this way is prone to TOCTOU. This is okay for local # files, but may create problems if multiple instances of `qvm-template` are # downloading the same file, so a lock is needed in that case. @@ -117,7 +123,8 @@ def parse_config(path): with open(path, 'r') as fd: return dict(line.rstrip('\n').split('=', 1) for line in fd) -def install(args, app): +def install(args, app, version_selector=VersionSelector.LATEST, + ignore_existing=False): # TODO: Lock, mentioned in the note above transaction_set = rpm.TransactionSet() @@ -130,11 +137,11 @@ def install(args, app): os.makedirs(args.cachedir, exist_ok=True) - dl_list = get_dl_list(args, app) + dl_list = get_dl_list(args, app, version_selector=version_selector) dl_list_copy = dl_list.copy() # Verify that the templates are not yet installed for name, (ver, _) in dl_list.items(): - if name in app.domains: + if not ignore_existing and name in app.domains: print(('Template \'%s\' already installed, skipping...' ' (You may want to use the {reinstall,upgrade,downgrade}' ' operations.)') % name, file=sys.stderr) @@ -147,7 +154,8 @@ def install(args, app): dl_list = dl_list_copy download(args, app, path_override=args.cachedir, - dl_list=dl_list, suffix=UNVERIFIED_SUFFIX) + dl_list=dl_list, suffix=UNVERIFIED_SUFFIX, + version_selector=version_selector) for rpmfile in rpm_list: path = rpmfile + UNVERIFIED_SUFFIX @@ -166,7 +174,7 @@ def install(args, app): name = package_name[len(PACKAGE_NAME_PREFIX):] # Another check for already-downloaded RPMs - if name in app.domains: + if not ignore_existing and name in app.domains: print(('Template \'%s\' already installed, skipping...' ' (You may want to use the {reinstall,upgrade,downgrade}' ' operations.)') % name, file=sys.stderr) @@ -342,28 +350,81 @@ def do_list(args, app): qubesadmin.tools.print_table(tpl_list) -def get_dl_list(args, app): - candid = {} +def get_dl_list(args, app, version_selector=VersionSelector.LATEST): + full_candid = {} for template in args.templates: + # This will be merged into `full_candid` later. + # It is separated so that we can check whether it is empty. + candid = {} + # Skip local RPMs if template.endswith('.rpm'): continue + query_res = qrexec_repoquery(args, app, PACKAGE_NAME_PREFIX + template) - if len(query_res) == 0: - parser.error('Package \'%s\' not found.' % template) - sys.exit(1) - # We only select one (latest) package for each distinct package name + + # We only select one package for each distinct package name #pylint: disable=unused-variable for name, epoch, version, release, reponame, dlsize, summary \ in query_res: ver = (epoch, version, release) - if name not in candid or rpm.labelCompare(candid[name], ver) < 0: - candid[name] = (ver, int(dlsize)) + if version_selector == VersionSelector.LATEST: + if name not in candid \ + or rpm.labelCompare(candid[name], ver) < 0: + candid[name] = (ver, int(dlsize)) + elif version_selector == VersionSelector.REINSTALL: + if name not in app.domains: + parser.error("Template '%s' not installed." % name) + vm = app.domains[name] + cur_ver = ( + vm.features['template-epoch'], + vm.features['template-version'], + vm.features['template-release']) + if rpm.labelCompare(ver, cur_ver) == 0: + candid[name] = (ver, int(dlsize)) + elif version_selector in [VersionSelector.LATEST_LOWER, + VersionSelector.LATEST_HIGHER]: + if name not in app.domains: + parser.error("Template '%s' not installed." % name) + vm = app.domains[name] + cur_ver = ( + vm.features['template-epoch'], + vm.features['template-version'], + vm.features['template-release']) + cmp_res = -1 \ + if version_selector == VersionSelector.LATEST_LOWER \ + else 1 + if rpm.labelCompare(ver, cur_ver) == cmp_res: + if name not in candid \ + or rpm.labelCompare(candid[name], ver) < 0: + candid[name] = (ver, int(dlsize)) + + if len(candid) == 0: + if version_selector == VersionSelector.LATEST: + parser.error('Template \'%s\' not found.' % template) + elif version_selector == VersionSelector.REINSTALL: + parser.error('Same version of template \'%s\' not found.' \ + % template) + elif version_selector == VersionSelector.LATEST_LOWER: + parser.error('Lower version of template \'%s\' not found.' \ + % template) + elif version_selector == VersionSelector.LATEST_HIGHER: + parser.error('Higher version of template \'%s\' not found.' \ + % template) + sys.exit(1) + + # Merge & choose the template with the highest version + for name, (ver, dlsize) in candid.items(): + if name not in full_candid \ + or rpm.labelCompare(full_candid[name], ver) < 0: + full_candid[name] = (ver, dlsize) + return candid -def download(args, app, path_override=None, dl_list=None, suffix=''): +def download(args, app, path_override=None, + dl_list=None, suffix='', version_selector=VersionSelector.LATEST): if dl_list is None: - dl_list = get_dl_list(args, app) + dl_list = get_dl_list(args, app, version_selector=version_selector) path = path_override if path_override is not None else args.downloaddir for name, (ver, dlsize) in dl_list.items(): @@ -416,6 +477,15 @@ def main(args=None, app=None): if args.operation == 'install': install(args, app) + elif args.operation == 'reinstall': + install(args, app, version_selector=VersionSelector.REINSTALL, + ignore_existing=True) + elif args.operation == 'downgrade': + install(args, app, version_selector=VersionSelector.LATEST_LOWER, + ignore_existing=True) + elif args.operation == 'upgrade': + install(args, app, version_selector=VersionSelector.LATEST_HIGHER, + ignore_existing=True) elif args.operation == 'list': do_list(args, app) elif args.operation == 'download':