diff --git a/sql/engines/goinception.py b/sql/engines/goinception.py index c1d8302a41..d0e79638ae 100644 --- a/sql/engines/goinception.py +++ b/sql/engines/goinception.py @@ -39,8 +39,10 @@ def execute_check(self, instance=None, db_name=None, sql=''): check_result.warning_count += 1 elif r[2] == 2: # 错误 check_result.error_count += 1 - if get_syntax_type(r[5]) == 'DDL': - check_result.syntax_type = 1 + # 没有找出DDL语句的才继续执行此判断 + if check_result.syntax_type == 2: + if get_syntax_type(r[5]) == 'DDL': + check_result.syntax_type = 1 check_result.column_list = inception_result.column_list check_result.checked = True return check_result @@ -91,6 +93,16 @@ def query(self, db_name=None, sql='', limit_num=0, close_conn=True): self.close() return result_set + def osc_control(self, **kwargs): + """控制osc执行,获取进度、终止、暂停、恢复等""" + sqlsha1 = kwargs.get('sqlsha1') + command = kwargs.get('command') + if command == 'get': + sql = f"inception get osc_percent '{sqlsha1}';" + else: + sql = f"inception {command} osc '{sqlsha1}';" + return self.query(sql=sql) + def close(self): if self.conn: self.conn.close() diff --git a/sql/engines/inception.py b/sql/engines/inception.py index f4fea5665b..f8a5aefb4f 100644 --- a/sql/engines/inception.py +++ b/sql/engines/inception.py @@ -77,8 +77,10 @@ def execute_check(self, instance=None, db_name=None, sql=''): check_result.warning_count += 1 elif r[2] == 2 or re.match(r"\w*comments\w*", r[4], re.I): # 错误 check_result.error_count += 1 - if get_syntax_type(r[5]) == 'DDL': - check_result.syntax_type = 1 + # 没有找出DDL语句的才继续执行此判断 + if check_result.syntax_type == 2: + if get_syntax_type(r[5]) == 'DDL': + check_result.syntax_type = 1 check_result.column_list = inception_result.column_list check_result.checked = True return check_result @@ -208,6 +210,18 @@ def get_rollback(self, workflow): raise Exception(e) return list_backup_sql + def osc_control(self, **kwargs): + """控制osc执行,获取进度、终止、暂停、恢复等""" + sqlsha1 = kwargs.get('sqlsha1') + command = kwargs.get('command') + if command == 'get': + sql = f"inception get osc_percent '{sqlsha1}';" + elif command == 'kill': + sql = f"inception stop alter '{sqlsha1}';" + else: + raise ValueError('pt-osc不支持暂停和恢复,需要停止执行请使用终止按钮!') + return self.query(sql=sql) + def close(self): if self.conn: self.conn.close() diff --git a/sql/engines/models.py b/sql/engines/models.py index fa145b1c45..783269c1a6 100644 --- a/sql/engines/models.py +++ b/sql/engines/models.py @@ -24,7 +24,7 @@ def __init__(self, inception_result=None, **kwargs): self.sequence = inception_result[7] or '' self.backup_dbname = inception_result[8] or '' self.execute_time = inception_result[9] or '' - self.sqlsha1 = inception_result[10] if len(inception_result) == 10 else '' + self.sqlsha1 = inception_result[10] or '' self.backup_time = inception_result[11] if len(inception_result) == 12 else '' self.actual_affected_rows = '' else: @@ -39,6 +39,7 @@ def __init__(self, inception_result=None, **kwargs): self.backup_dbname = kwargs.get('backup_dbname', '') self.execute_time = kwargs.get('execute_time', '') self.sqlsha1 = kwargs.get('sqlsha1', '') + self.backup_time = kwargs.get('backup_time', '') self.actual_affected_rows = kwargs.get('actual_affected_rows', '') diff --git a/sql/engines/mysql.py b/sql/engines/mysql.py index 4b80b1f979..597ea29c94 100644 --- a/sql/engines/mysql.py +++ b/sql/engines/mysql.py @@ -183,8 +183,10 @@ def execute_check(self, db_name=None, sql=''): affected_rows=0, execute_time=0, ) # 判断工单类型 - if get_syntax_type(statement) == 'DDL': - check_result.syntax_type = 1 + # 没有找出DDL语句的才继续执行此判断 + if check_result.syntax_type == 2: + if get_syntax_type(statement) == 'DDL': + check_result.syntax_type = 1 check_result.rows += [result] # 遇到禁用和高危语句直接返回,提高效率 @@ -260,6 +262,17 @@ def set_variable(self, variable_name, variable_value): sql = f"""set global {variable_name}={variable_value};""" return self.query(sql=sql) + def osc_control(self, **kwargs): + """控制osc执行,获取进度、终止、暂停、恢复等 + get、kill、pause、resume + """ + if SysConfig().get('go_inception'): + go_inception_engine = GoInceptionEngine() + return go_inception_engine.osc_control(**kwargs) + else: + inception_engine = InceptionEngine() + return inception_engine.osc_control(**kwargs) + def close(self): if self.conn: self.conn.close() diff --git a/sql/engines/tests.py b/sql/engines/tests.py index 16ff812c45..ec6b024dc0 100644 --- a/sql/engines/tests.py +++ b/sql/engines/tests.py @@ -172,7 +172,7 @@ def setUp(self): def tearDown(self): self.ins1.delete() - self.sys_config.replace(json.dumps({})) + self.sys_config.purge() SqlWorkflow.objects.all().delete() SqlWorkflowContent.objects.all().delete() @@ -376,7 +376,25 @@ def test_get_variables_filter(self, _query): def test_set_variable(self, _query): new_engine = MysqlEngine(instance=self.ins1) new_engine.set_variable('binlog_format', 'ROW') - _query.assert_called_once() + _query.assert_called_once_with(sql="set global binlog_format=ROW;") + + @patch('sql.engines.mysql.GoInceptionEngine') + def test_osc_go_inception(self, _inception_engine): + self.sys_config.set('go_inception', 'true') + _inception_engine.return_value.osc_control.return_value = ReviewSet() + command = 'get' + sqlsha1 = 'xxxxx' + new_engine = MysqlEngine(instance=self.ins1) + new_engine.osc_control(sqlsha1=sqlsha1, command=command) + + @patch('sql.engines.mysql.InceptionEngine') + def test_osc_inception(self, _inception_engine): + self.sys_config.set('go_inception', 'false') + _inception_engine.return_value.osc_control.return_value = ReviewSet() + command = 'get' + sqlsha1 = 'xxxxx' + new_engine = MysqlEngine(instance=self.ins1) + new_engine.osc_control(sqlsha1=sqlsha1, command=command) class TestRedis(TestCase): @@ -785,6 +803,36 @@ def test_get_rollback_list(self, _connect): new_engine = InceptionEngine() new_engine.get_rollback(self.wf) + @patch('sql.engines.inception.InceptionEngine.query') + def test_osc_get(self, _query): + new_engine = InceptionEngine() + command = 'get' + sqlsha1 = 'xxxxx' + sql = f"inception get osc_percent '{sqlsha1}';" + _query.return_value = ResultSet(full_sql=sql, rows=[], column_list=[]) + new_engine.osc_control(sqlsha1=sqlsha1, command=command) + _query.assert_called_once_with(sql=sql) + + @patch('sql.engines.inception.InceptionEngine.query') + def test_osc_kill(self, _query): + new_engine = InceptionEngine() + command = 'kill' + sqlsha1 = 'xxxxx' + sql = f"inception stop alter '{sqlsha1}';" + _query.return_value = ResultSet(full_sql=sql, rows=[], column_list=[]) + new_engine.osc_control(sqlsha1=sqlsha1, command=command) + _query.assert_called_once_with(sql=sql) + + @patch('sql.engines.inception.InceptionEngine.query') + def test_osc_not_support(self, _query): + new_engine = InceptionEngine() + command = 'stop' + sqlsha1 = 'xxxxx' + sql = f"inception stop alter '{sqlsha1}';" + _query.return_value = ResultSet(full_sql=sql, rows=[], column_list=[]) + with self.assertRaisesMessage(ValueError, 'pt-osc不支持暂停和恢复,需要停止执行请使用终止按钮!'): + new_engine.osc_control(sqlsha1=sqlsha1, command=command) + class TestGoInception(TestCase): def setUp(self): @@ -866,6 +914,46 @@ def test_query_not_limit(self, _conn, _cursor, _execute): query_result = new_engine.query(db_name=0, sql='select 1', limit_num=0) self.assertIsInstance(query_result, ResultSet) + @patch('sql.engines.goinception.GoInceptionEngine.query') + def test_osc_get(self, _query): + new_engine = GoInceptionEngine() + command = 'get' + sqlsha1 = 'xxxxx' + sql = f"inception get osc_percent '{sqlsha1}';" + _query.return_value = ResultSet(full_sql=sql, rows=[], column_list=[]) + new_engine.osc_control(sqlsha1=sqlsha1, command=command) + _query.assert_called_once_with(sql=sql) + + @patch('sql.engines.goinception.GoInceptionEngine.query') + def test_osc_pause(self, _query): + new_engine = GoInceptionEngine() + command = 'pause' + sqlsha1 = 'xxxxx' + sql = f"inception {command} osc '{sqlsha1}';" + _query.return_value = ResultSet(full_sql=sql, rows=[], column_list=[]) + new_engine.osc_control(sqlsha1=sqlsha1, command=command) + _query.assert_called_once_with(sql=sql) + + @patch('sql.engines.goinception.GoInceptionEngine.query') + def test_osc_resume(self, _query): + new_engine = GoInceptionEngine() + command = 'resume' + sqlsha1 = 'xxxxx' + sql = f"inception {command} osc '{sqlsha1}';" + _query.return_value = ResultSet(full_sql=sql, rows=[], column_list=[]) + new_engine.osc_control(sqlsha1=sqlsha1, command=command) + _query.assert_called_once_with(sql=sql) + + @patch('sql.engines.goinception.GoInceptionEngine.query') + def test_osc_kill(self, _query): + new_engine = GoInceptionEngine() + command = 'kill' + sqlsha1 = 'xxxxx' + sql = f"inception kill osc '{sqlsha1}';" + _query.return_value = ResultSet(full_sql=sql, rows=[], column_list=[]) + new_engine.osc_control(sqlsha1=sqlsha1, command=command) + _query.assert_called_once_with(sql=sql) + class TestOracle(TestCase): """Oracle 测试""" diff --git a/sql/sql_workflow.py b/sql/sql_workflow.py index fe8ef40fb0..42a3ad5784 100644 --- a/sql/sql_workflow.py +++ b/sql/sql_workflow.py @@ -444,3 +444,22 @@ def get_workflow_status(request): workflow_detail = get_object_or_404(SqlWorkflow, pk=workflow_id) result = {"status": workflow_detail.status, "msg": "", "data": ""} return JsonResponse(result) + + +def osc_control(request): + """用于mysql控制osc执行""" + workflow_id = request.POST.get('workflow_id') + sqlsha1 = request.POST.get('sqlsha1') + command = request.POST.get('command') + workflow = SqlWorkflow.objects.get(id=workflow_id) + execute_engine = get_engine(workflow.instance) + try: + execute_result = execute_engine.osc_control(command=command, sqlsha1=sqlsha1) + rows = execute_result.to_dict() + error = execute_result.error + except Exception as e: + rows = [] + error = str(e) + result = {"total": len(rows), "rows": rows, "msg": error} + return HttpResponse(json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True), + content_type='application/json') diff --git a/sql/templates/detail.html b/sql/templates/detail.html index e83a49594f..143bca4e79 100644 --- a/sql/templates/detail.html +++ b/sql/templates/detail.html @@ -101,29 +101,52 @@

