Skip to content

Commit

Permalink
scripts: fix copydb for SQLAlchemy>=2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
tdesveaux committed Nov 3, 2024
1 parent f881974 commit 0736102
Showing 1 changed file with 37 additions and 47 deletions.
84 changes: 37 additions & 47 deletions master/buildbot/scripts/copydb.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,68 +101,58 @@ async def _copy_single_table(
column_keys = table.columns.keys()

rows_queue = queue.Queue(32)
written_count = [0]
total_count = [0]

autoincrement_foreign_key_column = None
for column_name, column in table.columns.items():
if not column.foreign_keys and column.primary_key and isinstance(column.type, sa.Integer):
autoincrement_foreign_key_column = column_name

def thd_write(conn):
def thd_write(conn: sa.Connection) -> None:
written_count = 0
max_column_id = 0
while True:
try:
rows = rows_queue.get(timeout=1)
if rows is None:
if autoincrement_foreign_key_column is not None and max_column_id != 0:
if dst_db.pool.engine.dialect.name == 'postgresql':
# Explicitly inserting primary row IDs does not bump the primary key
# sequence on Postgres
seq_name = f"{table_name}_{autoincrement_foreign_key_column}_seq"
transaction = conn.begin()
conn.execute(
f"ALTER SEQUENCE {seq_name} RESTART WITH {max_column_id + 1}"
)
transaction.commit()

rows_queue.task_done()
return

row_dicts = [{k: getattr(row, k) for k in column_keys} for row in rows]

if autoincrement_foreign_key_column is not None:
for row in row_dicts:
max_column_id = max(max_column_id, row[autoincrement_foreign_key_column])

if table_name == "buildsets":
for row_dict in row_dicts:
if row_dict["parent_buildid"] is not None:
buildset_to_parent_buildid.append((
row_dict["id"],
row_dict["parent_buildid"],
))
row_dict["parent_buildid"] = None

except queue.Empty:
continue
while (rows := rows_queue.get()) is not None:
row_dicts = [{k: getattr(row, k) for k in column_keys} for row in rows]

if autoincrement_foreign_key_column is not None:
for row in row_dicts:
max_column_id = max(max_column_id, row[autoincrement_foreign_key_column])

if table_name == "buildsets":
for row_dict in row_dicts:
if row_dict["parent_buildid"] is not None:
buildset_to_parent_buildid.append((
row_dict["id"],
row_dict["parent_buildid"],
))
row_dict["parent_buildid"] = None

try:
written_count[0] += len(rows)
print_log(
f"Copying {len(rows)} items ({written_count[0]}/{total_count[0]}) "
f"for {table_name} table"
)
written_count += len(rows)
print_log(
f"Copying {len(rows)} items ({written_count}/{total_count[0]}) "
f"for {table_name} table"
)

try:
if len(row_dicts) > 0:
conn.execute(table.insert(), row_dicts)

finally:
rows_queue.task_done()

def thd_read(conn):
if autoincrement_foreign_key_column is not None and max_column_id != 0:
if dst_db.pool.engine.dialect.name == 'postgresql':
# Explicitly inserting primary row IDs does not bump the primary key
# sequence on Postgres
seq_name = f"{table_name}_{autoincrement_foreign_key_column}_seq"
conn.exec_driver_sql(f"ALTER SEQUENCE {seq_name} RESTART WITH {max_column_id + 1}")

rows_queue.task_done()

def thd_read(conn: sa.Connection) -> None:
q = sa.select(sa.sql.func.count()).select_from(table)
total_count[0] = conn.execute(q).scalar()
count = conn.execute(q).scalar()
assert count is not None
total_count[0] = count

result = conn.execute(sa.select(table))
while chunk := result.fetchmany(10000):
Expand All @@ -171,7 +161,7 @@ def thd_read(conn):
rows_queue.put(None)

read_task = src_db.pool.do(thd_read)
write_task = dst_db.pool.do(thd_write)
write_task = dst_db.pool.do_with_transaction(thd_write)

await write_task
await read_task
Expand Down

0 comments on commit 0736102

Please sign in to comment.