Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-79009: sqlite3.iterdump now correctly handles tables with autoincrement #9621

Merged
merged 15 commits into from
Jun 19, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion Lib/sqlite3/dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,14 @@ def _iterdump(connection):
ORDER BY "name"
"""
schema_res = cu.execute(q)
sqlite_sequence = []
for table_name, type, sql in schema_res.fetchall():
if table_name == 'sqlite_sequence':
yield('DELETE FROM "sqlite_sequence";')
sqlite_sequence = ['DELETE FROM "sqlite_sequence"']
rows = cu.execute('SELECT * FROM "sqlite_sequence";').fetchall()
sqlite_sequence.extend(['INSERT INTO "sqlite_sequence" VALUES(\'{0}\',{1})'
.format(row[0], row[1]) for row in rows])
continue
elif table_name == 'sqlite_stat1':
yield('ANALYZE "sqlite_master";')
elif table_name.startswith('sqlite_'):
Expand Down Expand Up @@ -67,4 +72,8 @@ def _iterdump(connection):
for name, type, sql in schema_res.fetchall():
yield('{0};'.format(sql))

# Yield statements concerning the sqlite_sequence table at the end of the transaction: (bpo-34828)
erlend-aasland marked this conversation as resolved.
Show resolved Hide resolved
for row in sqlite_sequence:
yield("{0};".format(row))

yield('COMMIT;')
70 changes: 70 additions & 0 deletions Lib/sqlite3/test/test_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,76 @@ def test_table_dump(self):
[self.assertEqual(expected_sqls[i], actual_sqls[i])
for i in range(len(expected_sqls))]

def test_dump_autoincrement(self):
expected_sqls = [
"""CREATE TABLE "posts" (id int primary key);""",
"""INSERT INTO "posts" VALUES(0);""",
"""CREATE TABLE "tags" ( """
"""id integer primary key autoincrement,"""
"""tag varchar(256),"""
"""post int references posts);""",
"""INSERT INTO "tags" VALUES(NULL,'test',0);""",
"""CREATE TABLE "tags2" ( """
"""id integer primary key autoincrement,"""
"""tag varchar(256),"""
"""post int references posts);"""
]
itssme marked this conversation as resolved.
Show resolved Hide resolved

for sql_statement in expected_sqls:
self.cu.execute(sql_statement)
iterdump = self.cx.iterdump()
actual_sqls = [sql_statement for sql_statement in iterdump]

# the NULL value should now be automatically be set to 1
expected_sqls[3] = expected_sqls[3].replace("NULL", "1")
expected_sqls = ['BEGIN TRANSACTION;'] + expected_sqls + \
['DELETE FROM "sqlite_sequence";'] + \
["""INSERT INTO "sqlite_sequence" VALUES('tags',1);"""] + \
['COMMIT;']

for i in range(len(expected_sqls)):
self.assertEqual(expected_sqls[i], actual_sqls[i])
itssme marked this conversation as resolved.
Show resolved Hide resolved

def test_dump_autoincrement_create_new_db(self):
old_db = [
"""BEGIN TRANSACTION ;""",
"""CREATE TABLE "posts" (id int primary key);""",
"""INSERT INTO "posts" VALUES(0);""",
"""CREATE TABLE "tags" ( """
"""id integer primary key autoincrement,"""
"""tag varchar(256),"""
"""post int references posts);""",
"""CREATE TABLE "tags2" ( """
"""id integer primary key autoincrement,"""
"""tag varchar(256),"""
"""post int references posts);"""
]
for i in range(1, 10):
old_db.append("""INSERT INTO "tags"
VALUES(NULL,'test{0}',0);""".format(i))
for i in range(1, 5):
old_db.append("""INSERT INTO "tags2"
VALUES(NULL,'test{0}',0);""".format(i))
old_db.append("COMMIT;")

for sql_statement in old_db:
self.cu.execute(sql_statement)

cx2 = sqlite.connect(":memory:")
itssme marked this conversation as resolved.
Show resolved Hide resolved
query = "".join(line for line in self.cx.iterdump())
itssme marked this conversation as resolved.
Show resolved Hide resolved
cx2.executescript(query)
cu2 = cx2.cursor()

self.assertEqual(cu2.execute('SELECT "seq"'
'FROM "sqlite_sequence"'
'WHERE "name" == "tags"').fetchall()[0][0], 9)
self.assertEqual(cu2.execute('SELECT "seq" FROM'
'"sqlite_sequence"'
'WHERE "name" == "tags2"').fetchall()[0][0], 4)

cu2.close()
cx2.close()

def test_unorderable_row(self):
# iterdump() should be able to cope with unorderable row types (issue #15545)
class UnorderableRow:
Expand Down
1 change: 1 addition & 0 deletions Misc/ACKS
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,7 @@ Ron Klatchko
Reid Kleckner
Carsten Klein
Bastian Kleineidam
Joel Klimont
Bob Kline
Matthias Klose
Jeremy Kloth
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:meth:`~sqlite3.Connection.iterdump` now handles databases that use ``AUTOINCREMENT`` in one or more tables.