-
Notifications
You must be signed in to change notification settings - Fork 1
/
__init__.py
75 lines (67 loc) · 2.57 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# Adapted from code written by Paul Kippes and released as
# part of the Python standard library, see
# https://github.com/python/cpython/blob/v3.8.5/Lib/sqlite3/dump.py
CREATE_TABLE_PREFIX = "CREATE TABLE "
def iterdump(connection):
cu = connection.cursor()
yield "BEGIN TRANSACTION;"
writable_schema = False
q = """
SELECT "name", "type", "sql"
FROM "sqlite_master"
WHERE "sql" NOT NULL AND
"type" == 'table'
ORDER BY "name"
"""
schema_res = cu.execute(q)
for table_name, type, sql in schema_res.fetchall():
if table_name == "sqlite_sequence":
yield ('DELETE FROM "sqlite_sequence";')
elif table_name == "sqlite_stat1":
yield ('ANALYZE "sqlite_master";')
elif table_name.startswith("sqlite_"):
continue
elif sql.startswith("CREATE VIRTUAL TABLE"):
if not writable_schema:
yield "PRAGMA writable_schema=ON;"
writable_schema = True
qtable = table_name.replace("'", "''")
yield (
"INSERT INTO sqlite_master(type,name,tbl_name,rootpage,sql) "
"VALUES('table','{0}','{0}',0,'{1}');"
).format(qtable, sql)
# Skip the bit that writes the INSERTs for this table
continue
elif sql.upper().startswith(CREATE_TABLE_PREFIX):
yield "CREATE TABLE IF NOT EXISTS {};".format(
sql[len(CREATE_TABLE_PREFIX):]
)
else:
yield "{0};".format(sql)
# Build the insert statement for each row of the current table
table_name_ident = table_name.replace('"', '""')
res = cu.execute('PRAGMA table_info("{0}")'.format(table_name_ident))
column_names = [str(table_info[1]) for table_info in res.fetchall()]
q = """SELECT 'INSERT INTO "{0}" VALUES({1})' FROM "{0}";""".format(
table_name_ident,
",".join(
"""'||quote("{0}")||'""".format(col.replace('"', '""'))
for col in column_names
),
)
query_res = cu.execute(q)
for row in query_res:
yield "{0};".format(row[0])
# Now when the type is 'index', 'trigger', or 'view'
q = """
SELECT "name", "type", "sql"
FROM "sqlite_master"
WHERE "sql" NOT NULL AND
"type" IN ('index', 'trigger', 'view')
"""
schema_res = cu.execute(q)
for name, type, sql in schema_res.fetchall():
yield "{0};".format(sql)
if writable_schema:
yield "PRAGMA writable_schema=OFF;"
yield "COMMIT;"