Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MySQL数据脱敏inception改为goinception #1307

Merged
merged 49 commits into from
Jan 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
2c20aca
MySQL数据脱敏inception改为goinception
unknowissue Dec 29, 2021
f01373e
MySQL数据脱敏inception改为goinception
unknowissue Dec 29, 2021
cfb127a
修改了逻辑,判断inception/goInception
unknowissue Jan 4, 2022
f8225f7
增加对于goinception脱敏的测试
unknowissue Jan 4, 2022
f6b555a
使用最新的镜像测试ci
unknowissue Jan 4, 2022
5c6832e
使用最新的镜像测试ci
unknowissue Jan 4, 2022
468c8eb
使用最新的镜像测试ci
unknowissue Jan 4, 2022
a3f803f
oracle脱敏更换函数 (#1306)
unknowissue Dec 31, 2021
39612b5
Merge branch 'master' of github.com:unknowissue/Archery
unknowissue Jan 4, 2022
ad3bbe2
ci调试-尝试修改版本
unknowissue Jan 4, 2022
2779721
Merge branch 'hhyo:master' into masking-inception-goInception
unknowissue Jan 4, 2022
e170ae9
修改测试以及还原docer.yaml文件
unknowissue Jan 5, 2022
75a41e1
Merge branch 'master' of github.com:unknowissue/Archery into masking-…
unknowissue Jan 5, 2022
1c7c911
Merge branch 'masking-inception-goInception' of github.com:unknowissu…
unknowissue Jan 5, 2022
3571e2a
测试ci
unknowissue Jan 5, 2022
591fefd
测试ci
unknowissue Jan 5, 2022
c5c8334
测试ci
unknowissue Jan 5, 2022
328bb28
测试ci
unknowissue Jan 5, 2022
a7cbac8
测试ci
unknowissue Jan 5, 2022
6f3caec
测试ci
unknowissue Jan 5, 2022
971aeaa
测试ci
unknowissue Jan 5, 2022
5f2e085
测试ci
unknowissue Jan 6, 2022
92cff29
测试ci
unknowissue Jan 6, 2022
038e187
测试ci
unknowissue Jan 6, 2022
68cfce5
测试ci
unknowissue Jan 6, 2022
1c10a5e
测试ci
unknowissue Jan 6, 2022
47dac89
测试ci
unknowissue Jan 6, 2022
0bbb1ef
ci测试
unknowissue Jan 6, 2022
e6996cd
ci测试
unknowissue Jan 6, 2022
93f6f99
ci测试
unknowissue Jan 6, 2022
c9a9943
ci测试
unknowissue Jan 6, 2022
3786f77
ci测试
unknowissue Jan 6, 2022
5176ea3
ci测试
unknowissue Jan 6, 2022
6d5f4e6
ci测试
unknowissue Jan 6, 2022
1c250f5
ci测试
unknowissue Jan 6, 2022
755a4c0
ci测试
unknowissue Jan 8, 2022
cbb4330
ci测试
unknowissue Jan 8, 2022
723cd57
ci测试
unknowissue Jan 8, 2022
17dc4c7
Merge branch 'master' into masking-inception-goInception
unknowissue Jan 8, 2022
9a9adcf
Merge branch 'master' into masking-inception-goInception
hhyo Jan 8, 2022
c24731c
Merge branch 'master' into masking-inception-goInception
hhyo Jan 11, 2022
43c3a8c
全量修改-ci测试
unknowissue Jan 12, 2022
0aa7da0
全量修改-ci测试
unknowissue Jan 12, 2022
d7b5e60
全量修改-ci测试
unknowissue Jan 12, 2022
371065c
全量修改-ci测试
unknowissue Jan 12, 2022
3291abc
全量修改-ci测试
unknowissue Jan 12, 2022
4078de2
全量修改-ci测试
unknowissue Jan 12, 2022
685fdda
全量修改-ci测试
unknowissue Jan 12, 2022
9fa9407
全量修改-ci测试
unknowissue Jan 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions sql/engines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,6 @@ def get_engine(instance=None): # pragma: no cover
from .mongo import MongoEngine

return MongoEngine(instance=instance)
elif instance.db_type == "inception":
from .inception import InceptionEngine

return InceptionEngine(instance=instance)
elif instance.db_type == "goinception":
from .goinception import GoInceptionEngine

Expand Down
102 changes: 102 additions & 0 deletions sql/engines/goinception.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import re
import traceback
import MySQLdb
import simplejson as json

from common.config import SysConfig
from sql.models import AliyunRdsConfig
Expand Down Expand Up @@ -36,6 +37,21 @@ def get_connection(self, db_name=None):
connect_timeout=10)
return self.conn

