Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

支持Oracle数据修改备份SQL功能,支持explain,with查询语法,对Oracle表结构和schema显示进行排序,支持sql tuning advisor,Oracle支持执行计划查看,支持Oracle update/insert/delete/create table/create index的语法SQL审核 #701

Merged
merged 21 commits into from
Apr 25, 2020
Merged
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 126 additions & 13 deletions sql/engines/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import traceback
import re
import sqlparse
import MySQLdb
LeoQuote marked this conversation as resolved.
Show resolved Hide resolved
import simplejson as json

from common.config import SysConfig
Expand Down Expand Up @@ -46,6 +47,25 @@ def info(self):
return 'Oracle engine'

@property
def auto_backup(self):
"""是否支持备份"""
return True

@staticmethod
def get_backup_connection():
archer_config = SysConfig()
backup_host = archer_config.get('inception_remote_backup_host')
backup_port = int(archer_config.get('inception_remote_backup_port', 3306))
backup_user = archer_config.get('inception_remote_backup_user')
backup_password = archer_config.get('inception_remote_backup_password')
return MySQLdb.connect(host=backup_host,
port=backup_port,
user=backup_user,
passwd=backup_password,
charset='utf8mb4',
autocommit=True
)
@property
def server_version(self):
conn = self.get_connection()
version = conn.version
Expand Down Expand Up @@ -82,14 +102,14 @@ def _get_all_schemas(self):
'DIP USERS', 'EXFSYS', 'FLOWS_FILES', 'HR USERS', 'IX USERS', 'MDDATA', 'MDSYS', 'MGMT_VIEW', 'OE USERS',
'OLAPSYS', 'ORACLE_OCM', 'ORDDATA', 'ORDPLUGINS', 'ORDSYS', 'OUTLN', 'OWBSYS', 'OWBSYS_AUDIT', 'PM USERS',
'SCOTT', 'SH USERS', 'SI_INFORMTN_SCHEMA', 'SPATIAL_CSW_ADMIN_USR', 'SPATIAL_WFS_ADMIN_USR', 'SYS',
'SYSMAN', 'SYSTEM', 'WMSYS', 'XDB', 'XS$NULL')
'SYSMAN', 'SYSTEM', 'WMSYS', 'XDB', 'XS$NULL', 'DIP', 'OJVMSYS', 'LBACSYS')
schema_list = [row[0] for row in result.rows if row[0] not in sysschema]
result.rows = schema_list
return result

def get_all_tables(self, db_name, **kwargs):
"""获取table 列表, 返回一个ResultSet"""
sql = f"""SELECT table_name FROM all_tables WHERE nvl(tablespace_name, 'no tablespace') NOT IN ('SYSTEM', 'SYSAUX') AND OWNER = '{db_name}' AND IOT_NAME IS NULL AND DURATION IS NULL
sql = f"""SELECT table_name FROM all_tables WHERE nvl(tablespace_name, 'no tablespace') NOT IN ('SYSTEM', 'SYSAUX') AND OWNER = '{db_name}' AND IOT_NAME IS NULL AND DURATION IS NULL order by table_name
"""
result = self.query(sql=sql)
tb_list = [row[0] for row in result.rows if row[0] not in ['test']]
Expand All @@ -113,7 +133,7 @@ def describe_table(self, db_name, tb_name, **kwargs):
nullable,
data_default
FROM all_tab_cols
WHERE table_name = '{tb_name}' and owner = '{db_name}'
WHERE table_name = '{tb_name}' and owner = '{db_name}' order by column_id
"""
result = self.query(sql=sql)
return result
Expand All @@ -133,7 +153,7 @@ def query_check(self, db_name=None, sql=''):
result['bad_query'] = True
result['msg'] = '没有有效的SQL语句'
return result
if re.match(r"^select", sql_lower) is None:
if re.match(r"^select|^with|^explain", sql_lower) is None:
result['bad_query'] = True
result['msg'] = '仅支持^select语法!'
return result
Expand All @@ -150,15 +170,8 @@ def query_check(self, db_name=None, sql=''):
def filter_sql(self, sql='', limit_num=0):
sql_lower = sql.lower()
# 对查询sql增加limit限制
if re.match(r"^\s*select", sql_lower):
# 针对select count(*) from之类的SQL,不做limit限制
if re.match(r"^\s*select\s+count\s*\(\s*[\*|\d]\s*\)\s+from", sql_lower, re.I):
return sql.rstrip(';')
if sql_lower.find(' rownum ') == -1:
if sql_lower.find('where') == -1:
return f"{sql.rstrip(';')} WHERE ROWNUM <= {limit_num}"
else:
return f"{sql.rstrip(';')} AND ROWNUM <= {limit_num}"
if re.match(r"^select|^with", sql_lower):
sql = f"select a.* from ({sql.rstrip(';')}) a WHERE ROWNUM <= {limit_num}"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不了解oracle,不知道这样的子查询会不会影响性能?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

