From c0346bac33d4dd5cd2c0982e9e42a809cfd7c50d Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Mon, 29 Nov 2021 17:22:42 -0500 Subject: [PATCH 1/5] Add chunkIDs util function --- archives/messages.go | 31 +++++++++---------------------- archives/runs.go | 29 ++++++++--------------------- archives/utils.go | 15 +++++++++++++++ 3 files changed, 32 insertions(+), 43 deletions(-) create mode 100644 archives/utils.go diff --git a/archives/messages.go b/archives/messages.go index a3a5119..3098213 100644 --- a/archives/messages.go +++ b/archives/messages.go @@ -189,9 +189,7 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 } rows.Close() - log.WithFields(logrus.Fields{ - "msg_count": len(msgIDs), - }).Debug("found messages") + log.WithField("msg_count", len(msgIDs)).Debug("found messages") // verify we don't see more messages than there are in our archive (fewer is ok) if visibleCount > archive.RecordCount { @@ -199,19 +197,13 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 } // ok, delete our messages in batches, we do this in transactions as it spans a few different queries - for startIdx := 0; startIdx < len(msgIDs); startIdx += deleteTransactionSize { + for _, idBatch := range chunkIDs(msgIDs, deleteTransactionSize) { // no single batch should take more than a few minutes ctx, cancel := context.WithTimeout(ctx, time.Minute*15) defer cancel() start := time.Now() - endIdx := startIdx + deleteTransactionSize - if endIdx > len(msgIDs) { - endIdx = len(msgIDs) - } - batchIDs := msgIDs[startIdx:endIdx] - // start our transaction tx, err := db.BeginTxx(ctx, nil) if err != nil { @@ -219,31 +211,31 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 } // first update our delete_reason - err = executeInQuery(ctx, tx, setMessageDeleteReason, batchIDs) + err = executeInQuery(ctx, tx, setMessageDeleteReason, idBatch) if err != nil { return fmt.Errorf("error updating delete reason: %s", err.Error()) } // now delete any channel logs - err = executeInQuery(ctx, tx, deleteMessageLogs, batchIDs) + err = executeInQuery(ctx, tx, deleteMessageLogs, idBatch) if err != nil { return fmt.Errorf("error removing channel logs: %s", err.Error()) } // then any labels - err = executeInQuery(ctx, tx, deleteMessageLabels, batchIDs) + err = executeInQuery(ctx, tx, deleteMessageLabels, idBatch) if err != nil { return fmt.Errorf("error removing message labels: %s", err.Error()) } // unlink any responses - err = executeInQuery(ctx, tx, unlinkResponses, batchIDs) + err = executeInQuery(ctx, tx, unlinkResponses, idBatch) if err != nil { return fmt.Errorf("error unlinking responses: %s", err.Error()) } // finally, delete our messages - err = executeInQuery(ctx, tx, deleteMessages, batchIDs) + err = executeInQuery(ctx, tx, deleteMessages, idBatch) if err != nil { return fmt.Errorf("error deleting messages: %s", err.Error()) } @@ -254,10 +246,7 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 return fmt.Errorf("error committing message delete transaction: %s", err.Error()) } - log.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - "count": len(batchIDs), - }).Debug("deleted batch of messages") + log.WithField("elapsed", time.Since(start)).WithField("count", len(idBatch)).Debug("deleted batch of messages") cancel() } @@ -275,9 +264,7 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 archive.NeedsDeletion = false archive.DeletedOn = &deletedOn - logrus.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - }).Info("completed deleting messages") + logrus.WithField("elapsed", time.Since(start)).Info("completed deleting messages") return nil } diff --git a/archives/runs.go b/archives/runs.go index c2e314e..bfb964f 100644 --- a/archives/runs.go +++ b/archives/runs.go @@ -180,9 +180,7 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie } rows.Close() - log.WithFields(logrus.Fields{ - "run_count": len(runIDs), - }).Debug("found runs") + log.WithField("run_count", len(runIDs)).Debug("found runs") // verify we don't see more runs than there are in our archive (fewer is ok) if runCount > archive.RecordCount { @@ -190,19 +188,13 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie } // ok, delete our runs in batches, we do this in transactions as it spans a few different queries - for startIdx := 0; startIdx < len(runIDs); startIdx += deleteTransactionSize { + for _, idBatch := range chunkIDs(runIDs, deleteTransactionSize) { // no single batch should take more than a few minutes ctx, cancel := context.WithTimeout(ctx, time.Minute*15) defer cancel() start := time.Now() - endIdx := startIdx + deleteTransactionSize - if endIdx > len(runIDs) { - endIdx = len(runIDs) - } - batchIDs := runIDs[startIdx:endIdx] - // start our transaction tx, err := db.BeginTxx(ctx, nil) if err != nil { @@ -210,25 +202,25 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie } // first update our delete_reason - err = executeInQuery(ctx, tx, setRunDeleteReason, batchIDs) + err = executeInQuery(ctx, tx, setRunDeleteReason, idBatch) if err != nil { return fmt.Errorf("error updating delete reason: %s", err.Error()) } // any recent runs - err = executeInQuery(ctx, tx, deleteRecentRuns, batchIDs) + err = executeInQuery(ctx, tx, deleteRecentRuns, idBatch) if err != nil { return fmt.Errorf("error deleting recent runs: %s", err.Error()) } // unlink any parents - err = executeInQuery(ctx, tx, unlinkParents, batchIDs) + err = executeInQuery(ctx, tx, unlinkParents, idBatch) if err != nil { return fmt.Errorf("error unliking parent runs: %s", err.Error()) } // finally, delete our runs - err = executeInQuery(ctx, tx, deleteRuns, batchIDs) + err = executeInQuery(ctx, tx, deleteRuns, idBatch) if err != nil { return fmt.Errorf("error deleting runs: %s", err.Error()) } @@ -239,10 +231,7 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie return fmt.Errorf("error committing run delete transaction: %s", err.Error()) } - log.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - "count": len(batchIDs), - }).Debug("deleted batch of runs") + log.WithField("elapsed", time.Since(start)).WithField("count", len(idBatch)).Debug("deleted batch of runs") cancel() } @@ -260,9 +249,7 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie archive.NeedsDeletion = false archive.DeletedOn = &deletedOn - logrus.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - }).Info("completed deleting runs") + logrus.WithField("elapsed", time.Since(start)).Info("completed deleting runs") return nil } diff --git a/archives/utils.go b/archives/utils.go new file mode 100644 index 0000000..d50db23 --- /dev/null +++ b/archives/utils.go @@ -0,0 +1,15 @@ +package archives + +// chunks a slice of in64 IDs +func chunkIDs(ids []int64, size int) [][]int64 { + chunks := make([][]int64, 0, len(ids)/size+1) + + for i := 0; i < len(ids); i += size { + end := i + size + if end > len(ids) { + end = len(ids) + } + chunks = append(chunks, ids[i:end]) + } + return chunks +} From 80dc34708ca0a891a4d4ba2d828ce719410876ea Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Mon, 29 Nov 2021 17:40:27 -0500 Subject: [PATCH 2/5] Use more errors.Wrap --- archives/messages.go | 14 +++++++------- archives/runs.go | 12 ++++++------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/archives/messages.go b/archives/messages.go index 3098213..7a7f1b2 100644 --- a/archives/messages.go +++ b/archives/messages.go @@ -213,37 +213,37 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 // first update our delete_reason err = executeInQuery(ctx, tx, setMessageDeleteReason, idBatch) if err != nil { - return fmt.Errorf("error updating delete reason: %s", err.Error()) + return errors.Wrap(err, "error updating delete reason") } // now delete any channel logs err = executeInQuery(ctx, tx, deleteMessageLogs, idBatch) if err != nil { - return fmt.Errorf("error removing channel logs: %s", err.Error()) + return errors.Wrap(err, "error removing channel logs") } // then any labels err = executeInQuery(ctx, tx, deleteMessageLabels, idBatch) if err != nil { - return fmt.Errorf("error removing message labels: %s", err.Error()) + return errors.Wrap(err, "error removing message labels") } // unlink any responses err = executeInQuery(ctx, tx, unlinkResponses, idBatch) if err != nil { - return fmt.Errorf("error unlinking responses: %s", err.Error()) + return errors.Wrap(err, "error unlinking responses") } // finally, delete our messages err = executeInQuery(ctx, tx, deleteMessages, idBatch) if err != nil { - return fmt.Errorf("error deleting messages: %s", err.Error()) + return errors.Wrap(err, "error deleting messages") } // commit our transaction err = tx.Commit() if err != nil { - return fmt.Errorf("error committing message delete transaction: %s", err.Error()) + return errors.Wrap(err, "error committing message delete transaction") } log.WithField("elapsed", time.Since(start)).WithField("count", len(idBatch)).Debug("deleted batch of messages") @@ -259,7 +259,7 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 // all went well! mark our archive as no longer needing deletion _, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn) if err != nil { - return fmt.Errorf("error setting archive as deleted: %s", err.Error()) + return errors.Wrap(err, "error setting archive as deleted") } archive.NeedsDeletion = false archive.DeletedOn = &deletedOn diff --git a/archives/runs.go b/archives/runs.go index bfb964f..763e119 100644 --- a/archives/runs.go +++ b/archives/runs.go @@ -204,31 +204,31 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie // first update our delete_reason err = executeInQuery(ctx, tx, setRunDeleteReason, idBatch) if err != nil { - return fmt.Errorf("error updating delete reason: %s", err.Error()) + return errors.Wrap(err, "error updating delete reason") } // any recent runs err = executeInQuery(ctx, tx, deleteRecentRuns, idBatch) if err != nil { - return fmt.Errorf("error deleting recent runs: %s", err.Error()) + return errors.Wrap(err, "error deleting recent runs") } // unlink any parents err = executeInQuery(ctx, tx, unlinkParents, idBatch) if err != nil { - return fmt.Errorf("error unliking parent runs: %s", err.Error()) + return errors.Wrap(err, "error unliking parent runs") } // finally, delete our runs err = executeInQuery(ctx, tx, deleteRuns, idBatch) if err != nil { - return fmt.Errorf("error deleting runs: %s", err.Error()) + return errors.Wrap(err, "error deleting runs") } // commit our transaction err = tx.Commit() if err != nil { - return fmt.Errorf("error committing run delete transaction: %s", err.Error()) + return errors.Wrap(err, "error committing run delete transaction") } log.WithField("elapsed", time.Since(start)).WithField("count", len(idBatch)).Debug("deleted batch of runs") @@ -244,7 +244,7 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie // all went well! mark our archive as no longer needing deletion _, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn) if err != nil { - return fmt.Errorf("error setting archive as deleted: %s", err.Error()) + return errors.Wrap(err, "error setting archive as deleted") } archive.NeedsDeletion = false archive.DeletedOn = &deletedOn From e83191bd93e4fcb1d3f30720fa3a28dc033f6f51 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Tue, 7 Dec 2021 10:01:46 -0500 Subject: [PATCH 3/5] Remove msgs_msg.response_to_id --- archives/messages.go | 12 ------------ testdb.sql | 21 ++++++++++----------- 2 files changed, 10 insertions(+), 23 deletions(-) diff --git a/archives/messages.go b/archives/messages.go index 7a7f1b2..b957015 100644 --- a/archives/messages.go +++ b/archives/messages.go @@ -123,12 +123,6 @@ DELETE FROM msgs_msg_labels WHERE msg_id IN(?) ` -const unlinkResponses = ` -UPDATE msgs_msg -SET response_to_id = NULL -WHERE response_to_id IN(?) -` - const deleteMessages = ` DELETE FROM msgs_msg WHERE id IN(?) @@ -228,12 +222,6 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 return errors.Wrap(err, "error removing message labels") } - // unlink any responses - err = executeInQuery(ctx, tx, unlinkResponses, idBatch) - if err != nil { - return errors.Wrap(err, "error unlinking responses") - } - // finally, delete our messages err = executeInQuery(ctx, tx, deleteMessages, idBatch) if err != nil { diff --git a/testdb.sql b/testdb.sql index c4bdb2e..dab01e5 100644 --- a/testdb.sql +++ b/testdb.sql @@ -90,8 +90,7 @@ CREATE TABLE msgs_msg ( org_id integer NOT NULL references orgs_org(id) on delete cascade, metadata text, topup_id integer, - delete_reason char(1) NULL, - response_to_id integer NULL references msgs_msg(id) + delete_reason char(1) NULL ); DROP TABLE IF EXISTS msgs_broadcast_recipients; @@ -269,15 +268,15 @@ INSERT INTO msgs_broadcast(id, text, created_on, purged, org_id, schedule_id) VA (3, 'base=>"not purged"'::hstore, '2017-08-12 19:11:59.890662+02:00', FALSE, 2, NULL), (4, 'base=>"new"'::hstore, '2019-08-12 19:11:59.890662+02:00', FALSE, 2, NULL); -INSERT INTO msgs_msg(id, broadcast_id, uuid, text, created_on, sent_on, modified_on, direction, status, visibility, msg_type, attachments, channel_id, contact_id, contact_urn_id, org_id, msg_count, error_count, next_attempt, response_to_id) VALUES -(1, NULL, '2f969340-704a-4aa2-a1bd-2f832a21d257', 'message 1', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL), -(2, NULL, 'abe87ac1-015c-4803-be29-1e89509fe682', 'message 2', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'D', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL), -(3, NULL, 'a7e83a22-a6ff-4e18-82d0-19545640ccba', 'message 3', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'H', 'V', 'I', '{"image/png:https://foo.bar/image1.png", "image/png:https://foo.bar/image2.png"}', NULL, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL), -(4, NULL, '1cad36af-5581-4c8a-81cd-83708398f61e', 'message 4', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-13 21:11:59.890662+00', NULL), -(5, NULL, 'f557972e-2eb5-42fa-9b87-902116d18787', 'message 5', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', 'I', 'H', 'V', 'I', NULL, 3, 7, 8, 3, 1, 0, '2017-08-11 21:11:59.890662+02:00', NULL), -(6, 2, '579d148c-0ab1-4afb-832f-afb1fe0e19b7', 'message 6', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-10-08 21:11:59.890662+00', NULL), -(7, NULL, '7aeca469-2593-444e-afe4-4702317534c9', 'message 7', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2018-01-02 21:11:59.890662+00', 2), -(9, NULL, 'e14ab466-0d3b-436d-a0f7-5851fd7d9b7d', 'message 9', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'S', 'V', 'F', NULL, NULL, 6, NULL, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL); +INSERT INTO msgs_msg(id, broadcast_id, uuid, text, created_on, sent_on, modified_on, direction, status, visibility, msg_type, attachments, channel_id, contact_id, contact_urn_id, org_id, msg_count, error_count, next_attempt) VALUES +(1, NULL, '2f969340-704a-4aa2-a1bd-2f832a21d257', 'message 1', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00'), +(2, NULL, 'abe87ac1-015c-4803-be29-1e89509fe682', 'message 2', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'D', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00'), +(3, NULL, 'a7e83a22-a6ff-4e18-82d0-19545640ccba', 'message 3', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'H', 'V', 'I', '{"image/png:https://foo.bar/image1.png", "image/png:https://foo.bar/image2.png"}', NULL, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00'), +(4, NULL, '1cad36af-5581-4c8a-81cd-83708398f61e', 'message 4', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-13 21:11:59.890662+00'), +(5, NULL, 'f557972e-2eb5-42fa-9b87-902116d18787', 'message 5', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', 'I', 'H', 'V', 'I', NULL, 3, 7, 8, 3, 1, 0, '2017-08-11 21:11:59.890662+02:00'), +(6, 2, '579d148c-0ab1-4afb-832f-afb1fe0e19b7', 'message 6', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-10-08 21:11:59.890662+00'), +(7, NULL, '7aeca469-2593-444e-afe4-4702317534c9', 'message 7', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2018-01-02 21:11:59.890662+00'), +(9, NULL, 'e14ab466-0d3b-436d-a0f7-5851fd7d9b7d', 'message 9', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'S', 'V', 'F', NULL, NULL, 6, NULL, 2, 1, 0, '2017-08-12 21:11:59.890662+00'); INSERT INTO msgs_label(id, uuid, name) VALUES (1, '1d9e3188-b74b-4ae0-a166-0de31aedb34a', 'Label 1'), From 8cb5fa9f13bc99a900523fecd65a628c5f28af25 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 9 Dec 2021 09:33:01 -0500 Subject: [PATCH 4/5] Update CHANGELOG.md for v7.1.0 --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d6f00d3..692ca94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +v7.1.0 +---------- + * Remove msgs_msg.response_to_id + v7.0.0 ---------- * Test on PG12 and 13 From c4e282fa0c9d7f4f69f424d9c44b0d65a8c5c2df Mon Sep 17 00:00:00 2001 From: Rafael Soares Date: Wed, 25 May 2022 14:45:51 -0300 Subject: [PATCH 5/5] update WENI-CHANGELOG.md for 1.2.0-archiver-7.1.0 --- WENI-CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/WENI-CHANGELOG.md b/WENI-CHANGELOG.md index ee622f7..f207020 100644 --- a/WENI-CHANGELOG.md +++ b/WENI-CHANGELOG.md @@ -1,3 +1,7 @@ +1.2.0-archiver-7.1.0 +---------- + * Merge tag v7.1.0 from nyaruka into 1.0.0-archiver-7.0.0 + 1.0.0-archiver-7.0.0 ---------- * Consider retention time on delete operation \ No newline at end of file