{{ workflow_detail.workflow_name }}
-
- - {% if last_operation_info %} - - - - - - - - - - - -
- 操作信息 -
- {{ last_operation_info }} -
-
- {% endif %} + + + + +
+ +
+
+ + {% if last_operation_info %} + + + + + + + + + + + +
+ 操作信息 +
+ {{ last_operation_info }} +
+
+ {% endif %} +
+ + +
+ +
+
+
{% if is_can_review or is_can_cancel %} @@ -230,6 +253,34 @@

+ + + + + {% endblock content %} @@ -357,106 +408,338 @@ + + @@ -466,6 +749,8 @@ var key; var retryCnt = 1; $(document).ready(function () { + sessionStorage.setItem('sql_workflow_active_li_id', 'detail_tab'); + get_detail(); if (status === "workflow_executing") { getWorkflowStatus(workflow_id); } @@ -477,7 +762,7 @@ clearTimeout(key); key = setTimeout(function () { getWorkflowStatus(workflow_id); - }, 1000); + }, 2500); retryCnt++; } else { retryCnt = 1; @@ -513,4 +798,21 @@ + + + {% endblock %} diff --git a/sql/templates/sqlsubmit.html b/sql/templates/sqlsubmit.html index cca8936a7c..b5ecbae1ae 100644 --- a/sql/templates/sqlsubmit.html +++ b/sql/templates/sqlsubmit.html @@ -484,13 +484,13 @@ pageNumber: 1, //初始化加载第一页,默认第一页,并记录 pageSize: 500, //每页的记录行数(*) pageList: [500, 1000, 5000], //可供选择的每页的行数(*) - search: false, //是否显示表格搜索 + search: true, //是否显示表格搜索 strictSearch: false, //是否全匹配搜索 showColumns: true, //是否显示所有的列(选择显示的列) showRefresh: false, //是否显示刷新按钮 showExport: true, exportDataType: "all", - minimumCountColumns: 2, //最少允许的列数 + minimumCountColumns: 1, //最少允许的列数 uniqueId: "id", //每一行的唯一标识,一般为主键列 showToggle: true, //是否显示详细视图和列表视图的切换按钮 cardView: false, //是否显示详细视图 diff --git a/sql/tests.py b/sql/tests.py index bd3caaabbe..10918cfc50 100644 --- a/sql/tests.py +++ b/sql/tests.py @@ -261,7 +261,6 @@ def test_query_priv_check_super(self): limit_num=100) self.assertDictEqual(r, {'status': 0, 'msg': 'ok', 'data': {'priv_check': True, 'limit_num': 100}}) - @patch('sql.query_privileges._table_ref', return_value=[{'db': 'archery', 'table': 'sql_users'}]) @patch('sql.query_privileges._tb_priv', return_value=False) @patch('sql.query_privileges._db_priv', return_value=False) @@ -989,7 +988,8 @@ def test_workflow_auto_review_view(self, mock_user_instances, mock_get_engine, m r = c.post('/autoreview/', data=request_data_without_backup, follow=False) self.assertIn('detail', r.url) workflow_id = int(re.search(r'\/detail\/(\d+)\/', r.url).groups()[0]) - self.assertEqual(request_data_without_backup['workflow_name'], SqlWorkflow.objects.get(id=workflow_id).workflow_name) + self.assertEqual(request_data_without_backup['workflow_name'], + SqlWorkflow.objects.get(id=workflow_id).workflow_name) # 关闭备份选项, 不允许不备份 archer_config.set('enable_backup_switch', 'false') @@ -998,6 +998,34 @@ def test_workflow_auto_review_view(self, mock_user_instances, mock_get_engine, m workflow_id = int(re.search(r'\/detail\/(\d+)\/', r.url).groups()[0]) self.assertEqual(SqlWorkflow.objects.get(id=workflow_id).is_backup, True) + @patch('sql.sql_workflow.get_engine') + def test_osc_control(self, _get_engine): + c = Client() + c.force_login(self.superuser1) + request_data = { + 'workflow_id': self.wf1.id, + 'sqlsha1': 'sqlsha1', + 'command': 'get', + } + _get_engine.return_value.osc_control.return_value = ResultSet() + r = c.post('/inception/osc_control/', data=request_data, follow=False) + self.assertDictEqual(json.loads(r.content), + {"total": 0, "rows": [], "msg": None}) + + @patch('sql.sql_workflow.get_engine') + def test_osc_control_exception(self, _get_engine): + c = Client() + c.force_login(self.superuser1) + request_data = { + 'workflow_id': self.wf1.id, + 'sqlsha1': 'sqlsha1', + 'command': 'get', + } + _get_engine.return_value.osc_control.return_value = RuntimeError + r = c.post('/inception/osc_control/', data=request_data, follow=False) + self.assertDictEqual(json.loads(r.content), + {"total": 0, "rows": [], "msg": "type object 'RuntimeError' has no attribute 'to_dict'"}) + class TestOptimize(TestCase): """ diff --git a/sql/urls.py b/sql/urls.py index 0f3cc0c674..75f571f57c 100644 --- a/sql/urls.py +++ b/sql/urls.py @@ -54,6 +54,7 @@ path('simplecheck/', sql_workflow.check), path('getWorkflowStatus/', sql_workflow.get_workflow_status), path('del_sqlcronjob/', tasks.del_schedule), + path('inception/osc_control/', sql_workflow.osc_control), path('sql_analyze/generate/', sql_analyze.generate), path('sql_analyze/analyze/', sql_analyze.analyze),