diff --git a/master/buildbot/scripts/copydb.py b/master/buildbot/scripts/copydb.py
index 62232538380..accd1a32612 100644
--- a/master/buildbot/scripts/copydb.py
+++ b/master/buildbot/scripts/copydb.py
@@ -101,7 +101,6 @@ async def _copy_single_table(
     column_keys = table.columns.keys()
     rows_queue = queue.Queue(32)
-    written_count = [0]
     total_count = [0]
 
     autoincrement_foreign_key_column = None
@@ -109,60 +108,51 @@ async def _copy_single_table(
         if not column.foreign_keys and column.primary_key and isinstance(column.type, sa.Integer):
             autoincrement_foreign_key_column = column_name
 
-    def thd_write(conn):
+    def thd_write(conn: sa.Connection) -> None:
+        written_count = 0
         max_column_id = 0
-        while True:
-            try:
-                rows = rows_queue.get(timeout=1)
-                if rows is None:
-                    if autoincrement_foreign_key_column is not None and max_column_id != 0:
-                        if dst_db.pool.engine.dialect.name == 'postgresql':
-                            # Explicitly inserting primary row IDs does not bump the primary key
-                            # sequence on Postgres
-                            seq_name = f"{table_name}_{autoincrement_foreign_key_column}_seq"
-                            transaction = conn.begin()
-                            conn.execute(
-                                f"ALTER SEQUENCE {seq_name} RESTART WITH {max_column_id + 1}"
-                            )
-                            transaction.commit()
-
-                    rows_queue.task_done()
-                    return
-
-                row_dicts = [{k: getattr(row, k) for k in column_keys} for row in rows]
-
-                if autoincrement_foreign_key_column is not None:
-                    for row in row_dicts:
-                        max_column_id = max(max_column_id, row[autoincrement_foreign_key_column])
-
-                if table_name == "buildsets":
-                    for row_dict in row_dicts:
-                        if row_dict["parent_buildid"] is not None:
-                            buildset_to_parent_buildid.append((
-                                row_dict["id"],
-                                row_dict["parent_buildid"],
-                            ))
-                        row_dict["parent_buildid"] = None
-
-            except queue.Empty:
-                continue
+        while (rows := rows_queue.get()) is not None:
+            row_dicts = [{k: getattr(row, k) for k in column_keys} for row in rows]
+
+            if autoincrement_foreign_key_column is not None:
+                for row in row_dicts:
+                    max_column_id = max(max_column_id, row[autoincrement_foreign_key_column])
+
+            if table_name == "buildsets":
+                for row_dict in row_dicts:
+                    if row_dict["parent_buildid"] is not None:
+                        buildset_to_parent_buildid.append((
+                            row_dict["id"],
+                            row_dict["parent_buildid"],
+                        ))
+                    row_dict["parent_buildid"] = None
 
-            try:
-                written_count[0] += len(rows)
-                print_log(
-                    f"Copying {len(rows)} items ({written_count[0]}/{total_count[0]}) "
-                    f"for {table_name} table"
-                )
+            written_count += len(rows)
+            print_log(
+                f"Copying {len(rows)} items ({written_count}/{total_count[0]}) "
+                f"for {table_name} table"
+            )
 
+            try:
                 if len(row_dicts) > 0:
                     conn.execute(table.insert(), row_dicts)
-
             finally:
                 rows_queue.task_done()
 
-    def thd_read(conn):
+        if autoincrement_foreign_key_column is not None and max_column_id != 0:
+            if dst_db.pool.engine.dialect.name == 'postgresql':
+                # Explicitly inserting primary row IDs does not bump the primary key
+                # sequence on Postgres
+                seq_name = f"{table_name}_{autoincrement_foreign_key_column}_seq"
+                conn.exec_driver_sql(f"ALTER SEQUENCE {seq_name} RESTART WITH {max_column_id + 1}")
+
+        rows_queue.task_done()
+
+    def thd_read(conn: sa.Connection) -> None:
         q = sa.select(sa.sql.func.count()).select_from(table)
-        total_count[0] = conn.execute(q).scalar()
+        count = conn.execute(q).scalar()
+        assert count is not None
+        total_count[0] = count
 
         result = conn.execute(sa.select(table))
         while chunk := result.fetchmany(10000):
@@ -171,7 +161,7 @@ def thd_read(conn):
         rows_queue.put(None)
 
     read_task = src_db.pool.do(thd_read)
-    write_task = dst_db.pool.do(thd_write)
+    write_task = dst_db.pool.do_with_transaction(thd_write)
 
     await write_task
     await read_task
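
Note: the switch from pool.do(thd_write) to pool.do_with_transaction(thd_write) is what
lets thd_write drop the manual conn.begin() / transaction.commit() pair around the
sequence reset. A minimal sketch of the semantics this diff assumes for
do_with_transaction (hypothetical; the actual Buildbot helper may differ):

# Hypothetical sketch only -- not the actual Buildbot implementation.
# Assumed semantics: run the thread function inside a single transaction,
# committing on success and rolling back on error, then dispatch it to a
# worker thread the same way pool.do() does.
def do_with_transaction(self, callable_, *args, **kwargs):
    def thd(conn):
        # SQLAlchemy's Connection.begin() context manager commits on normal
        # exit and rolls back if callable_ raises.
        with conn.begin():
            return callable_(conn, *args, **kwargs)

    return self.do(thd)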