一般Oracle优化器都不会有问题,有排序、去重等操作会有性能问题(有问题的原来代码本身也会有各种问题)

return sql.strip()

def query(self, db_name=None, sql='', limit_num=0, close_conn=True, **kwargs):
Expand All @@ -169,6 +182,12 @@ def query(self, db_name=None, sql='', limit_num=0, close_conn=True, **kwargs):
cursor = conn.cursor()
if db_name:
cursor.execute(f"ALTER SESSION SET CURRENT_SCHEMA = {db_name}")
sql = sql.rstrip(';')
# 支持oralce查询SQL执行计划语句
if re.match(r"^explain", sql, re.I):
cursor.execute(sql)
# 重置SQL文本,获取SQL执行计划
sql = f"select PLAN_TABLE_OUTPUT from table(dbms_xplan.display)"
cursor.execute(sql)
fields = cursor.description
if any(x[1] == cx_Oracle.CLOB for x in fields):
Expand Down Expand Up @@ -275,6 +294,11 @@ def execute_workflow(self, workflow, close_conn=True):
try:
conn = self.get_connection()
cursor = conn.cursor()
# 获取执行工单时间,用于备份SQL的日志挖掘起始时间
cursor.execute(f"alter session set nls_date_format='yyyy-mm-dd hh24:mi:ss'")
cursor.execute(f"select sysdate from dual")
rows = cursor.fetchone()
begin_time = rows[0]
# 逐条执行切分语句,追加到执行结果中
for sqlitem in sqlitemList:
statement = sqlitem.statement
Expand Down Expand Up @@ -340,10 +364,99 @@ def execute_workflow(self, workflow, close_conn=True):
))
line += 1
finally:
cursor.execute(f"select sysdate from dual")
rows = cursor.fetchone()
end_time = rows[0]
self.backup(workflow_id=workflow.id, cursor=cursor, begin_time=begin_time, end_time=end_time)
if close_conn:
self.close()
return execute_result

def backup(self,workflow_id,cursor,begin_time,end_time):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

几个建议和问题:

  1. 参数和上面的用法没对上, 这里是必选参数, 上面的是可选参数.
  2. 这里建议直接传 workflow 对象过去
  3. 这里的cursor 是不是必须传的, 如果是必须传, 建议改一个有辨识度的名字, 和 backup 的cursor 区分开来, 如果不是必须传, 可以用self.get_connection get connection 然后再拿到cursor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为了获取Oracle当前的会话信息cursor是必须传的 ,这个cursor就是执行的cursor,必须是同一个会话,才能抓取到回滚会话的信息,当然也可以关闭执行SQL会话,将需要的会话信息查询出来做参数传过去,这样备份需要新开Oracle的连接会话,多一次fork process开销,代码也更麻烦

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

参数的问题太专业这边没看懂你指的是什么,一直做 的Oracle运维,没做过研发,抱歉

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的,这个要想获得备份语句,需要和已经执行的上线sql在同一会话里面,要不然不方便从v$logmnr_contents 获取到undo sql

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def backup(self, workflow, current_cursor, begin_time, end_time):

self.backup(workflow, cursor, begin_time, end_time)

# 回滚SQL入库
LeoQuote marked this conversation as resolved.
Show resolved Hide resolved
# 生成回滚SQL,执行用户需要有grant select any transaction to 权限,需要有grant execute on dbms_logmnr to权限
# 数据库需开启最小化附加日志alter database add supplemental log data;
# 需为归档模式;开启附件日志会增加redo日志量,一般不会有多大影响,需评估归档磁盘空间,redo磁盘IO性能
# 创建备份库连接
try:
conn = self.get_backup_connection()
cur = conn.cursor()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里改成 backup_cursor = conn.cursor() 后面 curcursor 会看花眼

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

