Skip to content

Commit

Permalink
Merge pull request #220 from hhyo/feature/osc
Browse files Browse the repository at this point in the history
SQL工单详情增加日志列表,增加osc执行控制,包括进度查看、暂停、恢复、终止
  • Loading branch information
hhyo authored May 27, 2019
2 parents a4c9ecc + 954e9a3 commit 4a36e99
Show file tree
Hide file tree
Showing 10 changed files with 609 additions and 131 deletions.
16 changes: 14 additions & 2 deletions sql/engines/goinception.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ def execute_check(self, instance=None, db_name=None, sql=''):
check_result.warning_count += 1
elif r[2] == 2: # 错误
check_result.error_count += 1
if get_syntax_type(r[5]) == 'DDL':
check_result.syntax_type = 1
# 没有找出DDL语句的才继续执行此判断
if check_result.syntax_type == 2:
if get_syntax_type(r[5]) == 'DDL':
check_result.syntax_type = 1
check_result.column_list = inception_result.column_list
check_result.checked = True
return check_result
Expand Down Expand Up @@ -91,6 +93,16 @@ def query(self, db_name=None, sql='', limit_num=0, close_conn=True):
self.close()
return result_set

def osc_control(self, **kwargs):
"""控制osc执行,获取进度、终止、暂停、恢复等"""
sqlsha1 = kwargs.get('sqlsha1')
command = kwargs.get('command')
if command == 'get':
sql = f"inception get osc_percent '{sqlsha1}';"
else:
sql = f"inception {command} osc '{sqlsha1}';"
return self.query(sql=sql)

def close(self):
if self.conn:
self.conn.close()
Expand Down
18 changes: 16 additions & 2 deletions sql/engines/inception.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ def execute_check(self, instance=None, db_name=None, sql=''):
check_result.warning_count += 1
elif r[2] == 2 or re.match(r"\w*comments\w*", r[4], re.I): # 错误
check_result.error_count += 1
if get_syntax_type(r[5]) == 'DDL':
check_result.syntax_type = 1
# 没有找出DDL语句的才继续执行此判断
if check_result.syntax_type == 2:
if get_syntax_type(r[5]) == 'DDL':
check_result.syntax_type = 1
check_result.column_list = inception_result.column_list
check_result.checked = True
return check_result
Expand Down Expand Up @@ -208,6 +210,18 @@ def get_rollback(self, workflow):
raise Exception(e)
return list_backup_sql

def osc_control(self, **kwargs):
"""控制osc执行,获取进度、终止、暂停、恢复等"""
sqlsha1 = kwargs.get('sqlsha1')
command = kwargs.get('command')
if command == 'get':
sql = f"inception get osc_percent '{sqlsha1}';"
elif command == 'kill':
sql = f"inception stop alter '{sqlsha1}';"
else:
raise ValueError('pt-osc不支持暂停和恢复,需要停止执行请使用终止按钮!')
return self.query(sql=sql)

def close(self):
if self.conn:
self.conn.close()
Expand Down
3 changes: 2 additions & 1 deletion sql/engines/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(self, inception_result=None, **kwargs):
self.sequence = inception_result[7] or ''
self.backup_dbname = inception_result[8] or ''
self.execute_time = inception_result[9] or ''
self.sqlsha1 = inception_result[10] if len(inception_result) == 10 else ''
self.sqlsha1 = inception_result[10] or ''
self.backup_time = inception_result[11] if len(inception_result) == 12 else ''
self.actual_affected_rows = ''
else:
Expand All @@ -39,6 +39,7 @@ def __init__(self, inception_result=None, **kwargs):
self.backup_dbname = kwargs.get('backup_dbname', '')
self.execute_time = kwargs.get('execute_time', '')
self.sqlsha1 = kwargs.get('sqlsha1', '')
self.backup_time = kwargs.get('backup_time', '')
self.actual_affected_rows = kwargs.get('actual_affected_rows', '')


Expand Down
17 changes: 15 additions & 2 deletions sql/engines/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,10 @@ def execute_check(self, db_name=None, sql=''):
affected_rows=0,
execute_time=0, )
# 判断工单类型
if get_syntax_type(statement) == 'DDL':
check_result.syntax_type = 1
# 没有找出DDL语句的才继续执行此判断
if check_result.syntax_type == 2:
if get_syntax_type(statement) == 'DDL':
check_result.syntax_type = 1
check_result.rows += [result]

# 遇到禁用和高危语句直接返回,提高效率
Expand Down Expand Up @@ -260,6 +262,17 @@ def set_variable(self, variable_name, variable_value):
sql = f"""set global {variable_name}={variable_value};"""
return self.query(sql=sql)

def osc_control(self, **kwargs):
"""控制osc执行,获取进度、终止、暂停、恢复等
get、kill、pause、resume
"""
if SysConfig().get('go_inception'):
go_inception_engine = GoInceptionEngine()
return go_inception_engine.osc_control(**kwargs)
else:
inception_engine = InceptionEngine()
return inception_engine.osc_control(**kwargs)

def close(self):
if self.conn:
self.conn.close()
Expand Down
92 changes: 90 additions & 2 deletions sql/engines/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def setUp(self):

def tearDown(self):
self.ins1.delete()
self.sys_config.replace(json.dumps({}))
self.sys_config.purge()
SqlWorkflow.objects.all().delete()
SqlWorkflowContent.objects.all().delete()

Expand Down Expand Up @@ -376,7 +376,25 @@ def test_get_variables_filter(self, _query):
def test_set_variable(self, _query):
new_engine = MysqlEngine(instance=self.ins1)
new_engine.set_variable('binlog_format', 'ROW')
_query.assert_called_once()
_query.assert_called_once_with(sql="set global binlog_format=ROW;")

