Skip to content

Commit

Permalink
feat: transfer tool for renaming to workload (#272)
Browse files Browse the repository at this point in the history
* feat: transfer tool for renaming to workload

* fix: key misleading

Co-authored-by: anrs <[email protected]>
  • Loading branch information
2 people authored and CMGS committed Nov 25, 2020
1 parent 18eb694 commit 7fa95cc
Show file tree
Hide file tree
Showing 2 changed files with 280 additions and 16 deletions.
264 changes: 264 additions & 0 deletions scripts/meta_transfer_as_rename2workload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import functools
import json
import os
import sys

import etcd3

def remove_prefix(s, prefix):
return s[len(prefix):].lstrip('/') if s.startswith(prefix) else s

def range_prefix(meta, obj_prefix, fn):
orig_prefix = os.path.join(meta.orig_root_prefix, obj_prefix)

for orig_value, orig_meta in meta.etcd.get_prefix(orig_prefix):
orig_key = orig_meta.key.decode('utf-8')
objname = remove_prefix(orig_key, orig_prefix)

new_key = fn(objname, orig_value.decode('utf-8'))
if new_key:
print('convert %s to %s' % (orig_key, new_key))


class Pod(object):

def __init__(self, meta):
"""Initializes a pod transfer."""
self.meta = meta
self.pod_prefix = 'pod/info'
self.range_prefix = functools.partial(range_prefix, self.meta)

def trans(self):
self.range_prefix(self.pod_prefix, self._trans)

def _trans(self, podname, orig_value):
new_key = os.path.join(self.meta.new_root_prefix, self.pod_prefix, podname)
self.meta.etcd.put(new_key, orig_value)
return new_key


class Node(object):

def __init__(self, meta):
"""Initializes a node transfer."""
self.meta = meta
self.info_prefix = 'node'
self.range_prefix = functools.partial(range_prefix, self.meta, self.info_prefix)
self.nodes = {}

def trans(self):
self.range_prefix(self._trans_info)
self.range_prefix(self._trans_pod)
self.range_prefix(self._trans_cert)
self.range_prefix(self._trans_workload)

def _trans_info(self, nodename, orig_value):
# skipping extra info.
if ':' in nodename:
return

self.nodes[nodename] = json.loads(orig_value)

new_key = os.path.join(self.meta.new_root_prefix, self.info_prefix, nodename)
self.meta.etcd.put(new_key, orig_value)
return new_key

def _trans_pod(self, node_pod_pair, orig_value):
# parsering node-pod-pair itself only
if ':pod/' not in node_pod_pair:
return

podname, _, nodename = node_pod_pair.partition(':pod/')
if not (podname and nodename):
raise ValueError('invalid podname or nodename for %s' % node_pod_pair)

new_key = os.path.join(self.meta.new_root_prefix, self.info_prefix, '%s:pod' % podname, nodename)
self.meta.etcd.put(new_key, orig_value)
return new_key

def _trans_cert(self, cert_key, orig_value):
nodename, _, cert_type = cert_key.partition(':')

# parsering orig_key which ends with :ca, :cert, :key only
if cert_type not in ('ca', 'cert', 'key'):
return

new_key = os.path.join(self.meta.new_root_prefix, self.info_prefix, '%s:%s' % (nodename, cert_type))
self.meta.etcd.put(new_key, orig_value)
return new_key

def _trans_workload(self, node_wrk_pair, orig_value):
nodename, _, wrk_id = node_wrk_pair.partition(':containers/')

# parsering orig_key which belongs node-workload pair only.
if not (nodename and wrk_id):
return

new_key = os.path.join(self.meta.new_root_prefix, self.info_prefix, '%s:worklods' % nodename, wrk_id)
wrk = Workload.conv(orig_value, self)
self.meta.etcd.put(new_key, json.dumps(wrk))
return new_key

def get_numa_node(self, cpumap, nodename):
"""Ref core types/core.go GetNUMANode func."""
numa_node_id = ""

node = self.nodes.get(nodename)
if not node:
raise ValueError('invalid nodename %s' % nodename)

numa = node.get('numa')
if not numa:
return numa_node_id

for cpu_id in cpumap:
mem_node = numa.get(cpu_id)
if not mem_node:
continue

if numa_node_id == '':
numa_node_id = mem_node
elif numa_node_id != mem_node:
numa_node_id = ''

return numa_node_id


class Workload(object):

def __init__(self, meta, node_transfer):
"""Initializes a workload transfer."""
self.meta = meta
self.container_prefix = 'containers'
self.wrk_prefix = 'workloads'
self.deploy_prefix = 'deploy'
self.range_prefix = functools.partial(range_prefix, self.meta)
self.node_transfer = node_transfer

def trans(self):
self.range_prefix(self.container_prefix, self._trans_container)
self.range_prefix(self.deploy_prefix, self._trans_deploy)

def _trans_container(self, wrk_id, orig_value):
new_key = os.path.join(self.meta.new_root_prefix, self.wrk_prefix, wrk_id)
wrk = self.conv(orig_value, self.node_transfer)
self.meta.etcd.put(new_key, json.dumps(wrk))
return new_key

def _trans_deploy(self, deploy_key, orig_value):
parts = deploy_key.split('/')
if len(parts) != 4:
raise ValueError('invalid deploy key: %s' % deploy_key)

appname, entrypoint, nodename, wrk_id = parts

new_key = os.path.join(self.meta.new_root_prefix, self.deploy_prefix, appname, entrypoint, nodename, wrk_id)
wrk = self.conv(orig_value, self.node_transfer)
self.meta.etcd.put(new_key, json.dumps(wrk))

return new_key

@classmethod
def conv(cls, orig_value, node_transfer):

def delete(*keys):
for k in keys:
try:
del dic[k]
except KeyError:
pass

del_keys = set()
def get(new_field, orig_field, transit_field, default=None):
value = None

# don't use dic.get(new_field, dic[orig_field]),
# due to there isn't orig_field but has new_field.
if new_field in dic:
value = dic[new_field]
elif transit_field in dic:
value = dic[transit_field]
else:
if default is None:
value = dic[orig_field]
else:
value = dic.get(orig_field, default)

del_keys.update({orig_field, transit_field})

return value

dic = json.loads(orig_value)
dic['cpu'] = get('CPU', 'cpu', 'CPU')

dic.update(dict(
create_time=1553990400,
cpu_quota_request=get('cpu_quota_request', 'quota', 'CPUQuotaRequest'),
cpu_quota_limit=get('cpu_quota_limit', 'quota', 'CPUQuotaLimit'),
memory_request=get('memory_request', 'memory', 'MemoryRequest'),
memory_limit=get('memory_limit', 'memory', 'MemoryLimit'),
volume_request=get('volume_request', 'volumes', 'VolumeRequest'),
volume_limit=get('volume_limit', 'volumes', 'VolumeLimit'),
volume_plan_request=get('volume_plan_request', 'volume_plan', 'VolumePlanRequest', default={}),
volume_plan_limit=get('volume_plan_limit', 'volume_plan', 'VolumePlanLimit', default={}),
volume_changed=dic.get('volume_changed', False),
storage_request=get('storage_request', 'storage', 'StorageRequest'),
storage_limit=get('storage_limit', 'storage', 'StorageLimit'),
))

dic['numa_node'] = ''
if dic['cpu'] and node_transfer:
numa_node = node_transfer.get_numa_node(dic['cpu'], dic['nodename'])
dic['numa_node'] = dic.get('NUMANode', numa_node)

# don't removing *cpu* from the original dict.
try:
del_keys.remove('cpu')
except KeyError:
pass

del_keys.update({'softlimit', 'VolumeChanged', 'NUMANode'})
delete(*list(del_keys))

return dic


class Transfer(object):

def __init__(self, etcd, orig_root_prefix, new_root_prefix):
"""Initializes a transfer which includes common utilities."""
self.etcd = etcd
self.orig_root_prefix = orig_root_prefix
self.new_root_prefix = new_root_prefix

def trans(self):
Pod(self).trans()

node_transfer = Node(self)
node_transfer.trans()

Workload(self, node_transfer).trans()

def getargs():
ap = argparse.ArgumentParser()
ap.add_argument('-o', dest='orig_root_prefix', help='original prefix', default='/eru')
ap.add_argument('-n', dest='new_root_prefix', help='new prefix', default='/eru2')
ap.add_argument('--etcd-host', default='127.0.0.1')
ap.add_argument('--etcd-port', type=int, default=2379)
return ap.parse_args()

def connect_etcd(host, port):
return etcd3.client(host=host, port=port)

def main():
args = getargs()
etcd = connect_etcd(args.etcd_host, args.etcd_port)
trans = Transfer(etcd, args.orig_root_prefix, args.new_root_prefix)
trans.trans()
return 0

if __name__ == '__main__':
sys.exit(main())
32 changes: 16 additions & 16 deletions types/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ type ResourceOptions struct {

// ResourceMeta for messages and workload to store
type ResourceMeta struct {
CPUQuotaRequest float64
CPUQuotaLimit float64
CPU ResourceMap
NUMANode string

MemoryRequest int64
MemoryLimit int64

VolumeRequest VolumeBindings
VolumeLimit VolumeBindings
VolumePlanRequest VolumePlan
VolumePlanLimit VolumePlan
VolumeChanged bool

StorageRequest int64
StorageLimit int64
CPUQuotaRequest float64 `json:"cpu_quota_request"`
CPUQuotaLimit float64 `json:"cpu_quota_limit"`
CPU ResourceMap `json:"cpu"`
NUMANode string `json:"numa_node"`

MemoryRequest int64 `json:"memory_request"`
MemoryLimit int64 `json:"memory_limit"`

VolumeRequest VolumeBindings `json:"volume_request"`
VolumeLimit VolumeBindings `json:"volume_limit"`
VolumePlanRequest VolumePlan `json:"volume_plan_request"`
VolumePlanLimit VolumePlan `json:"volume_plan_limit"`
VolumeChanged bool `json:"volume_changed"`

StorageRequest int64 `json:"storage_request"`
StorageLimit int64 `json:"storage_limit"`
}

// ResourceType .
Expand Down

0 comments on commit 7fa95cc

Please sign in to comment.