@staticmethod
def get_backup_connection():
archer_config = SysConfig()
backup_host = archer_config.get('inception_remote_backup_host')
backup_port = int(archer_config.get('inception_remote_backup_port', 3306))
backup_user = archer_config.get('inception_remote_backup_user')
backup_password = archer_config.get('inception_remote_backup_password', '')
return MySQLdb.connect(host=backup_host,
port=backup_port,
user=backup_user,
passwd=backup_password,
charset='utf8mb4',
autocommit=True
)

def execute_check(self, instance=None, db_name=None, sql=''):
"""inception check"""
# 判断如果配置了隧道则连接隧道
Expand Down Expand Up @@ -149,6 +165,80 @@ def query_print(self, instance, db_name=None, sql=''):
raise RuntimeError(print_info.get('errmsg'))
return print_info

def query_datamasking(self, instance, db_name=None, sql=''):
"""
将sql交给goInception打印语法树。
使用 masking 参数,可参考 https://github.com/hanchuanchuan/goInception/pull/355
"""
# 判断如果配置了隧道则连接隧道
host, port, user, password = self.remote_instance_conn(instance)
sql = f"""/*--user={user};--password={password};--host={host};--port={port};--masking=1;*/
inception_magic_start;
use `{db_name}`;
{sql}
inception_magic_commit;"""
print_info = self.query(db_name=db_name, sql=sql).to_dict()[0]
# 兼容语法错误时errlevel=0的场景
if print_info['errlevel'] == 0 and print_info['errmsg'] is None :
return json.loads(_repair_json_str(print_info['query_tree']))
elif print_info['errlevel'] == 0 and print_info['errmsg'] == 'Global environment':
raise SyntaxError(f"Inception Error: {print_info['query_tree']}")
else:
raise RuntimeError(f"Inception Error: {print_info['errmsg']}")

def get_rollback(self, workflow):
"""
获取回滚语句,并且按照执行顺序倒序展示,return ['源语句','回滚语句']
"""
list_execute_result = json.loads(workflow.sqlworkflowcontent.execute_result)
# 回滚语句倒序展示
list_execute_result.reverse()
list_backup_sql = []
# 创建连接
conn = self.get_backup_connection()
cur = conn.cursor()
for row in list_execute_result:
try:
# 获取backup_db_name, 兼容旧数据'[[]]'格式
if isinstance(row, list):
if row[8] == 'None':
continue
backup_db_name = row[8]
sequence = row[7]
sql = row[5]
# 新数据
else:
if row.get('backup_dbname') in ('None', ''):
continue
backup_db_name = row.get('backup_dbname')
sequence = row.get('sequence')
sql = row.get('sql')
# 获取备份表名
opid_time = sequence.replace("'", "")
sql_table = f"""select tablename
from {backup_db_name}.$_$Inception_backup_information$_$
where opid_time='{opid_time}';"""

cur.execute(sql_table)
list_tables = cur.fetchall()
if list_tables:
# 获取备份语句
table_name = list_tables[0][0]
sql_back = f"""select rollback_statement
from {backup_db_name}.{table_name}
where opid_time='{opid_time}'"""
cur.execute(sql_back)
list_backup = cur.fetchall()
# 拼接成回滚语句列表,['源语句','回滚语句']
list_backup_sql.append([sql, '\n'.join([back_info[0] for back_info in list_backup])])
except Exception as e:
logger.error(f"获取回滚语句报错,异常信息{traceback.format_exc()}")
raise Exception(e)
# 关闭连接
if conn:
conn.close()
return list_backup_sql

