-
Notifications
You must be signed in to change notification settings - Fork 217
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
database/bulk-delete: delete many rows without overwhelming the DB
- Loading branch information
1 parent
af28b7f
commit 4126e2c
Showing
2 changed files
with
60 additions
and
30 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,73 +1,81 @@ | ||
// see packages/database/pool/pool.ts for where this name is also hard coded: | ||
process.env.PGDATABASE = "smc_ephemeral_testing_database"; | ||
|
||
import { escapeIdentifier } from "pg"; | ||
|
||
import getPool from "@cocalc/database/pool"; | ||
import { SCHEMA } from "@cocalc/util/schema"; | ||
|
||
interface Opts { | ||
table: string; | ||
table: string; // e.g. project_log, etc. | ||
field: "project_id" | "account_id"; // for now, we only support a few | ||
id?: string; // default "id", the ID field in the table, which identifies each row uniquely | ||
value: string; // a UUID | ||
limit?: number; | ||
limit?: number; // default 1024 | ||
maxUtilPct?: number; // 1-99, percent of time spent in the DB (default 10) | ||
} | ||
|
||
type Ret = Promise<{ | ||
rowsDeleted: number; | ||
durationS: number; | ||
totalWaitS: number; | ||
totalPgTimeS: number; | ||
}>; | ||
|
||
function deleteQuery(table: string, field: string) { | ||
function deleteQuery(table: string, field: string, id: string) { | ||
const T = escapeIdentifier(table); | ||
const F = escapeIdentifier(field); | ||
const ID = escapeIdentifier(id); | ||
|
||
return ` | ||
DELETE FROM ${T} | ||
WHERE ${F} IN ( | ||
SELECT ${F} FROM ${T} WHERE ${F} = $1 LIMIT $2 | ||
) | ||
RETURNING 1 | ||
`; | ||
WHERE ${ID} IN ( | ||
SELECT ${ID} FROM ${T} WHERE ${F} = $1 LIMIT $2 | ||
)`; | ||
} | ||
|
||
export async function bulk_delete(opts: Opts): Ret { | ||
const { table, field, value } = opts; | ||
let { limit = 1000 } = opts; | ||
const { table, field, value, id = "id", maxUtilPct = 10 } = opts; | ||
let { limit = 1024 } = opts; | ||
// assert table name is a key in SCHEMA | ||
if (!(table in SCHEMA)) { | ||
throw new Error(`table ${table} does not exist`); | ||
} | ||
|
||
const q = deleteQuery(table, field); | ||
console.log(q); | ||
console.log(opts); | ||
if (maxUtilPct < 1 || maxUtilPct > 99) { | ||
throw new Error(`maxUtilPct must be between 1 and 99`); | ||
} | ||
|
||
const q = deleteQuery(table, field, id); | ||
const pool = getPool(); | ||
|
||
const start_ts = Date.now(); | ||
let rowsDeleted = 0; | ||
|
||
let rowsDeleted = 0; | ||
let totalWaitS = 0; | ||
let totalPgTimeS = 0; | ||
while (true) { | ||
const t0 = Date.now(); | ||
const ret = await pool.query(q, [value, limit]); | ||
const td = Date.now() - t0; | ||
const dt = (Date.now() - t0) / 1000; | ||
rowsDeleted += ret.rowCount ?? 0; | ||
totalPgTimeS += dt; | ||
|
||
// adjust the limit | ||
const next = Math.round( | ||
td > 0.1 ? limit / 2 : td < 0.05 ? limit * 2 : limit, | ||
); | ||
limit = Math.max(1, Math.min(10000, next)); | ||
// adjust the limit: we aim to keep the operation between 0.1 and 0.2 secs | ||
const next = dt > 0.2 ? limit / 2 : dt < 0.1 ? limit * 2 : limit; | ||
limit = Math.max(1, Math.min(32768, Math.round(next))); | ||
|
||
// wait for a bit, but not more than 1 second ~ this aims for a max utilization of maxUtilPct percent (10% by default) | ||
const wait_ms = Math.min(1000, td * 10); | ||
await new Promise((done) => setTimeout(done, wait_ms)); | ||
const waitS = Math.min(1, dt * ((100 - maxUtilPct) / maxUtilPct)); | ||
await new Promise((done) => setTimeout(done, 1000 * waitS)); | ||
totalWaitS += waitS; | ||
|
||
console.log( | ||
`loop: deleted ${ret.rowCount} | wait=${wait_ms} | limit=${limit}`, | ||
); | ||
// console.log( | ||
// `deleted ${ret.rowCount} | dt=${dt} | wait=${waitS} | limit=${limit}`, | ||
// ); | ||
|
||
if (ret.rowCount === 0) break; | ||
} | ||
|
||
const durationS = (Date.now() - start_ts) / 1000; | ||
return { durationS, rowsDeleted }; | ||
return { durationS, rowsDeleted, totalWaitS, totalPgTimeS }; | ||
} |