Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scripts: add convenience script for creating table #348

Merged
merged 5 commits into from
Jul 8, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions scripts/create_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2019, Xiaomi, Inc. All rights reserved.
# This source code is licensed under the Apache License Version 2.0, which
# can be found in the LICENSE file in the root directory of this source tree.

"""
Basic usage:

./scripts/create_table.py --table ai_user_info --depart 人工智能部-小爱-XXX项目组 --user wutao1 --cluster bj1-ai --throttle "2000*delay*100" --parts 16

TODO: automatically set write throttling on the table.
"""

import os
import click
import py_utils
import re


def validate_param_table(ctx, param, value):
# TODO(wutao1): check illegal characters
return value


def validate_param_depart(ctx, param, value):
return value.encode('utf-8')


def validate_param_user(ctx, param, value):
return value.encode('utf-8')


def validate_param_cluster(ctx, param, value):
return value


def validate_param_parts(ctx, param, value):
if value == None:
return
try:
parts = int(value)
return parts
except ValueError:
raise click.BadParameter(
'invalid value of partition count \'%s\'' % value)


def validate_param_throttle(ctx, param, value):
if value == '':
return None
pattern = re.compile(r'^\d+\*delay\*\d+(,\d+\*reject\*\d+)?$')
match = pattern.match(value)
if match is not None:
return value
else:
raise click.BadParameter(
'invalid value of throttle \'%s\'' % value)


def create_table_if_needed(cluster, table, parts):
if not cluster.has_table(table):
try:
cluster.create_table(table, parts)
except Exception as err:
py_utils.echo(err, "red")
exit(1)
else:
py_utils.echo("Success: table \"{}\" exists".format(table))


def set_business_info_if_needed(cluster, table, depart, user):
new_business_info = "depart={},user={}".format(depart, user)
py_utils.echo("New value of business.info=" + new_business_info)

envs = cluster.get_app_envs(table)
if envs != None:
old_business_info = envs.get('business.info')
if old_business_info != None:
py_utils.echo("Old value of business.info=" + old_business_info)
if old_business_info.encode('utf-8') == new_business_info:
py_utils.echo("Success: business info keeps unchanged")
return

cluster.set_app_envs(table, 'business.info',
new_business_info)


def set_write_throttling_if_needed(cluster, table, new_throttle):
py_utils.echo("New value of replica.write_throttling=" + new_throttle)

envs = cluster.get_app_envs(table)
if envs != None:
old_throttle = envs.get('replica.write_throttling')
if old_throttle != None:
py_utils.echo(
"Old value of replica.write_throttling=" + old_throttle)
if old_throttle == new_throttle:
py_utils.echo(
"Success: throttle keeps unchanged")
return
cluster.set_app_envs(table, 'replica.write_throttling',
new_throttle)


@click.command()
@click.option("--table",
required=True,
callback=validate_param_table,
help="Name of the table you want to create.")
@click.option("--depart",
required=True,
callback=validate_param_depart,
help="Department of the table owner. If there are more than one levels of department, use '-' to concatenate them")
@click.option("--user",
required=True,
callback=validate_param_user,
help="The table owner. If there are more than one owners, use '&' to concatenate them.")
@click.option("--cluster",
required=True,
callback=validate_param_cluster,
help="The cluster name. Where you want to place the table.")
@click.option("--parts",
callback=validate_param_parts,
help="The partition count of the table. Empty means no create.")
@click.option("--throttle",
default="",
callback=validate_param_throttle,
help="{delay_qps_threshold}*delay*{delay_ms},{reject_qps_threshold}*reject*{delay_ms_before_reject}")
def main(table, depart, user, cluster, parts, throttle):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

参数顺序可以调整下, 从广到细, 从required到optional

c = py_utils.PegasusCluster(cluster_name=cluster)
create_table_if_needed(c, table, parts)
set_business_info_if_needed(c, table, depart, user)
if throttle is not None:
set_write_throttling_if_needed(c, table, throttle)


if __name__ == "__main__":
main()

45 changes: 39 additions & 6 deletions scripts/py_utils/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ def echo(message, color=None):


class PegasusCluster(object):
def __init__(self, cfg_file_name):
self._cluster_name = os.path.basename(cfg_file_name).replace(
"pegasus-", "").replace(".cfg", "")
def __init__(self, cfg_file_name=None, cluster_name=None):
if cluster_name is None:
self._cluster_name = os.path.basename(cfg_file_name).replace(
"pegasus-", "").replace(".cfg", "")
else:
self._cluster_name = cluster_name
self._shell_path = os.getenv("PEGASUS_SHELL_PATH")
self._cfg_file_name = cfg_file_name
if self._shell_path is None:
Expand All @@ -36,8 +39,10 @@ def print_unhealthy_partitions(self):
list_detail = self._run_shell("ls -d -j").strip()

list_detail_json = json.loads(list_detail)
read_unhealthy_app_count = int(list_detail_json["summary"]["read_unhealthy_app_count"])
write_unhealthy_app_count = int(list_detail_json["summary"]["write_unhealthy_app_count"])
read_unhealthy_app_count = int(
list_detail_json["summary"]["read_unhealthy_app_count"])
write_unhealthy_app_count = int(
list_detail_json["summary"]["write_unhealthy_app_count"])
if write_unhealthy_app_count > 0:
echo("cluster is write unhealthy, write_unhealthy_app_count = " +
str(write_unhealthy_app_count))
Expand Down Expand Up @@ -73,14 +78,41 @@ def get_meta_host(self):
if line.strip().startswith("host.0"):
return line.split("=")[1].strip()

def create_table(self, table, parts):
create_result = self._run_shell(
"create {} -p {}".format(table, parts)).strip()
if "ERR_INVALID_PARAMETERS" in create_result:
raise ValueError("failed to create table \"{}\"".format(table))

def get_app_envs(self, table):
envs_result = self._run_shell(
"use {} \n get_app_envs".format(table)).strip()[len("OK\n"):]
if "ERR_OBJECT_NOT_FOUND" in envs_result:
raise ValueError("table {} does not exist".format(table))
if envs_result == "":
return None
envs_result = self._run_shell(
"use {} \n get_app_envs -j".format(table)).strip()[len("OK\n"):]
return json.loads(envs_result)['app_envs']

def set_app_envs(self, table, env_name, env_value):
envs_result = self._run_shell(
"use {} \n set_app_envs {} {}".format(table, env_name, env_value)).strip()[len("OK\n"):]
if "ERR_OBJECT_NOT_FOUND" in envs_result:
raise ValueError("table {} does not exist".format(table))

def has_table(self, table):
app_result = self._run_shell("app {} ".format(table)).strip()
return "ERR_OBJECT_NOT_FOUND" not in app_result

def _run_shell(self, args):
"""
:param args: arguments passed to ./run.sh shell (type `string`)
:return: shell output
"""
global _global_verbose

cmd = "cd {1}; echo {0} | ./run.sh shell -n {2}".format(
cmd = "cd {1}; echo -e \"{0}\" | ./run.sh shell -n {2}".format(
args, self._shell_path, self._cluster_name)
if _global_verbose:
echo("executing command: \"{0}\"".format(cmd))
Expand Down Expand Up @@ -119,3 +151,4 @@ def list_pegasus_clusters(config_path, env):
continue
clusters.append(PegasusCluster(config_path + "/" + fname))
return clusters