Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

支持Oracle数据修改备份SQL功能,支持explain,with查询语法,对Oracle表结构和schema显示进行排序,支持sql tuning advisor,Oracle支持执行计划查看,支持Oracle update/insert/delete/create table/create index的语法SQL审核 #701

Merged
merged 21 commits into from
Apr 25, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 44 additions & 71 deletions sql/engines/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,18 +278,12 @@ def execute_check(self, db_name=None, sql=''):
return check_result

def execute_workflow(self, workflow, close_conn=True):
"""执行上线单,返回Review set
原来的逻辑是根据 sql_content简单来分割SQL,进而再执行这些SQL
新的逻辑变更为根据审核结果中记录的sql来执行,
如果是PLSQL存储过程等对象定义操作,还需检查确认新建对象是否编译通过!
"""
review_content = workflow.sqlworkflowcontent.review_content
review_result = json.loads(review_content)
sqlitemList = get_exec_sqlitem_list(review_result, workflow.db_name)

"""执行上线单,返回Review set"""
sql = workflow.sqlworkflowcontent.sql_content
execute_result = ReviewSet(full_sql=sql)

# 删除注释语句,切分语句,将切换CURRENT_SCHEMA语句增加到切分结果中
sql = sqlparse.format(sql, strip_comments=True)
split_sql = [f"ALTER SESSION SET CURRENT_SCHEMA = {workflow.db_name};"] + sqlparse.split(sql)
line = 1
statement = None
try:
Expand All @@ -301,46 +295,26 @@ def execute_workflow(self, workflow, close_conn=True):
rows = cursor.fetchone()
begin_time = rows[0]
# 逐条执行切分语句,追加到执行结果中
for sqlitem in sqlitemList:
statement = sqlitem.statement
if sqlitem.stmt_type == "SQL":
statement = statement.rstrip(';')
for statement in split_sql:
statement = statement.rstrip(';')
with FuncTimer() as t:
cursor.execute(statement)
conn.commit()

rowcount = cursor.rowcount
stagestatus = "Execute Successfully"
if sqlitem.stmt_type == "PLSQL" and sqlitem.object_name and sqlitem.object_name != 'ANONYMOUS' and sqlitem.object_name != '':
query_obj_sql = f"""SELECT OBJECT_NAME, STATUS, TO_CHAR(LAST_DDL_TIME, 'YYYY-MM-DD HH24:MI:SS') FROM ALL_OBJECTS
WHERE OWNER = '{sqlitem.object_owner}'
AND OBJECT_NAME = '{sqlitem.object_name}'
"""
cursor.execute(query_obj_sql)
row = cursor.fetchone()
if row:
status = row[1]
if status and status == "INVALID":
stagestatus = "Compile Failed. Object " + sqlitem.object_owner + "." + sqlitem.object_name + " is invalid."
else:
stagestatus = "Compile Failed. Object " + sqlitem.object_owner + "." + sqlitem.object_name + " doesn't exist."

if stagestatus != "Execute Successfully":
raise Exception(stagestatus)

if statement !='':
cursor.execute(statement)
conn.commit()
execute_result.rows.append(ReviewResult(
id=line,
errlevel=0,
stagestatus=stagestatus,
stagestatus='Execute Successfully',
errormessage='None',
sql=statement,
affected_rows=rowcount,
affected_rows=cursor.rowcount,
execute_time=t.cost,
))
line += 1
except Exception as e:
logger.warning(f"Oracle命令执行报错,语句:{statement or sql}, 错误信息:{traceback.format_exc()}")
execute_result.error = str(e)
#conn.rollback()
# 追加当前报错语句信息到执行结果中
execute_result.rows.append(ReviewResult(
id=line,
Expand All @@ -353,50 +327,32 @@ def execute_workflow(self, workflow, close_conn=True):
))
line += 1
# 报错语句后面的语句标记为审核通过、未执行,追加到执行结果中
for sqlitem in sqlitemList[line - 1:]:
for statement in split_sql[line - 1:]:
execute_result.rows.append(ReviewResult(
id=line,
errlevel=0,
stagestatus='Audit completed',
errormessage=f'前序语句失败, 未执行',
sql=sqlitem.statement,
sql=statement,
affected_rows=0,
execute_time=0,
))
line += 1
finally:
# 生成回滚SQL,执行用户需要有grant select any transaction to 权限,需要有grant execute on dbms_logmnr to权限
# 数据库需开启最小化附加日志alter database add supplemental log data;
# 需为归档模式;开启附件日志会增加redo日志量,一般不会有多大影响,需评估归档磁盘空间,redo磁盘IO性能
cursor.execute(f"select sysdate from dual")
rows = cursor.fetchone()
end_time = rows[0]
logmnr_start_sql = f'''begin
dbms_logmnr.start_logmnr(
starttime=>to_date('{begin_time}','yyyy-mm-dd hh24:mi:ss'),
endtime=>to_date('{end_time}','yyyy/mm/dd hh24:mi:ss'),
options=>dbms_logmnr.dict_from_online_catalog + dbms_logmnr.continuous_mine);
end;'''
undo_sql = f'''select sql_redo,sql_undo from v$logmnr_contents where
SEG_OWNER not in ('SYS','SYSTEM')
and session# = (select s.sid from v$session s where s.sid = (select sid from v$mystat where rownum = 1 ))
and serial# = (select serial# from v$session s where s.sid = (select sid from v$mystat where rownum = 1 )) order by scn desc'''
logmnr_end_sql = f'''begin
dbms_logmnr.end_logmnr;
end;'''
workflow_id = f"{workflow.sqlworkflowcontent.workflow_id}"
cursor.execute(logmnr_start_sql)
cursor.execute(undo_sql)
rows = cursor.fetchall()
cursor.execute(logmnr_end_sql)
self.ora_backup_insert(rows=rows, id=workflow_id)
self.backup(id=workflow.id,cursor=cursor,begin_time=begin_time,end_time=end_time)
if close_conn:
self.close()
return execute_result
def ora_backup_insert(self, rows = [], id = 0):