@patch('sql.engines.mysql.GoInceptionEngine')
def test_osc_go_inception(self, _inception_engine):
self.sys_config.set('go_inception', 'true')
_inception_engine.return_value.osc_control.return_value = ReviewSet()
command = 'get'
sqlsha1 = 'xxxxx'
new_engine = MysqlEngine(instance=self.ins1)
new_engine.osc_control(sqlsha1=sqlsha1, command=command)

@patch('sql.engines.mysql.InceptionEngine')
def test_osc_inception(self, _inception_engine):
self.sys_config.set('go_inception', 'false')
_inception_engine.return_value.osc_control.return_value = ReviewSet()
command = 'get'
sqlsha1 = 'xxxxx'
new_engine = MysqlEngine(instance=self.ins1)
new_engine.osc_control(sqlsha1=sqlsha1, command=command)


class TestRedis(TestCase):
Expand Down Expand Up @@ -785,6 +803,36 @@ def test_get_rollback_list(self, _connect):
new_engine = InceptionEngine()
new_engine.get_rollback(self.wf)

@patch('sql.engines.inception.InceptionEngine.query')
def test_osc_get(self, _query):
new_engine = InceptionEngine()
command = 'get'
sqlsha1 = 'xxxxx'
sql = f"inception get osc_percent '{sqlsha1}';"
_query.return_value = ResultSet(full_sql=sql, rows=[], column_list=[])
new_engine.osc_control(sqlsha1=sqlsha1, command=command)
_query.assert_called_once_with(sql=sql)

@patch('sql.engines.inception.InceptionEngine.query')
def test_osc_kill(self, _query):
new_engine = InceptionEngine()
command = 'kill'
sqlsha1 = 'xxxxx'
sql = f"inception stop alter '{sqlsha1}';"
_query.return_value = ResultSet(full_sql=sql, rows=[], column_list=[])
new_engine.osc_control(sqlsha1=sqlsha1, command=command)
_query.assert_called_once_with(sql=sql)

@patch('sql.engines.inception.InceptionEngine.query')
def test_osc_not_support(self, _query):
new_engine = InceptionEngine()
command = 'stop'
sqlsha1 = 'xxxxx'
sql = f"inception stop alter '{sqlsha1}';"
_query.return_value = ResultSet(full_sql=sql, rows=[], column_list=[])
with self.assertRaisesMessage(ValueError, 'pt-osc不支持暂停和恢复,需要停止执行请使用终止按钮!'):
new_engine.osc_control(sqlsha1=sqlsha1, command=command)


class TestGoInception(TestCase):
def setUp(self):
Expand Down Expand Up @@ -866,6 +914,46 @@ def test_query_not_limit(self, _conn, _cursor, _execute):
query_result = new_engine.query(db_name=0, sql='select 1', limit_num=0)
self.assertIsInstance(query_result, ResultSet)

@patch('sql.engines.goinception.GoInceptionEngine.query')
def test_osc_get(self, _query):
new_engine = GoInceptionEngine()
command = 'get'
sqlsha1 = 'xxxxx'
sql = f"inception get osc_percent '{sqlsha1}';"
_query.return_value = ResultSet(full_sql=sql, rows=[], column_list=[])
new_engine.osc_control(sqlsha1=sqlsha1, command=command)
_query.assert_called_once_with(sql=sql)

@patch('sql.engines.goinception.GoInceptionEngine.query')
def test_osc_pause(self, _query):
new_engine = GoInceptionEngine()
command = 'pause'
sqlsha1 = 'xxxxx'
sql = f"inception {command} osc '{sqlsha1}';"
_query.return_value = ResultSet(full_sql=sql, rows=[], column_list=[])
new_engine.osc_control(sqlsha1=sqlsha1, command=command)
_query.assert_called_once_with(sql=sql)

@patch('sql.engines.goinception.GoInceptionEngine.query')
def test_osc_resume(self, _query):
new_engine = GoInceptionEngine()
command = 'resume'
sqlsha1 = 'xxxxx'
sql = f"inception {command} osc '{sqlsha1}';"
_query.return_value = ResultSet(full_sql=sql, rows=[], column_list=[])
new_engine.osc_control(sqlsha1=sqlsha1, command=command)
_query.assert_called_once_with(sql=sql)

@patch('sql.engines.goinception.GoInceptionEngine.query')
def test_osc_kill(self, _query):
new_engine = GoInceptionEngine()
command = 'kill'
sqlsha1 = 'xxxxx'
sql = f"inception kill osc '{sqlsha1}';"
_query.return_value = ResultSet(full_sql=sql, rows=[], column_list=[])
new_engine.osc_control(sqlsha1=sqlsha1, command=command)
_query.assert_called_once_with(sql=sql)


class TestOracle(TestCase):
"""Oracle 测试"""
Expand Down
19 changes: 19 additions & 0 deletions sql/sql_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,3 +444,22 @@ def get_workflow_status(request):
workflow_detail = get_object_or_404(SqlWorkflow, pk=workflow_id)
result = {"status": workflow_detail.status, "msg": "", "data": ""}
return JsonResponse(result)


def osc_control(request):
"""用于mysql控制osc执行"""
workflow_id = request.POST.get('workflow_id')
sqlsha1 = request.POST.get('sqlsha1')
command = request.POST.get('command')
workflow = SqlWorkflow.objects.get(id=workflow_id)
execute_engine = get_engine(workflow.instance)
try:
execute_result = execute_engine.osc_control(command=command, sqlsha1=sqlsha1)
rows = execute_result.to_dict()
error = execute_result.error
except Exception as e:
rows = []
error = str(e)
result = {"total": len(rows), "rows": rows, "msg": error}
return HttpResponse(json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True),
content_type='application/json')
Loading

0 comments on commit 4a36e99

Please sign in to comment.