Skip to content

Commit

Permalink
Merge tag 'v7.1.0' of https://github.com/nyaruka/rp-archiver into upd…
Browse files Browse the repository at this point in the history
…ate/v7.1.0

 * Remove msgs_msg.response_to_id
  • Loading branch information
Sandro-Meireles committed May 11, 2022
2 parents ca5afce + 8cb5fa9 commit 533891c
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 77 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
v7.1.0
----------
* Remove msgs_msg.response_to_id

v7.0.0
----------
* Test on PG12 and 13
Expand Down
53 changes: 14 additions & 39 deletions archives/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,6 @@ DELETE FROM msgs_msg_labels
WHERE msg_id IN(?)
`

const unlinkResponses = `
UPDATE msgs_msg
SET response_to_id = NULL
WHERE response_to_id IN(?)
`

const deleteMessages = `
DELETE FROM msgs_msg
WHERE id IN(?)
Expand Down Expand Up @@ -189,75 +183,58 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3
}
rows.Close()

log.WithFields(logrus.Fields{
"msg_count": len(msgIDs),
}).Debug("found messages")
log.WithField("msg_count", len(msgIDs)).Debug("found messages")

// verify we don't see more messages than there are in our archive (fewer is ok)
if visibleCount > archive.RecordCount {
return fmt.Errorf("more messages in the database: %d than in archive: %d", visibleCount, archive.RecordCount)
}

// ok, delete our messages in batches, we do this in transactions as it spans a few different queries
for startIdx := 0; startIdx < len(msgIDs); startIdx += deleteTransactionSize {
for _, idBatch := range chunkIDs(msgIDs, deleteTransactionSize) {
// no single batch should take more than a few minutes
ctx, cancel := context.WithTimeout(ctx, time.Minute*15)
defer cancel()

start := time.Now()

endIdx := startIdx + deleteTransactionSize
if endIdx > len(msgIDs) {
endIdx = len(msgIDs)
}
batchIDs := msgIDs[startIdx:endIdx]

// start our transaction
tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return err
}

// first update our delete_reason
err = executeInQuery(ctx, tx, setMessageDeleteReason, batchIDs)
err = executeInQuery(ctx, tx, setMessageDeleteReason, idBatch)
if err != nil {
return fmt.Errorf("error updating delete reason: %s", err.Error())
return errors.Wrap(err, "error updating delete reason")
}

// now delete any channel logs
err = executeInQuery(ctx, tx, deleteMessageLogs, batchIDs)
err = executeInQuery(ctx, tx, deleteMessageLogs, idBatch)
if err != nil {
return fmt.Errorf("error removing channel logs: %s", err.Error())
return errors.Wrap(err, "error removing channel logs")
}

// then any labels
err = executeInQuery(ctx, tx, deleteMessageLabels, batchIDs)
err = executeInQuery(ctx, tx, deleteMessageLabels, idBatch)
if err != nil {
return fmt.Errorf("error removing message labels: %s", err.Error())
}

// unlink any responses
err = executeInQuery(ctx, tx, unlinkResponses, batchIDs)
if err != nil {
return fmt.Errorf("error unlinking responses: %s", err.Error())
return errors.Wrap(err, "error removing message labels")
}

// finally, delete our messages
err = executeInQuery(ctx, tx, deleteMessages, batchIDs)
err = executeInQuery(ctx, tx, deleteMessages, idBatch)
if err != nil {
return fmt.Errorf("error deleting messages: %s", err.Error())
return errors.Wrap(err, "error deleting messages")
}

// commit our transaction
err = tx.Commit()
if err != nil {
return fmt.Errorf("error committing message delete transaction: %s", err.Error())
return errors.Wrap(err, "error committing message delete transaction")
}

log.WithFields(logrus.Fields{
"elapsed": time.Since(start),
"count": len(batchIDs),
}).Debug("deleted batch of messages")
log.WithField("elapsed", time.Since(start)).WithField("count", len(idBatch)).Debug("deleted batch of messages")

cancel()
}
Expand All @@ -270,14 +247,12 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3
// all went well! mark our archive as no longer needing deletion
_, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn)
if err != nil {
return fmt.Errorf("error setting archive as deleted: %s", err.Error())
return errors.Wrap(err, "error setting archive as deleted")
}
archive.NeedsDeletion = false
archive.DeletedOn = &deletedOn

