From 4875947db963486f0baf7f1c7eb2944f95f18687 Mon Sep 17 00:00:00 2001 From: anrs Date: Tue, 24 Nov 2020 14:17:05 +0800 Subject: [PATCH] feat: transfer tool for renaming to workload --- scripts/meta_transfer_as_rename2workload.py | 263 ++++++++++++++++++++ types/resource.go | 32 +-- 2 files changed, 279 insertions(+), 16 deletions(-) create mode 100644 scripts/meta_transfer_as_rename2workload.py diff --git a/scripts/meta_transfer_as_rename2workload.py b/scripts/meta_transfer_as_rename2workload.py new file mode 100644 index 000000000..8acdb1628 --- /dev/null +++ b/scripts/meta_transfer_as_rename2workload.py @@ -0,0 +1,263 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import argparse +import functools +import json +import os +import sys + +import etcd3 + +def remove_prefix(s, prefix): + return s[len(prefix):].lstrip('/') if s.startswith(prefix) else s + +def range_prefix(meta, obj_prefix, fn): + orig_prefix = os.path.join(meta.orig_root_prefix, obj_prefix) + + for orig_value, orig_meta in meta.etcd.get_prefix(orig_prefix): + orig_key = orig_meta.key.decode('utf-8') + objname = remove_prefix(orig_key, orig_prefix) + + new_key = fn(objname, orig_value.decode('utf-8')) + if new_key: + print('convert %s to %s' % (orig_key, new_key)) + + +class Pod(object): + + def __init__(self, meta): + """Initializes a pod transfer.""" + self.meta = meta + self.pod_prefix = 'pod/info' + self.range_prefix = functools.partial(range_prefix, self.meta) + + def trans(self): + self.range_prefix(self.pod_prefix, self._trans) + + def _trans(self, podname, orig_value): + new_key = os.path.join(self.meta.new_root_prefix, self.pod_prefix, podname) + self.meta.etcd.put(new_key, orig_value) + return new_key + + +class Node(object): + + def __init__(self, meta): + """Initializes a node transfer.""" + self.meta = meta + self.info_prefix = 'node' + self.range_prefix = functools.partial(range_prefix, self.meta, self.info_prefix) + self.nodes = {} + + def trans(self): + self.range_prefix(self._trans_info) + self.range_prefix(self._trans_pod) + self.range_prefix(self._trans_cert) + self.range_prefix(self._trans_workload) + + def _trans_info(self, nodename, orig_value): + # skipping extra info. + if ':' in nodename: + return + + self.nodes[nodename] = json.loads(orig_value) + + new_key = os.path.join(self.meta.new_root_prefix, self.info_prefix, nodename) + self.meta.etcd.put(new_key, orig_value) + return new_key + + def _trans_pod(self, node_pod_pair, orig_value): + # parsering node-pod-pair itself only + if ':pod/' not in node_pod_pair: + return + + podname, _, nodename = node_pod_pair.partition(':pod/') + if not (podname and nodename): + raise ValueError('invalid podname or nodename for %s' % node_pod_pair) + + new_key = os.path.join(self.meta.new_root_prefix, self.info_prefix, '%s:pod' % podname, nodename) + self.meta.etcd.put(new_key, orig_value) + return new_key + + def _trans_cert(self, cert_key, orig_value): + nodename, _, cert_type = cert_key.partition(':') + + # parsering orig_key which ends with :ca, :cert, :key only + if cert_type not in ('ca', 'cert', 'key'): + return + + new_key = os.path.join(self.meta.new_root_prefix, self.info_prefix, '%s:%s' % (nodename, cert_type)) + self.meta.etcd.put(new_key, orig_value) + return new_key + + def _trans_workload(self, node_wrk_pair, orig_value): + nodename, _, wrk_id = node_wrk_pair.partition(':containers/') + + # parsering orig_key which belongs node-workload pair only. + if not (nodename and wrk_id): + return + + new_key = os.path.join(self.meta.new_root_prefix, self.info_prefix, '%s:worklods' % nodename, wrk_id) + wrk = Workload.conv(orig_value, self) + self.meta.etcd.put(new_key, json.dumps(wrk)) + return new_key + + def get_numa_node(self, cpumap, nodename): + """Ref core types/core.go GetNUMANode func.""" + numa_node_id = "" + + node = self.nodes.get(nodename) + if not node: + raise ValueError('invalid nodename %s' % nodename) + + numa = node.get('numa') + if not numa: + return numa_node_id + + for cpu_id in cpumap: + mem_node = numa.get(cpu_id) + if not mem_node: + continue + + if numa_node_id == '': + numa_node_id = mem_node + elif numa_node_id != mem_node: + numa_node_id = '' + + return numa_node_id + + +class Workload(object): + + def __init__(self, meta, node_transfer): + """Initializes a workload transfer.""" + self.meta = meta + self.container_prefix = 'containers' + self.deploy_prefix = 'deploy' + self.range_prefix = functools.partial(range_prefix, self.meta) + self.node_transfer = node_transfer + + def trans(self): + self.range_prefix(self.container_prefix, self._trans_container) + self.range_prefix(self.deploy_prefix, self._trans_deploy) + + def _trans_container(self, wrk_id, orig_value): + new_key = os.path.join(self.meta.new_root_prefix, self.container_prefix, wrk_id) + wrk = self.conv(orig_value, self.node_transfer) + self.meta.etcd.put(new_key, json.dumps(wrk)) + return new_key + + def _trans_deploy(self, deploy_key, orig_value): + parts = deploy_key.split('/') + if len(parts) != 4: + raise ValueError('invalid deploy key: %s' % deploy_key) + + appname, entrypoint, nodename, wrk_id = parts + + new_key = os.path.join(self.meta.new_root_prefix, self.deploy_prefix, appname, entrypoint, nodename, wrk_id) + wrk = self.conv(orig_value, self.node_transfer) + self.meta.etcd.put(new_key, json.dumps(wrk)) + + return new_key + + @classmethod + def conv(cls, orig_value, node_transfer): + + def delete(*keys): + for k in keys: + try: + del dic[k] + except KeyError: + pass + + del_keys = set() + def get(new_field, orig_field, transit_field, default=None): + value = None + + # don't use dic.get(new_field, dic[orig_field]), + # due to there isn't orig_field but has new_field. + if new_field in dic: + value = dic[new_field] + elif transit_field in dic: + value = dic[transit_field] + else: + if default is None: + value = dic[orig_field] + else: + value = dic.get(orig_field, default) + + del_keys.update({orig_field, transit_field}) + + return value + + dic = json.loads(orig_value) + dic['cpu'] = get('CPU', 'cpu', 'CPU') + + dic.update(dict( + create_time=1553990400, + cpu_quota_request=get('cpu_quota_request', 'quota', 'CPUQuotaRequest'), + cpu_quota_limit=get('cpu_quota_limit', 'quota', 'CPUQuotaLimit'), + memory_request=get('memory_request', 'memory', 'MemoryRequest'), + memory_limit=get('memory_limit', 'memory', 'MemoryLimit'), + volume_request=get('volume_request', 'volumes', 'VolumeRequest'), + volume_limit=get('volume_limit', 'volumes', 'VolumeLimit'), + volume_plan_request=get('volume_plan_request', 'volume_plan', 'VolumePlanRequest', default={}), + volume_plan_limit=get('volume_plan_limit', 'volume_plan', 'VolumePlanLimit', default={}), + volume_changed=dic.get('volume_changed', False), + storage_request=get('storage_request', 'storage', 'StorageRequest'), + storage_limit=get('storage_limit', 'storage', 'StorageLimit'), + )) + + dic['numa_node'] = '' + if dic['cpu'] and node_transfer: + numa_node = node_transfer.get_numa_node(dic['cpu'], dic['nodename']) + dic['numa_node'] = dic.get('NUMANode', numa_node) + + # don't removing *cpu* from the original dict. + try: + del_keys.remove('cpu') + except KeyError: + pass + + del_keys.update({'softlimit', 'VolumeChanged', 'NUMANode'}) + delete(*list(del_keys)) + + return dic + + +class Transfer(object): + + def __init__(self, etcd, orig_root_prefix, new_root_prefix): + """Initializes a transfer which includes common utilities.""" + self.etcd = etcd + self.orig_root_prefix = orig_root_prefix + self.new_root_prefix = new_root_prefix + + def trans(self): + Pod(self).trans() + + node_transfer = Node(self) + node_transfer.trans() + + Workload(self, node_transfer).trans() + +def getargs(): + ap = argparse.ArgumentParser() + ap.add_argument('-o', dest='orig_root_prefix', help='original prefix', default='/eru') + ap.add_argument('-n', dest='new_root_prefix', help='new prefix', default='/eru2') + ap.add_argument('--etcd-host', default='127.0.0.1') + ap.add_argument('--etcd-port', type=int, default=2379) + return ap.parse_args() + +def connect_etcd(host, port): + return etcd3.client(host=host, port=port) + +def main(): + args = getargs() + etcd = connect_etcd(args.etcd_host, args.etcd_port) + trans = Transfer(etcd, args.orig_root_prefix, args.new_root_prefix) + trans.trans() + return 0 + +if __name__ == '__main__': + sys.exit(main()) diff --git a/types/resource.go b/types/resource.go index 683a288fb..31473f5a0 100644 --- a/types/resource.go +++ b/types/resource.go @@ -23,22 +23,22 @@ type ResourceOptions struct { // ResourceMeta for messages and workload to store type ResourceMeta struct { - CPUQuotaRequest float64 - CPUQuotaLimit float64 - CPU ResourceMap - NUMANode string - - MemoryRequest int64 - MemoryLimit int64 - - VolumeRequest VolumeBindings - VolumeLimit VolumeBindings - VolumePlanRequest VolumePlan - VolumePlanLimit VolumePlan - VolumeChanged bool - - StorageRequest int64 - StorageLimit int64 + CPUQuotaRequest float64 `json:"cpu_quota_request"` + CPUQuotaLimit float64 `json:"cpu_quota_limit"` + CPU ResourceMap `json:"cpu"` + NUMANode string `json:"numa_node"` + + MemoryRequest int64 `json:"memory_request"` + MemoryLimit int64 `json:"memory_limit"` + + VolumeRequest VolumeBindings `json:"volume_request"` + VolumeLimit VolumeBindings `json:"volume_limit"` + VolumePlanRequest VolumePlan `json:"volume_plan_request"` + VolumePlanLimit VolumePlan `json:"volume_plan_limit"` + VolumeChanged bool `json:"volume_changed"` + + StorageRequest int64 `json:"storage_request"` + StorageLimit int64 `json:"storage_limit"` } // ResourceType .