本地部署的代码(1.7.8版本)这边加上了Oracle执行计划查看支持(pr已经实现),Oracle数据修改SQL审核(explain plan for 实现),改动的太多了,我按上面要求重新整理测试后再提交一份吧,这个版本上线的Oracle存储过程和本地1.7.8版本的改动不兼容,后续打算把Oracle的sql tuning advisor也集成到SQL优化功能上去,前端代码暂时没看懂,哪位大神能给点思路?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

那块前端代码比较乱,看你需要展示什么信息,如果能做成通用的格式,可以扩充一下现有的engine,具体可以添加一下我的个人微信

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好吧, 下次你提的时候记得在你的fork里新开一个分支, master 我们没法帮你修改, 一般会有保护.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oracle的sql tuning advisor是Oracle自带的存储过程,执行普通的查询SQL就能获取到SQL优化报告,展示信息其实就是个查询SQL的返回结果。Oracle执行计划获取和这个思路是一样的,执行Oracle特定的存储过程后返回一个clob类型的查询结果。包括Oracle实例性能诊断优化建议报告(ADDM),AWR报告,都是可以通过调用Oracle存储过程的形式获取到,如果要丰富Oracle的功能确实需要做个通用格式,能力有限,完全没思路要怎么弄

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

那实现和返回格式都参考sql优化内的执行计划展示就行

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oracle要有新功能了,好期待。
我准备五一研究一下archery的源代码,希望能看懂。

cur.execute(f"""create database if not exists ora_backup;""")
cur.execute(f"use ora_backup;")
cur.execute(f"""CREATE TABLE if not exists `sql_rollback` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`redo_sql` mediumtext,
`undo_sql` mediumtext,
`workflow_id` bigint(20) NOT NULL,
PRIMARY KEY (`id`),
key `idx_sql_rollback_01` (`workflow_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;""")
logmnr_start_sql = f'''begin
dbms_logmnr.start_logmnr(
starttime=>to_date('{begin_time}','yyyy-mm-dd hh24:mi:ss'),
endtime=>to_date('{end_time}','yyyy/mm/dd hh24:mi:ss'),
options=>dbms_logmnr.dict_from_online_catalog + dbms_logmnr.continuous_mine);
end;'''
undo_sql = f'''select sql_redo,sql_undo from v$logmnr_contents
where SEG_OWNER not in ('SYS','SYSTEM')
and session# = (select sid from v$mystat where rownum = 1)
and serial# = (select serial# from v$session s where s.sid = (select sid from v$mystat where rownum = 1 )) order by scn desc'''
logmnr_end_sql = f'''begin
dbms_logmnr.end_logmnr;
end;'''
cursor.execute(logmnr_start_sql)
cursor.execute(undo_sql)
rows = cursor.fetchall()
cursor.execute(logmnr_end_sql)
if len(rows) > 0:
for row in rows:
redo_sql=f"{row[0]}"
redo_sql=redo_sql.replace("'","\\'")
if row[1] is None:
undo_sql = f' '
else:
undo_sql=f"{row[1]}"
undo_sql=undo_sql.replace("'","\\'")
sql = f"""insert into sql_rollback(redo_sql,undo_sql,workflow_id) values('{redo_sql}','{undo_sql}',{workflow_id});"""
cur.execute(sql)
except Exception as e:
logger.warning(f"备份失败,错误信息{traceback.format_exc()}")
return False
finally:
# 关闭连接
if conn:
conn.close()
return True

def get_rollback(self, workflow):
"""
获取回滚语句,并且按照执行顺序倒序展示,return ['源语句','回滚语句']
"""
list_execute_result = json.loads(workflow.sqlworkflowcontent.execute_result)
# 回滚语句倒序展示
list_execute_result.reverse()
list_backup_sql = []
try:
# 创建连接
conn = self.get_backup_connection()
cur = conn.cursor()
sql = f"""select redo_sql,undo_sql from sql_rollback where workflow_id = {workflow.id} order by id;"""
cur.execute(f"use ora_backup;")
cur.execute(sql)
list_tables = cur.fetchall()
for row in list_tables:
redo_sql = row[0]
undo_sql = row[1]
# 拼接成回滚语句列表,['源语句','回滚语句']
list_backup_sql.append([redo_sql,undo_sql])
except Exception as e:
logger.error(f"获取回滚语句报错,异常信息{traceback.format_exc()}")
raise Exception(e)
# 关闭连接
if conn:
conn.close()
return list_backup_sql

def close(self):
if self.conn:
self.conn.close()
Expand Down