def backup(self,id,cursor,begin_time,end_time):
LeoQuote marked this conversation as resolved.
Show resolved Hide resolved
# 回滚SQL入库
LeoQuote marked this conversation as resolved.
Show resolved Hide resolved
# 创建连接
# 生成回滚SQL,执行用户需要有grant select any transaction to 权限,需要有grant execute on dbms_logmnr to权限
# 数据库需开启最小化附加日志alter database add supplemental log data;
# 需为归档模式;开启附件日志会增加redo日志量,一般不会有多大影响,需评估归档磁盘空间,redo磁盘IO性能
# 创建备份库连接
try:
conn = self.get_backup_connection()
cur = conn.cursor()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里改成 backup_cursor = conn.cursor() 后面 curcursor 会看花眼

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

本地部署的代码(1.7.8版本)这边加上了Oracle执行计划查看支持(pr已经实现),Oracle数据修改SQL审核(explain plan for 实现),改动的太多了,我按上面要求重新整理测试后再提交一份吧,这个版本上线的Oracle存储过程和本地1.7.8版本的改动不兼容,后续打算把Oracle的sql tuning advisor也集成到SQL优化功能上去,前端代码暂时没看懂,哪位大神能给点思路?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

那块前端代码比较乱,看你需要展示什么信息,如果能做成通用的格式,可以扩充一下现有的engine,具体可以添加一下我的个人微信

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好吧, 下次你提的时候记得在你的fork里新开一个分支, master 我们没法帮你修改, 一般会有保护.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oracle的sql tuning advisor是Oracle自带的存储过程,执行普通的查询SQL就能获取到SQL优化报告,展示信息其实就是个查询SQL的返回结果。Oracle执行计划获取和这个思路是一样的,执行Oracle特定的存储过程后返回一个clob类型的查询结果。包括Oracle实例性能诊断优化建议报告(ADDM),AWR报告,都是可以通过调用Oracle存储过程的形式获取到,如果要丰富Oracle的功能确实需要做个通用格式,能力有限,完全没思路要怎么弄

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

那实现和返回格式都参考sql优化内的执行计划展示就行

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oracle要有新功能了,好期待。
我准备五一研究一下archery的源代码,希望能看懂。

Expand All @@ -410,11 +366,31 @@ def ora_backup_insert(self, rows = [], id = 0):
PRIMARY KEY (`id`),
key `idx_sql_rollback_01` (`workflow_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;""")
logmnr_start_sql = f'''begin
dbms_logmnr.start_logmnr(
starttime=>to_date('{begin_time}','yyyy-mm-dd hh24:mi:ss'),
endtime=>to_date('{end_time}','yyyy/mm/dd hh24:mi:ss'),
options=>dbms_logmnr.dict_from_online_catalog + dbms_logmnr.continuous_mine);
end;'''
undo_sql = f'''select sql_redo,sql_undo from v$logmnr_contents where
SEG_OWNER not in ('SYS','SYSTEM')
and session# = (select s.sid from v$session s where s.sid = (select sid from v$mystat where rownum = 1 ))
and serial# = (select serial# from v$session s where s.sid = (select sid from v$mystat where rownum = 1 )) order by scn desc'''
logmnr_end_sql = f'''begin
dbms_logmnr.end_logmnr;
end;'''
cursor.execute(logmnr_start_sql)
cursor.execute(undo_sql)
rows = cursor.fetchall()
cursor.execute(logmnr_end_sql)
if len(rows) > 0:
for row in rows:
redo_sql=f"{row[0]}"
redo_sql=redo_sql.replace("'","\\'")
undo_sql=f"{row[1]}"
if row[1] is None:
undo_sql = f' '
else:
undo_sql=f"{row[1]}"
undo_sql=undo_sql.replace("'","\\'")
sql = f"""insert into sql_rollback(redo_sql,undo_sql,workflow_id) values('{redo_sql}','{undo_sql}',{id});"""
cur.execute(sql)
Expand All @@ -432,7 +408,7 @@ def get_rollback(self, workflow):
获取回滚语句,并且按照执行顺序倒序展示,return ['源语句','回滚语句']
"""
list_execute_result = json.loads(workflow.sqlworkflowcontent.execute_result)
workflow_id = workflow.sqlworkflowcontent.workflow_id
workflow_id = workflow.id
LeoQuote marked this conversation as resolved.
Show resolved Hide resolved
# 回滚语句倒序展示
list_execute_result.reverse()
list_backup_sql = []
Expand All @@ -446,10 +422,7 @@ def get_rollback(self, workflow):
list_tables = cur.fetchall()
for row in list_tables:
redo_sql = row[0]
if row[1] is None:
undo_sql = ' '
else:
undo_sql = row[1]
undo_sql = row[1]
# 拼接成回滚语句列表,['源语句','回滚语句']
list_backup_sql.append([redo_sql,undo_sql])
except Exception as e:
Expand Down