diff --git a/archives/messages.go b/archives/messages.go index a3a5119..7a7f1b2 100644 --- a/archives/messages.go +++ b/archives/messages.go @@ -189,9 +189,7 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 } rows.Close() - log.WithFields(logrus.Fields{ - "msg_count": len(msgIDs), - }).Debug("found messages") + log.WithField("msg_count", len(msgIDs)).Debug("found messages") // verify we don't see more messages than there are in our archive (fewer is ok) if visibleCount > archive.RecordCount { @@ -199,19 +197,13 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 } // ok, delete our messages in batches, we do this in transactions as it spans a few different queries - for startIdx := 0; startIdx < len(msgIDs); startIdx += deleteTransactionSize { + for _, idBatch := range chunkIDs(msgIDs, deleteTransactionSize) { // no single batch should take more than a few minutes ctx, cancel := context.WithTimeout(ctx, time.Minute*15) defer cancel() start := time.Now() - endIdx := startIdx + deleteTransactionSize - if endIdx > len(msgIDs) { - endIdx = len(msgIDs) - } - batchIDs := msgIDs[startIdx:endIdx] - // start our transaction tx, err := db.BeginTxx(ctx, nil) if err != nil { @@ -219,45 +211,42 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 } // first update our delete_reason - err = executeInQuery(ctx, tx, setMessageDeleteReason, batchIDs) + err = executeInQuery(ctx, tx, setMessageDeleteReason, idBatch) if err != nil { - return fmt.Errorf("error updating delete reason: %s", err.Error()) + return errors.Wrap(err, "error updating delete reason") } // now delete any channel logs - err = executeInQuery(ctx, tx, deleteMessageLogs, batchIDs) + err = executeInQuery(ctx, tx, deleteMessageLogs, idBatch) if err != nil { - return fmt.Errorf("error removing channel logs: %s", err.Error()) + return errors.Wrap(err, "error removing channel logs") } // then any labels - err = executeInQuery(ctx, tx, deleteMessageLabels, batchIDs) + err = executeInQuery(ctx, tx, deleteMessageLabels, idBatch) if err != nil { - return fmt.Errorf("error removing message labels: %s", err.Error()) + return errors.Wrap(err, "error removing message labels") } // unlink any responses - err = executeInQuery(ctx, tx, unlinkResponses, batchIDs) + err = executeInQuery(ctx, tx, unlinkResponses, idBatch) if err != nil { - return fmt.Errorf("error unlinking responses: %s", err.Error()) + return errors.Wrap(err, "error unlinking responses") } // finally, delete our messages - err = executeInQuery(ctx, tx, deleteMessages, batchIDs) + err = executeInQuery(ctx, tx, deleteMessages, idBatch) if err != nil { - return fmt.Errorf("error deleting messages: %s", err.Error()) + return errors.Wrap(err, "error deleting messages") } // commit our transaction err = tx.Commit() if err != nil { - return fmt.Errorf("error committing message delete transaction: %s", err.Error()) + return errors.Wrap(err, "error committing message delete transaction") } - log.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - "count": len(batchIDs), - }).Debug("deleted batch of messages") + log.WithField("elapsed", time.Since(start)).WithField("count", len(idBatch)).Debug("deleted batch of messages") cancel() } @@ -270,14 +259,12 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 // all went well! mark our archive as no longer needing deletion _, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn) if err != nil { - return fmt.Errorf("error setting archive as deleted: %s", err.Error()) + return errors.Wrap(err, "error setting archive as deleted") } archive.NeedsDeletion = false archive.DeletedOn = &deletedOn - logrus.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - }).Info("completed deleting messages") + logrus.WithField("elapsed", time.Since(start)).Info("completed deleting messages") return nil } diff --git a/archives/runs.go b/archives/runs.go index c2e314e..763e119 100644 --- a/archives/runs.go +++ b/archives/runs.go @@ -180,9 +180,7 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie } rows.Close() - log.WithFields(logrus.Fields{ - "run_count": len(runIDs), - }).Debug("found runs") + log.WithField("run_count", len(runIDs)).Debug("found runs") // verify we don't see more runs than there are in our archive (fewer is ok) if runCount > archive.RecordCount { @@ -190,19 +188,13 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie } // ok, delete our runs in batches, we do this in transactions as it spans a few different queries - for startIdx := 0; startIdx < len(runIDs); startIdx += deleteTransactionSize { + for _, idBatch := range chunkIDs(runIDs, deleteTransactionSize) { // no single batch should take more than a few minutes ctx, cancel := context.WithTimeout(ctx, time.Minute*15) defer cancel() start := time.Now() - endIdx := startIdx + deleteTransactionSize - if endIdx > len(runIDs) { - endIdx = len(runIDs) - } - batchIDs := runIDs[startIdx:endIdx] - // start our transaction tx, err := db.BeginTxx(ctx, nil) if err != nil { @@ -210,39 +202,36 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie } // first update our delete_reason - err = executeInQuery(ctx, tx, setRunDeleteReason, batchIDs) + err = executeInQuery(ctx, tx, setRunDeleteReason, idBatch) if err != nil { - return fmt.Errorf("error updating delete reason: %s", err.Error()) + return errors.Wrap(err, "error updating delete reason") } // any recent runs - err = executeInQuery(ctx, tx, deleteRecentRuns, batchIDs) + err = executeInQuery(ctx, tx, deleteRecentRuns, idBatch) if err != nil { - return fmt.Errorf("error deleting recent runs: %s", err.Error()) + return errors.Wrap(err, "error deleting recent runs") } // unlink any parents - err = executeInQuery(ctx, tx, unlinkParents, batchIDs) + err = executeInQuery(ctx, tx, unlinkParents, idBatch) if err != nil { - return fmt.Errorf("error unliking parent runs: %s", err.Error()) + return errors.Wrap(err, "error unliking parent runs") } // finally, delete our runs - err = executeInQuery(ctx, tx, deleteRuns, batchIDs) + err = executeInQuery(ctx, tx, deleteRuns, idBatch) if err != nil { - return fmt.Errorf("error deleting runs: %s", err.Error()) + return errors.Wrap(err, "error deleting runs") } // commit our transaction err = tx.Commit() if err != nil { - return fmt.Errorf("error committing run delete transaction: %s", err.Error()) + return errors.Wrap(err, "error committing run delete transaction") } - log.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - "count": len(batchIDs), - }).Debug("deleted batch of runs") + log.WithField("elapsed", time.Since(start)).WithField("count", len(idBatch)).Debug("deleted batch of runs") cancel() } @@ -255,14 +244,12 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie // all went well! mark our archive as no longer needing deletion _, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn) if err != nil { - return fmt.Errorf("error setting archive as deleted: %s", err.Error()) + return errors.Wrap(err, "error setting archive as deleted") } archive.NeedsDeletion = false archive.DeletedOn = &deletedOn - logrus.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - }).Info("completed deleting runs") + logrus.WithField("elapsed", time.Since(start)).Info("completed deleting runs") return nil } diff --git a/archives/utils.go b/archives/utils.go new file mode 100644 index 0000000..d50db23 --- /dev/null +++ b/archives/utils.go @@ -0,0 +1,15 @@ +package archives + +// chunks a slice of in64 IDs +func chunkIDs(ids []int64, size int) [][]int64 { + chunks := make([][]int64, 0, len(ids)/size+1) + + for i := 0; i < len(ids); i += size { + end := i + size + if end > len(ids) { + end = len(ids) + } + chunks = append(chunks, ids[i:end]) + } + return chunks +}