From d04331d244ee3e1092c68a2336024a483589a4fd Mon Sep 17 00:00:00 2001 From: Peter Chen <34582813+PeterChen13579@users.noreply.github.com> Date: Fri, 25 Oct 2024 11:27:02 -0400 Subject: [PATCH] fix: Support Delete NFT (#1695) Fixes #1677 --- .../cassandra_delete_range.go | 70 +++++-- .../internal/cass/cass.go | 188 ++++++++++-------- .../internal/util/marker.go | 44 ++++ 3 files changed, 198 insertions(+), 104 deletions(-) create mode 100644 tools/cassandra_delete_range/internal/util/marker.go diff --git a/tools/cassandra_delete_range/cassandra_delete_range.go b/tools/cassandra_delete_range/cassandra_delete_range.go index 71c6f6141..ec0f16c0f 100755 --- a/tools/cassandra_delete_range/cassandra_delete_range.go +++ b/tools/cassandra_delete_range/cassandra_delete_range.go @@ -51,15 +51,19 @@ var ( userName = app.Flag("username", "Username to use when connecting to the cluster").String() password = app.Flag("password", "Password to use when connecting to the cluster").String() - skipSuccessorTable = app.Flag("skip-successor", "Whether to skip deletion from successor table").Default("false").Bool() - skipObjectsTable = app.Flag("skip-objects", "Whether to skip deletion from objects table").Default("false").Bool() - skipLedgerHashesTable = app.Flag("skip-ledger-hashes", "Whether to skip deletion from ledger_hashes table").Default("false").Bool() - skipTransactionsTable = app.Flag("skip-transactions", "Whether to skip deletion from transactions table").Default("false").Bool() - skipDiffTable = app.Flag("skip-diff", "Whether to skip deletion from diff table").Default("false").Bool() - skipLedgerTransactionsTable = app.Flag("skip-ledger-transactions", "Whether to skip deletion from ledger_transactions table").Default("false").Bool() - skipLedgersTable = app.Flag("skip-ledgers", "Whether to skip deletion from ledgers table").Default("false").Bool() - skipWriteLatestLedger = app.Flag("skip-write-latest-ledger", "Whether to skip writing the latest ledger index").Default("false").Bool() - skipAccTransactionsTable = app.Flag("skip-account-transactions", "Whether to skip deletion from account_transactions table").Default("false").Bool() + skipSuccessorTable = app.Flag("skip-successor", "Whether to skip deletion from successor table").Default("false").Bool() + skipObjectsTable = app.Flag("skip-objects", "Whether to skip deletion from objects table").Default("false").Bool() + skipLedgerHashesTable = app.Flag("skip-ledger-hashes", "Whether to skip deletion from ledger_hashes table").Default("false").Bool() + skipTransactionsTable = app.Flag("skip-transactions", "Whether to skip deletion from transactions table").Default("false").Bool() + skipDiffTable = app.Flag("skip-diff", "Whether to skip deletion from diff table").Default("false").Bool() + skipLedgerTransactionsTable = app.Flag("skip-ledger-transactions", "Whether to skip deletion from ledger_transactions table").Default("false").Bool() + skipLedgersTable = app.Flag("skip-ledgers", "Whether to skip deletion from ledgers table").Default("false").Bool() + skipWriteLatestLedger = app.Flag("skip-write-latest-ledger", "Whether to skip writing the latest ledger index").Default("false").Bool() + skipAccTransactionsTable = app.Flag("skip-account-transactions", "Whether to skip deletion from account_transactions table").Default("false").Bool() + skipNFTokenTable = app.Flag("skip-nf-tokens", "Whether to skip deletion from nf_tokens table").Default("false").Bool() + skipIssuerNFTokenTable = app.Flag("skip-issuer-nf-tokens-v2", "Whether to skip deletion from issuer_nf_tokens_v2 table").Default("false").Bool() + skipNFTokenURITable = app.Flag("skip-nf-tokens-uri", "Whether to skip deletion from nf_token_uris table").Default("false").Bool() + skipNFTokenTransactionsTable = app.Flag("skip-nf-token-transactions", "Whether to skip deletion from nf_token_transactions table").Default("false").Bool() workerCount = 1 // the calculated number of parallel goroutines the client should run ranges []*util.TokenRange // the calculated ranges to be executed in parallel @@ -81,19 +85,24 @@ func main() { } clioCass := cass.NewClioCass(&cass.Settings{ - SkipSuccessorTable: *skipSuccessorTable, - SkipObjectsTable: *skipObjectsTable, - SkipLedgerHashesTable: *skipLedgerHashesTable, - SkipTransactionsTable: *skipTransactionsTable, - SkipDiffTable: *skipDiffTable, - SkipLedgerTransactionsTable: *skipLedgerHashesTable, - SkipLedgersTable: *skipLedgersTable, - SkipWriteLatestLedger: *skipWriteLatestLedger, - SkipAccTransactionsTable: *skipAccTransactionsTable, - WorkerCount: workerCount, - Ranges: ranges, - RangesRead: ledgerOrTokenRange, - Command: cmd}, cluster) + SkipSuccessorTable: *skipSuccessorTable, + SkipObjectsTable: *skipObjectsTable, + SkipLedgerHashesTable: *skipLedgerHashesTable, + SkipTransactionsTable: *skipTransactionsTable, + SkipDiffTable: *skipDiffTable, + SkipLedgerTransactionsTable: *skipLedgerHashesTable, + SkipLedgersTable: *skipLedgersTable, + SkipWriteLatestLedger: *skipWriteLatestLedger, + SkipAccTransactionsTable: *skipAccTransactionsTable, + SkipNFTokenTable: *skipNFTokenTable, + SkipIssuerNFTokenTable: *skipIssuerNFTokenTable, + SkipNFTokenURITable: *skipNFTokenURITable, + SkipNFTokenTransactionsTable: *skipNFTokenTransactionsTable, + + WorkerCount: workerCount, + Ranges: ranges, + RangesRead: ledgerOrTokenRange, + Command: cmd}, cluster) switch command { case deleteAfter.FullCommand(): @@ -171,6 +180,10 @@ Skip deletion of: - ledger_transactions table : %t - ledgers table : %t - account_tx table : %t +- nf_tokens table : %t +- issuer_nf_tokens_v2 table : %t +- nf_token_uris table : %t +- nf_token_transactions table : %t Will update ledger_range : %t @@ -194,6 +207,10 @@ Will update ledger_range : %t *skipLedgerTransactionsTable, *skipLedgersTable, *skipAccTransactionsTable, + *skipNFTokenTable, + *skipIssuerNFTokenTable, + *skipNFTokenURITable, + *skipNFTokenTransactionsTable, !*skipWriteLatestLedger, ) @@ -264,6 +281,15 @@ func prepareResume(cmd *string) { // should be already deleted so we skip for deletion tableFound := false switch scanner.Text() { + case "nf_token_transactions": + *skipNFTokenURITable = true + fallthrough + case "nf_token_uris": + *skipNFTokenTable = true + fallthrough + case "nf_tokens": + *skipAccTransactionsTable = true + fallthrough case "account_tx": *skipLedgersTable = true fallthrough diff --git a/tools/cassandra_delete_range/internal/cass/cass.go b/tools/cassandra_delete_range/internal/cass/cass.go index 18275192c..ecb626d38 100644 --- a/tools/cassandra_delete_range/internal/cass/cass.go +++ b/tools/cassandra_delete_range/internal/cass/cass.go @@ -38,15 +38,19 @@ type deleteMethod struct { } type Settings struct { - SkipSuccessorTable bool - SkipObjectsTable bool - SkipLedgerHashesTable bool - SkipTransactionsTable bool - SkipDiffTable bool - SkipLedgerTransactionsTable bool - SkipLedgersTable bool - SkipWriteLatestLedger bool - SkipAccTransactionsTable bool + SkipSuccessorTable bool + SkipObjectsTable bool + SkipLedgerHashesTable bool + SkipTransactionsTable bool + SkipDiffTable bool + SkipLedgerTransactionsTable bool + SkipLedgersTable bool + SkipWriteLatestLedger bool + SkipAccTransactionsTable bool + SkipNFTokenTable bool + SkipIssuerNFTokenTable bool + SkipNFTokenURITable bool + SkipNFTokenTransactionsTable bool WorkerCount int Ranges []*util.TokenRange @@ -54,44 +58,6 @@ type Settings struct { Command string } -type Marker struct { - cmd string - file *os.File -} - -func NewMarker(cmd string) *Marker { - return &Marker{cmd: cmd} -} - -func CloseMarker(m *Marker) { - if m.file != nil { - m.file.Close() - } - os.Remove("continue.txt") -} - -func (m *Marker) EnterTable(table string) error { - // Create the file - file, err := os.OpenFile("continue.txt", os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644) - m.file = file - if err != nil { - return fmt.Errorf("failed to create file: %w", err) - } - fmt.Fprintf(m.file, "%s\n", m.cmd) - m.file.WriteString(fmt.Sprintf("%s\n", table)) - return nil -} - -func (m *Marker) MarkProgress(x int64, y int64) { - fmt.Fprintf(m.file, "%d, %d \n", x, y) -} - -func (m *Marker) ExitTable() { - m.file.Close() - m.file = nil - os.Remove("continue.txt") -} - type Cass interface { GetLedgerRange() (uint64, uint64, error) DeleteBefore(ledgerIdx uint64) @@ -227,7 +193,7 @@ func (c *ClioCass) pruneData( // successor queries if !c.settings.SkipSuccessorTable { - marker := NewMarker(c.settings.Command) + marker := util.NewMarker(c.settings.Command) if err := marker.EnterTable("successor"); err != nil { return err } @@ -235,7 +201,7 @@ func (c *ClioCass) pruneData( log.Println("Generating delete queries for successor table") rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx, "SELECT key, seq FROM successor WHERE token(key) >= ? AND token(key) <= ?", - "DELETE FROM successor WHERE key = ? AND seq = ?", deleteMethod{deleteGeneral: maybe.Set(true)}, columnSettings{UseBlob: true, UseSeq: false}) + "DELETE FROM successor WHERE key = ? AND seq = ?", deleteMethod{deleteGeneral: maybe.Set(true)}, columnSettings{}) log.Printf("Total delete queries: %d\n", deleteCount) log.Printf("Total traversed rows: %d\n\n", rowsCount) totalRows += rowsCount @@ -247,7 +213,7 @@ func (c *ClioCass) pruneData( // objects queries if !c.settings.SkipObjectsTable { - marker := NewMarker(c.settings.Command) + marker := util.NewMarker(c.settings.Command) if err := marker.EnterTable("objects"); err != nil { return err } @@ -255,7 +221,7 @@ func (c *ClioCass) pruneData( log.Println("Generating delete queries for objects table") rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx, "SELECT key, sequence FROM objects WHERE token(key) >= ? AND token(key) <= ?", - "DELETE FROM objects WHERE key = ? AND sequence = ?", deleteMethod{deleteObject: maybe.Set(true)}, columnSettings{UseBlob: true, UseSeq: true}) + "DELETE FROM objects WHERE key = ? AND sequence = ?", deleteMethod{deleteObject: maybe.Set(true)}, columnSettings{}) log.Printf("Total delete queries: %d\n", deleteCount) log.Printf("Total traversed rows: %d\n\n", rowsCount) totalErrors += errCount @@ -267,7 +233,7 @@ func (c *ClioCass) pruneData( // ledger_hashes queries if !c.settings.SkipLedgerHashesTable { - marker := NewMarker(c.settings.Command) + marker := util.NewMarker(c.settings.Command) if err := marker.EnterTable("ledger_hashes"); err != nil { return err } @@ -287,7 +253,7 @@ func (c *ClioCass) pruneData( // transactions queries if !c.settings.SkipTransactionsTable { - marker := NewMarker(c.settings.Command) + marker := util.NewMarker(c.settings.Command) if err := marker.EnterTable("transactions"); err != nil { return err } @@ -306,8 +272,9 @@ func (c *ClioCass) pruneData( } // diff queries + if !c.settings.SkipDiffTable { - marker := NewMarker(c.settings.Command) + marker := util.NewMarker(c.settings.Command) if err := marker.EnterTable("diff"); err != nil { return err } @@ -324,7 +291,7 @@ func (c *ClioCass) pruneData( // ledger_transactions queries if !c.settings.SkipLedgerTransactionsTable { - marker := NewMarker(c.settings.Command) + marker := util.NewMarker(c.settings.Command) if err := marker.EnterTable("ledger_transactions"); err != nil { return err } @@ -341,7 +308,7 @@ func (c *ClioCass) pruneData( // ledgers queries if !c.settings.SkipLedgersTable { - marker := NewMarker(c.settings.Command) + marker := util.NewMarker(c.settings.Command) if err := marker.EnterTable("ledgers"); err != nil { return err } @@ -358,7 +325,7 @@ func (c *ClioCass) pruneData( // account_tx queries if !c.settings.SkipAccTransactionsTable { - marker := NewMarker(c.settings.Command) + marker := util.NewMarker(c.settings.Command) if err := marker.EnterTable("account_tx"); err != nil { return err } @@ -366,7 +333,27 @@ func (c *ClioCass) pruneData( log.Println("Generating delete queries for account transactions table") rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx, "SELECT account, seq_idx FROM account_tx WHERE token(account) >= ? AND token(account) <= ?", - "DELETE FROM account_tx WHERE account = ? AND seq_idx = (?, ?)", deleteMethod{deleteTransaction: maybe.Set(true)}, columnSettings{UseBlob: true, UseSeq: false}) + "DELETE FROM account_tx WHERE account = ? AND seq_idx = (?, ?)", deleteMethod{deleteTransaction: maybe.Set(true)}, columnSettings{}) + log.Printf("Total delete queries: %d\n", deleteCount) + log.Printf("Total traversed rows: %d\n\n", rowsCount) + totalRows += rowsCount + totalErrors += errCount + totalDeletes += deleteCount + + marker.ExitTable() + } + + // nf_token queries + if !c.settings.SkipNFTokenTable { + marker := util.NewMarker(c.settings.Command) + if err := marker.EnterTable("nf_tokens"); err != nil { + return err + } + + log.Println("Generating delete queries for nft tokens table") + rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx, + "SELECT token_id, sequence FROM nf_tokens WHERE token(token_id) >= ? AND token(token_id) <= ?", + "DELETE FROM nf_tokens WHERE token_id = ? AND sequence = ?", deleteMethod{deleteObject: maybe.Set(true)}, columnSettings{}) log.Printf("Total delete queries: %d\n", deleteCount) log.Printf("Total traversed rows: %d\n\n", rowsCount) totalRows += rowsCount @@ -376,7 +363,47 @@ func (c *ClioCass) pruneData( marker.ExitTable() } - // TODO: take care of nft tables and other stuff like that + // issuer_nf_tokens_v2: skipped because table is small and not trivial to delete + + // nf_token_URI queries + if !c.settings.SkipNFTokenURITable { + marker := util.NewMarker(c.settings.Command) + if err := marker.EnterTable("nf_token_uris"); err != nil { + return err + } + + log.Println("Generating delete queries for nft token URI table") + rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx, + "SELECT token_id, sequence FROM nf_token_uris WHERE token(token_id) >= ? AND token(token_id) <= ?", + "DELETE FROM nf_token_uris WHERE token_id = ? AND sequence = ?", deleteMethod{deleteObject: maybe.Set(true)}, columnSettings{}) + log.Printf("Total delete queries: %d\n", deleteCount) + log.Printf("Total traversed rows: %d\n\n", rowsCount) + totalRows += rowsCount + totalErrors += errCount + totalDeletes += deleteCount + + marker.ExitTable() + } + + // nf_token_transactions + if !c.settings.SkipNFTokenTransactionsTable { + marker := util.NewMarker(c.settings.Command) + if err := marker.EnterTable("nf_token_transactions"); err != nil { + return err + } + + log.Println("Generating delete queries for nft token transactions table") + rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx, + "SELECT token_id, seq_idx FROM nf_token_transactions WHERE token(token_id) >= ? AND token(token_id) <= ?", + "DELETE FROM nf_token_transactions WHERE token_id = ? AND seq_idx = (?, ?)", deleteMethod{deleteTransaction: maybe.Set(true)}, columnSettings{}) + log.Printf("Total delete queries: %d\n", deleteCount) + log.Printf("Total traversed rows: %d\n\n", rowsCount) + totalRows += rowsCount + totalErrors += errCount + totalDeletes += deleteCount + + marker.ExitTable() + } if !c.settings.SkipWriteLatestLedger { var ( @@ -424,7 +451,7 @@ func (c *ClioCass) prepareAndExecuteSimpleDeleteQueries( info.Data = append(info.Data, deleteParams{Seq: i}) // for every 1000 queries in data, delete if len(info.Data) == 1000 { - _, err := c.performDeleteQueries(&info, session, colSettings) + err := c.performDeleteQueries(&info, session, colSettings) atomic.AddUint64(&totalDeletes, uint64(len(info.Data))) atomic.AddUint64(&totalErrors, err) info = deleteInfo{Query: deleteQueryTemplate} @@ -432,7 +459,7 @@ func (c *ClioCass) prepareAndExecuteSimpleDeleteQueries( } // delete the rest of queries if exists if len(info.Data) > 0 { - _, err := c.performDeleteQueries(&info, session, colSettings) + err := c.performDeleteQueries(&info, session, colSettings) atomic.AddUint64(&totalDeletes, uint64(len(info.Data))) atomic.AddUint64(&totalErrors, err) } @@ -543,7 +570,7 @@ func (c *ClioCass) prepareAccTxnDelete( } func (c *ClioCass) prepareAndExecuteDeleteQueries( - marker *Marker, + marker *util.Marker, fromLedgerIdx maybe.Maybe[uint64], toLedgerIdx maybe.Maybe[uint64], queryTemplate string, @@ -612,6 +639,8 @@ func (c *ClioCass) prepareAndExecuteDeleteQueries( prepareDeleteResult = c.prepareAccTxnDelete(scanner, &info, fromLedgerIdx, toLedgerIdx, &rowsRetrieved) } else if method.deleteGeneral.HasValue() && method.deleteGeneral.Value() { prepareDeleteResult = c.prepareDefaultDelete(scanner, &info, fromLedgerIdx, toLedgerIdx, &rowsRetrieved) + } else { + log.Fatal("Deletion method not supported") } if !prepareDeleteResult { @@ -620,19 +649,20 @@ func (c *ClioCass) prepareAndExecuteDeleteQueries( atomic.AddUint64(&totalErrors, 1) } - if len(nextPageState) == 0 { - // Checks for delete queries after iterating all pages - if len(info.Data) > 0 { - _, numErr := c.performDeleteQueries(&info, session, colSettings) - atomic.AddUint64(&totalErrors, numErr) - atomic.AddUint64(&totalDeletes, uint64(len(info.Data))) - if totalDeletes >= counter { - log.Printf("... deleted %d queries ...", counter) - counter += 1000 - } - // reset back to the deleted query template after finishing executing delete - info = deleteInfo{Query: deleteQueryTemplate} + // Checks for delete queries when there are queries available to delete + if len(info.Data) > 0 { + numErr := c.performDeleteQueries(&info, session, colSettings) + atomic.AddUint64(&totalErrors, numErr) + atomic.AddUint64(&totalDeletes, uint64(len(info.Data))) + if totalDeletes >= counter { + log.Printf("... deleted %d queries ...", counter) + counter += 1000 } + // reset back to the deleted query template after finishing executing delete + info = deleteInfo{Query: deleteQueryTemplate} + } + + if len(nextPageState) == 0 { break } pageState = nextPageState @@ -681,10 +711,9 @@ func (c *ClioCass) splitDeleteWork(info *deleteInfo) [][]deleteParams { return chunks } -func (c *ClioCass) performDeleteQueries(info *deleteInfo, session *gocql.Session, colSettings columnSettings) (uint64, uint64) { +func (c *ClioCass) performDeleteQueries(info *deleteInfo, session *gocql.Session, colSettings columnSettings) uint64 { var wg sync.WaitGroup var sessionCreationWaitGroup sync.WaitGroup - var totalDeletes uint64 var totalErrors uint64 chunks := c.splitDeleteWork(info) @@ -727,11 +756,6 @@ func (c *ClioCass) performDeleteQueries(info *deleteInfo, session *gocql.Session log.Printf("DELETE ERROR: %s\n", err) fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", fmt.Sprintf("%s [blob=0x%x][seq=%d]", info.Query, r.Blob, r.Seq)) atomic.AddUint64(&totalErrors, 1) - } else { - atomic.AddUint64(&totalDeletes, 1) - if atomic.LoadUint64(&totalDeletes)%10000 == 0 { - log.Printf("... %d deletes ...\n", totalDeletes) - } } } } @@ -739,7 +763,7 @@ func (c *ClioCass) performDeleteQueries(info *deleteInfo, session *gocql.Session } wg.Wait() - return totalDeletes, totalErrors + return totalErrors } func (c *ClioCass) updateLedgerRange(newStartLedger maybe.Maybe[uint64], newEndLedger maybe.Maybe[uint64]) error { diff --git a/tools/cassandra_delete_range/internal/util/marker.go b/tools/cassandra_delete_range/internal/util/marker.go new file mode 100644 index 000000000..3e1bf9d7d --- /dev/null +++ b/tools/cassandra_delete_range/internal/util/marker.go @@ -0,0 +1,44 @@ +package util + +import ( + "fmt" + "os" +) + +type Marker struct { + Cmd string + File *os.File +} + +func NewMarker(cmd string) *Marker { + return &Marker{Cmd: cmd} +} + +func CloseMarker(m *Marker) { + if m.File != nil { + m.File.Close() + } + os.Remove("continue.txt") +} + +func (m *Marker) EnterTable(table string) error { + // Create the file + file, err := os.OpenFile("continue.txt", os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644) + m.File = file + if err != nil { + return fmt.Errorf("failed to create file: %w", err) + } + fmt.Fprintf(m.File, "%s\n", m.Cmd) + m.File.WriteString(fmt.Sprintf("%s\n", table)) + return nil +} + +func (m *Marker) MarkProgress(x int64, y int64) { + fmt.Fprintf(m.File, "%d, %d \n", x, y) +} + +func (m *Marker) ExitTable() { + m.File.Close() + m.File = nil + os.Remove("continue.txt") +}