logrus.WithFields(logrus.Fields{
"elapsed": time.Since(start),
}).Info("completed deleting messages")
logrus.WithField("elapsed", time.Since(start)).Info("completed deleting messages")

return nil
}
Expand Down
41 changes: 14 additions & 27 deletions archives/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,69 +180,58 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie
}
rows.Close()

log.WithFields(logrus.Fields{
"run_count": len(runIDs),
}).Debug("found runs")
log.WithField("run_count", len(runIDs)).Debug("found runs")

// verify we don't see more runs than there are in our archive (fewer is ok)
if runCount > archive.RecordCount {
return fmt.Errorf("more runs in the database: %d than in archive: %d", runCount, archive.RecordCount)
}

// ok, delete our runs in batches, we do this in transactions as it spans a few different queries
for startIdx := 0; startIdx < len(runIDs); startIdx += deleteTransactionSize {
for _, idBatch := range chunkIDs(runIDs, deleteTransactionSize) {
// no single batch should take more than a few minutes
ctx, cancel := context.WithTimeout(ctx, time.Minute*15)
defer cancel()

start := time.Now()

endIdx := startIdx + deleteTransactionSize
if endIdx > len(runIDs) {
endIdx = len(runIDs)
}
batchIDs := runIDs[startIdx:endIdx]

// start our transaction
tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return err
}

// first update our delete_reason
err = executeInQuery(ctx, tx, setRunDeleteReason, batchIDs)
err = executeInQuery(ctx, tx, setRunDeleteReason, idBatch)
if err != nil {
return fmt.Errorf("error updating delete reason: %s", err.Error())
return errors.Wrap(err, "error updating delete reason")
}

// any recent runs
err = executeInQuery(ctx, tx, deleteRecentRuns, batchIDs)
err = executeInQuery(ctx, tx, deleteRecentRuns, idBatch)
if err != nil {
return fmt.Errorf("error deleting recent runs: %s", err.Error())
return errors.Wrap(err, "error deleting recent runs")
}

// unlink any parents
err = executeInQuery(ctx, tx, unlinkParents, batchIDs)
err = executeInQuery(ctx, tx, unlinkParents, idBatch)
if err != nil {
return fmt.Errorf("error unliking parent runs: %s", err.Error())
return errors.Wrap(err, "error unliking parent runs")
}

// finally, delete our runs
err = executeInQuery(ctx, tx, deleteRuns, batchIDs)
err = executeInQuery(ctx, tx, deleteRuns, idBatch)
if err != nil {
return fmt.Errorf("error deleting runs: %s", err.Error())
return errors.Wrap(err, "error deleting runs")
}

// commit our transaction
err = tx.Commit()
if err != nil {
return fmt.Errorf("error committing run delete transaction: %s", err.Error())
return errors.Wrap(err, "error committing run delete transaction")
}

log.WithFields(logrus.Fields{
"elapsed": time.Since(start),
"count": len(batchIDs),
}).Debug("deleted batch of runs")
log.WithField("elapsed", time.Since(start)).WithField("count", len(idBatch)).Debug("deleted batch of runs")

cancel()
}
Expand All @@ -255,14 +244,12 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie
// all went well! mark our archive as no longer needing deletion
_, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn)
if err != nil {
return fmt.Errorf("error setting archive as deleted: %s", err.Error())
return errors.Wrap(err, "error setting archive as deleted")
}
archive.NeedsDeletion = false
archive.DeletedOn = &deletedOn

logrus.WithFields(logrus.Fields{
"elapsed": time.Since(start),
}).Info("completed deleting runs")
logrus.WithField("elapsed", time.Since(start)).Info("completed deleting runs")

return nil
}
15 changes: 15 additions & 0 deletions archives/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package archives

// chunks a slice of in64 IDs
func chunkIDs(ids []int64, size int) [][]int64 {
chunks := make([][]int64, 0, len(ids)/size+1)

for i := 0; i < len(ids); i += size {
end := i + size
if end > len(ids) {
end = len(ids)
}
chunks = append(chunks, ids[i:end])
}
return chunks
}
21 changes: 10 additions & 11 deletions testdb.sql
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ CREATE TABLE msgs_msg (
org_id integer NOT NULL references orgs_org(id) on delete cascade,
metadata text,
topup_id integer,
delete_reason char(1) NULL,
response_to_id integer NULL references msgs_msg(id)
delete_reason char(1) NULL
);

