Skip to content

Commit

Permalink
MySQL数据脱敏inception改为goinception (#1307)
Browse files Browse the repository at this point in the history
* MySQL数据脱敏inception改为goinception

* MySQL数据脱敏inception改为goinception

* 修改了逻辑,判断inception/goInception

* 增加对于goinception脱敏的测试

* 使用最新的镜像测试ci

* 使用最新的镜像测试ci

* 使用最新的镜像测试ci

* oracle脱敏更换函数 (#1306)

* oracle脱敏更换函数

* ci测试Oracle语法问题

* 删除了注释备份

* 修改了测试sql

* ci调试-尝试修改版本

* 修改测试以及还原docker.yaml文件

* 测试ci

* 测试ci

* 测试ci

* 测试ci

* 测试ci

* 测试ci

* 测试ci

* 测试ci

* 测试ci

* 测试ci

* 测试ci

* 测试ci

* 测试ci

* ci测试

* ci测试

* ci测试

* ci测试

* ci测试

* ci测试

* ci测试

* ci测试

* ci测试

* ci测试

* ci测试

* 全量修改-ci测试

* 全量修改-ci测试

* 全量修改-ci测试

* 全量修改-ci测试

* 全量修改-ci测试

* 全量修改-ci测试

* 全量修改-ci测试

* 全量修改-ci测试

Co-authored-by: 小圈圈 <[email protected]>
  • Loading branch information
unknowissue and hhyo authored Jan 13, 2022
1 parent d9763eb commit 07ff897
Show file tree
Hide file tree
Showing 11 changed files with 551 additions and 177 deletions.
4 changes: 0 additions & 4 deletions sql/engines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,6 @@ def get_engine(instance=None): # pragma: no cover
from .mongo import MongoEngine

return MongoEngine(instance=instance)
elif instance.db_type == "inception":
from .inception import InceptionEngine

return InceptionEngine(instance=instance)
elif instance.db_type == "goinception":
from .goinception import GoInceptionEngine

Expand Down
102 changes: 102 additions & 0 deletions sql/engines/goinception.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import re
import traceback
import MySQLdb
import simplejson as json

from common.config import SysConfig
from sql.models import AliyunRdsConfig
Expand Down Expand Up @@ -36,6 +37,21 @@ def get_connection(self, db_name=None):
connect_timeout=10)
return self.conn

@staticmethod
def get_backup_connection():
    """
    Open a MySQL connection using the ``inception_remote_backup_*`` settings.

    Host, port, user and password are read from ``SysConfig``; the port
    falls back to 3306 and the password to an empty string when unset.

    :return: the object returned by ``MySQLdb.connect`` (utf8mb4 charset,
        autocommit enabled).
    """
    sys_config = SysConfig()
    # Dict entries are evaluated top to bottom, so the settings are read in
    # the order host, port, user, password.
    connect_kwargs = {
        'host': sys_config.get('inception_remote_backup_host'),
        'port': int(sys_config.get('inception_remote_backup_port', 3306)),
        'user': sys_config.get('inception_remote_backup_user'),
        'passwd': sys_config.get('inception_remote_backup_password', ''),
        'charset': 'utf8mb4',
        'autocommit': True,
    }
    return MySQLdb.connect(**connect_kwargs)

def execute_check(self, instance=None, db_name=None, sql=''):
"""inception check"""
# 判断如果配置了隧道则连接隧道
Expand Down Expand Up @@ -149,6 +165,80 @@ def query_print(self, instance, db_name=None, sql=''):
raise RuntimeError(print_info.get('errmsg'))
return print_info

def query_datamasking(self, instance, db_name=None, sql=''):
    """
    Submit ``sql`` to goInception with ``--masking=1`` and return the parsed
    ``query_tree`` of the first result row.

    For the masking option see
    https://github.com/hanchuanchuan/goInception/pull/355

    :param instance: instance whose connection details are resolved through
        ``self.remote_instance_conn`` (which takes a configured tunnel into
        account).
    :param db_name: database selected with ``use`` before the statement.
    :param sql: statement to analyse.
    :return: the ``query_tree`` JSON, repaired and decoded.
    :raises SyntaxError: errlevel is 0 and errmsg is 'Global environment';
        the message carries the ``query_tree`` text.
    :raises RuntimeError: any other outcome; the message carries errmsg.
    """
    # Connect through the tunnel when one is configured.
    host, port, user, password = self.remote_instance_conn(instance)
    masking_sql = f"""/*--user={user};--password={password};--host={host};--port={port};--masking=1;*/
inception_magic_start;
use `{db_name}`;
{sql}
inception_magic_commit;"""
    first_row = self.query(db_name=db_name, sql=masking_sql).to_dict()[0]
    errlevel = first_row['errlevel']
    errmsg = first_row['errmsg']
    # A syntax error may still be reported with errlevel == 0, so errmsg is
    # inspected as well.
    if errlevel == 0:
        if errmsg is None:
            return json.loads(_repair_json_str(first_row['query_tree']))
        if errmsg == 'Global environment':
            raise SyntaxError(f"Inception Error: {first_row['query_tree']}")
    raise RuntimeError(f"Inception Error: {errmsg}")

def get_rollback(self, workflow):
    """
    Collect the rollback statements for a workflow, listed in reverse
    execution order.

    The execution result stored on ``workflow.sqlworkflowcontent`` is decoded
    from JSON. For every row that has a backup database, the backup table
    name is looked up in ``$_$Inception_backup_information$_$`` and the
    ``rollback_statement`` rows of that table are joined with newlines.

    :param workflow: object exposing ``sqlworkflowcontent.execute_result``
        as a JSON string.
    :return: list of ``[original_sql, rollback_sql]`` pairs.
    :raises Exception: when processing a row fails; the failure is logged and
        the original exception is chained as the cause.
    """
    list_execute_result = json.loads(workflow.sqlworkflowcontent.execute_result)
    # Rollback statements are shown in reverse execution order.
    list_execute_result.reverse()
    list_backup_sql = []
    conn = self.get_backup_connection()
    # try/finally guarantees the backup connection is released even when a
    # row fails and the exception is re-raised.
    try:
        cur = conn.cursor()
        for row in list_execute_result:
            try:
                # Legacy data is stored as positional lists ('[[]]' format):
                # index 8 = backup db name, 7 = sequence, 5 = sql.
                if isinstance(row, list):
                    if row[8] == 'None':
                        continue
                    backup_db_name = row[8]
                    sequence = row[7]
                    sql = row[5]
                # Current data is stored as dicts.
                else:
                    if row.get('backup_dbname') in ('None', ''):
                        continue
                    backup_db_name = row.get('backup_dbname')
                    sequence = row.get('sequence')
                    sql = row.get('sql')
                # Look up the name of the backup table; the sequence value
                # is stored with surrounding quotes, which are stripped.
                opid_time = sequence.replace("'", "")
                sql_table = f"""select tablename
from {backup_db_name}.$_$Inception_backup_information$_$
where opid_time='{opid_time}';"""

                cur.execute(sql_table)
                list_tables = cur.fetchall()
                if list_tables:
                    # Fetch the rollback statements from the backup table.
                    table_name = list_tables[0][0]
                    sql_back = f"""select rollback_statement
from {backup_db_name}.{table_name}
where opid_time='{opid_time}'"""
                    cur.execute(sql_back)
                    list_backup = cur.fetchall()
                    # Append the pair [original statement, rollback statements].
                    list_backup_sql.append([sql, '\n'.join([back_info[0] for back_info in list_backup])])
            except Exception as e:
                logger.error(f"获取回滚语句报错,异常信息{traceback.format_exc()}")
                # Same exception type as before, now chained to its cause.
                raise Exception(e) from e
    finally:
        if conn:
            conn.close()
    return list_backup_sql

def get_variables(self, variables=None):
"""获取实例参数"""
if variables:
Expand Down Expand Up @@ -247,3 +337,15 @@ def get_session_variables(instance):
for k, v in variables.items():
set_session_sql += f"inception set session {k} = '{v}';\n"
return variables, set_session_sql

def _repair_json_str(json_str):
"""
处理JSONDecodeError: Expecting property name enclosed in double quotes
inception语法树出现{"a":1,}、["a":1,]、{'a':1}、[, { }]
"""
json_str = re.sub(r"{\s*'(.+)':", r'{"\1":', json_str)
json_str = re.sub(r",\s*?]", "]", json_str)
json_str = re.sub(r",\s*?}", "}", json_str)
json_str = re.sub(r"\[,\s*?{", "[{", json_str)
json_str = json_str.replace("'", "\"")
return json_str
10 changes: 5 additions & 5 deletions sql/engines/inception.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,17 +206,17 @@ def get_rollback(self, workflow):
sql = row.get('sql')
# 获取备份表名
opid_time = sequence.replace("'", "")
sql_table = f"""select tablename
from {backup_db_name}.$_$Inception_backup_information$_$
sql_table = f"""select tablename
from {backup_db_name}.$_$Inception_backup_information$_$
where opid_time='{opid_time}';"""

cur.execute(sql_table)
list_tables = cur.fetchall()
if list_tables:
# 获取备份语句
table_name = list_tables[0][0]
sql_back = f"""select rollback_statement
from {backup_db_name}.{table_name}
sql_back = f"""select rollback_statement
from {backup_db_name}.{table_name}
where opid_time='{opid_time}'"""
cur.execute(sql_back)
list_backup = cur.fetchall()
Expand Down Expand Up @@ -271,4 +271,4 @@ def _repair_json_str(json_str):
json_str = re.sub(r",\s*?}", "}", json_str)
json_str = re.sub(r"\[,\s*?{", "[{", json_str)
json_str = json_str.replace("'", "\"")
return json_str
return json_str
10 changes: 5 additions & 5 deletions sql/engines/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
from sql.utils.sql_utils import get_syntax_type, remove_comments
from . import EngineBase
from .models import ResultSet, ReviewResult, ReviewSet
from .inception import InceptionEngine
from sql.utils.data_masking import data_masking
from sql.utils.go_data_masking import go_data_masking
from common.config import SysConfig

logger = logging.getLogger('default')
Expand All @@ -25,7 +25,7 @@ class MysqlEngine(EngineBase):
def __init__(self, instance=None):
# Initialise the base engine with the given instance, load the system
# configuration into self.config, and create the engine kept in
# self.inc_engine.
super().__init__(instance=instance)
self.config = SysConfig()
# NOTE(review): self.inc_engine is assigned on two consecutive lines, so the
# unconditional GoInceptionEngine() below is the value that remains in effect.
# The pair looks like the removed/added sides of a diff hunk — confirm which
# line belongs in the file.
self.inc_engine = InceptionEngine() if self.config.get('inception') else GoInceptionEngine()
self.inc_engine = GoInceptionEngine()

def get_connection(self, db_name=None):
# https://stackoverflow.com/questions/19256155/python-mysqldb-returning-x01-for-bit-values
Expand Down Expand Up @@ -112,7 +112,7 @@ def get_all_tables(self, db_name, **kwargs):

def get_all_columns_by_tb(self, db_name, tb_name, **kwargs):
"""获取所有字段, 返回一个ResultSet"""
sql = f"""SELECT
sql = f"""SELECT
COLUMN_NAME,
COLUMN_TYPE,
CHARACTER_SET_NAME,
Expand Down Expand Up @@ -234,7 +234,7 @@ def query_masking(self, db_name=None, sql='', resultset=None):
返回一个脱敏后的结果集"""
# 仅对select语句脱敏
if re.match(r"^select", sql, re.I):
mask_result = data_masking(self.instance, db_name, sql, resultset)
mask_result = go_data_masking(self.instance, db_name, sql, resultset)
else:
mask_result = resultset
return mask_result
Expand Down Expand Up @@ -338,7 +338,7 @@ def execute(self, db_name=None, sql='', close_conn=True):

def get_rollback(self, workflow):
"""Return the list of rollback statements for the workflow, obtained through inception."""
# NOTE(review): inception_engine is bound twice in a row, so the
# GoInceptionEngine instance is the one whose get_rollback is called. The
# pair looks like the removed/added sides of a diff hunk — confirm which
# line belongs in the file.
inception_engine = InceptionEngine()
inception_engine = GoInceptionEngine()
return inception_engine.get_rollback(workflow)

def get_variables(self, variables=None):
Expand Down
Loading

0 comments on commit 07ff897

Please sign in to comment.