def get_variables(self, variables=None):
"""获取实例参数"""
if variables:
Expand Down Expand Up @@ -247,3 +337,15 @@ def get_session_variables(instance):
for k, v in variables.items():
set_session_sql += f"inception set session {k} = '{v}';\n"
return variables, set_session_sql

def _repair_json_str(json_str):
"""
处理JSONDecodeError: Expecting property name enclosed in double quotes
inception语法树出现{"a":1,}、["a":1,]、{'a':1}、[, { }]
"""
json_str = re.sub(r"{\s*'(.+)':", r'{"\1":', json_str)
json_str = re.sub(r",\s*?]", "]", json_str)
json_str = re.sub(r",\s*?}", "}", json_str)
json_str = re.sub(r"\[,\s*?{", "[{", json_str)
json_str = json_str.replace("'", "\"")
return json_str
10 changes: 5 additions & 5 deletions sql/engines/inception.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,17 +206,17 @@ def get_rollback(self, workflow):
sql = row.get('sql')
# 获取备份表名
opid_time = sequence.replace("'", "")
sql_table = f"""select tablename
from {backup_db_name}.$_$Inception_backup_information$_$
sql_table = f"""select tablename
from {backup_db_name}.$_$Inception_backup_information$_$
where opid_time='{opid_time}';"""

cur.execute(sql_table)
list_tables = cur.fetchall()
if list_tables:
# 获取备份语句
table_name = list_tables[0][0]
sql_back = f"""select rollback_statement
from {backup_db_name}.{table_name}
sql_back = f"""select rollback_statement
from {backup_db_name}.{table_name}
where opid_time='{opid_time}'"""
cur.execute(sql_back)
list_backup = cur.fetchall()
Expand Down Expand Up @@ -271,4 +271,4 @@ def _repair_json_str(json_str):
json_str = re.sub(r",\s*?}", "}", json_str)
json_str = re.sub(r"\[,\s*?{", "[{", json_str)
json_str = json_str.replace("'", "\"")
return json_str
return json_str
10 changes: 5 additions & 5 deletions sql/engines/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
from sql.utils.sql_utils import get_syntax_type, remove_comments
from . import EngineBase
from .models import ResultSet, ReviewResult, ReviewSet
from .inception import InceptionEngine
from sql.utils.data_masking import data_masking
from sql.utils.go_data_masking import go_data_masking
from common.config import SysConfig

logger = logging.getLogger('default')
Expand All @@ -25,7 +25,7 @@ class MysqlEngine(EngineBase):
def __init__(self, instance=None):
super().__init__(instance=instance)
self.config = SysConfig()
self.inc_engine = InceptionEngine() if self.config.get('inception') else GoInceptionEngine()
self.inc_engine = GoInceptionEngine()

def get_connection(self, db_name=None):
# https://stackoverflow.com/questions/19256155/python-mysqldb-returning-x01-for-bit-values
Expand Down Expand Up @@ -112,7 +112,7 @@ def get_all_tables(self, db_name, **kwargs):

def get_all_columns_by_tb(self, db_name, tb_name, **kwargs):
"""获取所有字段, 返回一个ResultSet"""
sql = f"""SELECT
sql = f"""SELECT
COLUMN_NAME,
COLUMN_TYPE,
CHARACTER_SET_NAME,
Expand Down Expand Up @@ -234,7 +234,7 @@ def query_masking(self, db_name=None, sql='', resultset=None):
返回一个脱敏后的结果集"""
# 仅对select语句脱敏
if re.match(r"^select", sql, re.I):
mask_result = data_masking(self.instance, db_name, sql, resultset)
mask_result = go_data_masking(self.instance, db_name, sql, resultset)
else:
mask_result = resultset
return mask_result
Expand Down Expand Up @@ -338,7 +338,7 @@ def execute(self, db_name=None, sql='', close_conn=True):

def get_rollback(self, workflow):
"""通过inception获取回滚语句列表"""
inception_engine = InceptionEngine()
inception_engine = GoInceptionEngine()
return inception_engine.get_rollback(workflow)

def get_variables(self, variables=None):
Expand Down
Loading