DROP TABLE IF EXISTS msgs_broadcast_recipients;
Expand Down Expand Up @@ -269,15 +268,15 @@ INSERT INTO msgs_broadcast(id, text, created_on, purged, org_id, schedule_id) VA
(3, 'base=>"not purged"'::hstore, '2017-08-12 19:11:59.890662+02:00', FALSE, 2, NULL),
(4, 'base=>"new"'::hstore, '2019-08-12 19:11:59.890662+02:00', FALSE, 2, NULL);

INSERT INTO msgs_msg(id, broadcast_id, uuid, text, created_on, sent_on, modified_on, direction, status, visibility, msg_type, attachments, channel_id, contact_id, contact_urn_id, org_id, msg_count, error_count, next_attempt, response_to_id) VALUES
(1, NULL, '2f969340-704a-4aa2-a1bd-2f832a21d257', 'message 1', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL),
(2, NULL, 'abe87ac1-015c-4803-be29-1e89509fe682', 'message 2', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'D', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL),
(3, NULL, 'a7e83a22-a6ff-4e18-82d0-19545640ccba', 'message 3', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'H', 'V', 'I', '{"image/png:https://foo.bar/image1.png", "image/png:https://foo.bar/image2.png"}', NULL, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL),
(4, NULL, '1cad36af-5581-4c8a-81cd-83708398f61e', 'message 4', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-13 21:11:59.890662+00', NULL),
(5, NULL, 'f557972e-2eb5-42fa-9b87-902116d18787', 'message 5', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', 'I', 'H', 'V', 'I', NULL, 3, 7, 8, 3, 1, 0, '2017-08-11 21:11:59.890662+02:00', NULL),
(6, 2, '579d148c-0ab1-4afb-832f-afb1fe0e19b7', 'message 6', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-10-08 21:11:59.890662+00', NULL),
(7, NULL, '7aeca469-2593-444e-afe4-4702317534c9', 'message 7', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2018-01-02 21:11:59.890662+00', 2),
(9, NULL, 'e14ab466-0d3b-436d-a0f7-5851fd7d9b7d', 'message 9', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'S', 'V', 'F', NULL, NULL, 6, NULL, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL);
INSERT INTO msgs_msg(id, broadcast_id, uuid, text, created_on, sent_on, modified_on, direction, status, visibility, msg_type, attachments, channel_id, contact_id, contact_urn_id, org_id, msg_count, error_count, next_attempt) VALUES
(1, NULL, '2f969340-704a-4aa2-a1bd-2f832a21d257', 'message 1', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00'),
(2, NULL, 'abe87ac1-015c-4803-be29-1e89509fe682', 'message 2', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'D', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00'),
(3, NULL, 'a7e83a22-a6ff-4e18-82d0-19545640ccba', 'message 3', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'H', 'V', 'I', '{"image/png:https://foo.bar/image1.png", "image/png:https://foo.bar/image2.png"}', NULL, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00'),
(4, NULL, '1cad36af-5581-4c8a-81cd-83708398f61e', 'message 4', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-13 21:11:59.890662+00'),
(5, NULL, 'f557972e-2eb5-42fa-9b87-902116d18787', 'message 5', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', 'I', 'H', 'V', 'I', NULL, 3, 7, 8, 3, 1, 0, '2017-08-11 21:11:59.890662+02:00'),
(6, 2, '579d148c-0ab1-4afb-832f-afb1fe0e19b7', 'message 6', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-10-08 21:11:59.890662+00'),
(7, NULL, '7aeca469-2593-444e-afe4-4702317534c9', 'message 7', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2018-01-02 21:11:59.890662+00'),
(9, NULL, 'e14ab466-0d3b-436d-a0f7-5851fd7d9b7d', 'message 9', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'S', 'V', 'F', NULL, NULL, 6, NULL, 2, 1, 0, '2017-08-12 21:11:59.890662+00');

INSERT INTO msgs_label(id, uuid, name) VALUES
(1, '1d9e3188-b74b-4ae0-a166-0de31aedb34a', 'Label 1'),
Expand Down

0 comments on commit 533891c

Please sign in to comment.