Update/v7.1.0 #10

Merged · 8 commits · May 30, 2022

Changes from all commits
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
+v7.1.0
+----------
+* Remove msgs_msg.response_to_id
+
v7.0.0
----------
* Test on PG12 and 13
Expand Down
4 changes: 4 additions & 0 deletions WENI-CHANGELOG.md
@@ -1,3 +1,7 @@
+1.2.0-archiver-7.1.0
+----------
+* Merge tag v7.1.0 from nyaruka into 1.0.0-archiver-7.0.0
+
1.0.0-archiver-7.0.0
----------
* Consider retention time on delete operation
53 changes: 14 additions & 39 deletions archives/messages.go
@@ -123,12 +123,6 @@ DELETE FROM msgs_msg_labels
WHERE msg_id IN(?)
`

-const unlinkResponses = `
-UPDATE msgs_msg
-SET response_to_id = NULL
-WHERE response_to_id IN(?)
-`
-
const deleteMessages = `
DELETE FROM msgs_msg
WHERE id IN(?)
@@ -189,75 +183,58 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3
}
rows.Close()

-log.WithFields(logrus.Fields{
-"msg_count": len(msgIDs),
-}).Debug("found messages")
+log.WithField("msg_count", len(msgIDs)).Debug("found messages")

// verify we don't see more messages than there are in our archive (fewer is ok)
if visibleCount > archive.RecordCount {
return fmt.Errorf("more messages in the database: %d than in archive: %d", visibleCount, archive.RecordCount)
}

// ok, delete our messages in batches, we do this in transactions as it spans a few different queries
-for startIdx := 0; startIdx < len(msgIDs); startIdx += deleteTransactionSize {
+for _, idBatch := range chunkIDs(msgIDs, deleteTransactionSize) {
// no single batch should take more than a few minutes
ctx, cancel := context.WithTimeout(ctx, time.Minute*15)
defer cancel()

start := time.Now()

-endIdx := startIdx + deleteTransactionSize
-if endIdx > len(msgIDs) {
-endIdx = len(msgIDs)
-}
-batchIDs := msgIDs[startIdx:endIdx]
-
// start our transaction
tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return err
}

// first update our delete_reason
-err = executeInQuery(ctx, tx, setMessageDeleteReason, batchIDs)
+err = executeInQuery(ctx, tx, setMessageDeleteReason, idBatch)
if err != nil {
return fmt.Errorf("error updating delete reason: %s", err.Error())
return errors.Wrap(err, "error updating delete reason")
}

// now delete any channel logs
-err = executeInQuery(ctx, tx, deleteMessageLogs, batchIDs)
+err = executeInQuery(ctx, tx, deleteMessageLogs, idBatch)
if err != nil {
return fmt.Errorf("error removing channel logs: %s", err.Error())
return errors.Wrap(err, "error removing channel logs")
}

// then any labels
-err = executeInQuery(ctx, tx, deleteMessageLabels, batchIDs)
+err = executeInQuery(ctx, tx, deleteMessageLabels, idBatch)
if err != nil {
return fmt.Errorf("error removing message labels: %s", err.Error())
}

// unlink any responses
err = executeInQuery(ctx, tx, unlinkResponses, batchIDs)
if err != nil {
return fmt.Errorf("error unlinking responses: %s", err.Error())
return errors.Wrap(err, "error removing message labels")
}

// finally, delete our messages
-err = executeInQuery(ctx, tx, deleteMessages, batchIDs)
+err = executeInQuery(ctx, tx, deleteMessages, idBatch)
if err != nil {
return fmt.Errorf("error deleting messages: %s", err.Error())
return errors.Wrap(err, "error deleting messages")
}

// commit our transaction
err = tx.Commit()
if err != nil {
return fmt.Errorf("error committing message delete transaction: %s", err.Error())
return errors.Wrap(err, "error committing message delete transaction")
}

-log.WithFields(logrus.Fields{
-"elapsed": time.Since(start),
-"count": len(batchIDs),
-}).Debug("deleted batch of messages")
+log.WithField("elapsed", time.Since(start)).WithField("count", len(idBatch)).Debug("deleted batch of messages")

cancel()
}
@@ -270,14 +247,12 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3
// all went well! mark our archive as no longer needing deletion
_, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn)
if err != nil {
return fmt.Errorf("error setting archive as deleted: %s", err.Error())
return errors.Wrap(err, "error setting archive as deleted")
}
archive.NeedsDeletion = false
archive.DeletedOn = &deletedOn

-logrus.WithFields(logrus.Fields{
-"elapsed": time.Since(start),
-}).Info("completed deleting messages")
+logrus.WithField("elapsed", time.Since(start)).Info("completed deleting messages")

return nil
}
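Reviewer note: the error returns above switch from fmt.Errorf("...: %s", err.Error()) to errors.Wrap. Assuming errors here is github.com/pkg/errors (the import block is not visible in this excerpt), the practical difference is that the wrapped error keeps the original error reachable via errors.Cause, while the old form kept only its message text. A minimal standalone comparison, with sql.ErrNoRows as an illustrative stand-in for a query error:

package main

import (
	"database/sql"
	"fmt"

	"github.com/pkg/errors" // assumed dependency; the import change is not shown in this diff
)

func main() {
	err := sql.ErrNoRows // stand-in for a query error

	// before: only the message text survives, the original error value is lost
	old := fmt.Errorf("error deleting messages: %s", err.Error())
	fmt.Println(errors.Cause(old) == sql.ErrNoRows) // false

	// after: errors.Wrap keeps the original error as the cause
	wrapped := errors.Wrap(err, "error deleting messages")
	fmt.Println(errors.Cause(wrapped) == sql.ErrNoRows) // true
	fmt.Println(wrapped)                                // error deleting messages: sql: no rows in result set
}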
41 changes: 14 additions & 27 deletions archives/runs.go
@@ -180,69 +180,58 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie
}
rows.Close()

-log.WithFields(logrus.Fields{
-"run_count": len(runIDs),
-}).Debug("found runs")
+log.WithField("run_count", len(runIDs)).Debug("found runs")

// verify we don't see more runs than there are in our archive (fewer is ok)
if runCount > archive.RecordCount {
return fmt.Errorf("more runs in the database: %d than in archive: %d", runCount, archive.RecordCount)
}

// ok, delete our runs in batches, we do this in transactions as it spans a few different queries
-for startIdx := 0; startIdx < len(runIDs); startIdx += deleteTransactionSize {
+for _, idBatch := range chunkIDs(runIDs, deleteTransactionSize) {
// no single batch should take more than a few minutes
ctx, cancel := context.WithTimeout(ctx, time.Minute*15)
defer cancel()

start := time.Now()

-endIdx := startIdx + deleteTransactionSize
-if endIdx > len(runIDs) {
-endIdx = len(runIDs)
-}
-batchIDs := runIDs[startIdx:endIdx]
-
// start our transaction
tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return err
}

// first update our delete_reason
-err = executeInQuery(ctx, tx, setRunDeleteReason, batchIDs)
+err = executeInQuery(ctx, tx, setRunDeleteReason, idBatch)
if err != nil {
return fmt.Errorf("error updating delete reason: %s", err.Error())
return errors.Wrap(err, "error updating delete reason")
}

// any recent runs
-err = executeInQuery(ctx, tx, deleteRecentRuns, batchIDs)
+err = executeInQuery(ctx, tx, deleteRecentRuns, idBatch)
if err != nil {
return fmt.Errorf("error deleting recent runs: %s", err.Error())
return errors.Wrap(err, "error deleting recent runs")
}

// unlink any parents
-err = executeInQuery(ctx, tx, unlinkParents, batchIDs)
+err = executeInQuery(ctx, tx, unlinkParents, idBatch)
if err != nil {
return fmt.Errorf("error unliking parent runs: %s", err.Error())
return errors.Wrap(err, "error unliking parent runs")
}

// finally, delete our runs
-err = executeInQuery(ctx, tx, deleteRuns, batchIDs)
+err = executeInQuery(ctx, tx, deleteRuns, idBatch)
if err != nil {
return fmt.Errorf("error deleting runs: %s", err.Error())
return errors.Wrap(err, "error deleting runs")
}

// commit our transaction
err = tx.Commit()
if err != nil {
return fmt.Errorf("error committing run delete transaction: %s", err.Error())
return errors.Wrap(err, "error committing run delete transaction")
}

-log.WithFields(logrus.Fields{
-"elapsed": time.Since(start),
-"count": len(batchIDs),
-}).Debug("deleted batch of runs")
+log.WithField("elapsed", time.Since(start)).WithField("count", len(idBatch)).Debug("deleted batch of runs")

cancel()
}
@@ -255,14 +244,12 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie
// all went well! mark our archive as no longer needing deletion
_, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn)
if err != nil {
return fmt.Errorf("error setting archive as deleted: %s", err.Error())
return errors.Wrap(err, "error setting archive as deleted")
}
archive.NeedsDeletion = false
archive.DeletedOn = &deletedOn

-logrus.WithFields(logrus.Fields{
-"elapsed": time.Since(start),
-}).Info("completed deleting runs")
+logrus.WithField("elapsed", time.Since(start)).Info("completed deleting runs")

return nil
}
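Reviewer note: the logging changes above collapse logrus.WithFields maps into chained WithField calls; both forms produce the same structured entry, the new one is just shorter. A small standalone illustration (github.com/sirupsen/logrus is already in use, as the diff references logrus.Fields; the field values below are placeholders):

package main

import "github.com/sirupsen/logrus"

func main() {
	logrus.SetLevel(logrus.DebugLevel)

	// before: build a logrus.Fields map up front
	logrus.WithFields(logrus.Fields{
		"elapsed": "1.2s",
		"count":   100,
	}).Debug("deleted batch of runs")

	// after: chain WithField calls, as in this PR; the resulting entry is identical
	logrus.WithField("elapsed", "1.2s").WithField("count", 100).Debug("deleted batch of runs")
}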
15 changes: 15 additions & 0 deletions archives/utils.go
@@ -0,0 +1,15 @@
+package archives
+
+// chunks a slice of int64 IDs
+func chunkIDs(ids []int64, size int) [][]int64 {
+chunks := make([][]int64, 0, len(ids)/size+1)
+
+for i := 0; i < len(ids); i += size {
+end := i + size
+if end > len(ids) {
+end = len(ids)
+}
+chunks = append(chunks, ids[i:end])
+}
+return chunks
+}
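Reviewer note: a quick standalone check of what the new helper returns when the ID count does not divide evenly by the batch size (the helper body is copied from the diff above; the sample IDs are illustrative):

package main

import "fmt"

// chunkIDs splits a slice of int64 IDs into batches of at most `size` elements,
// mirroring the helper added in archives/utils.go.
func chunkIDs(ids []int64, size int) [][]int64 {
	chunks := make([][]int64, 0, len(ids)/size+1)

	for i := 0; i < len(ids); i += size {
		end := i + size
		if end > len(ids) {
			end = len(ids)
		}
		chunks = append(chunks, ids[i:end])
	}
	return chunks
}

func main() {
	ids := []int64{1, 2, 3, 4, 5, 6, 7}
	fmt.Println(chunkIDs(ids, 3)) // [[1 2 3] [4 5 6] [7]]
}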
21 changes: 10 additions & 11 deletions testdb.sql
@@ -90,8 +90,7 @@ CREATE TABLE msgs_msg (
org_id integer NOT NULL references orgs_org(id) on delete cascade,
metadata text,
topup_id integer,
-delete_reason char(1) NULL,
-response_to_id integer NULL references msgs_msg(id)
+delete_reason char(1) NULL
);

DROP TABLE IF EXISTS msgs_broadcast_recipients;
@@ -269,15 +268,15 @@ INSERT INTO msgs_broadcast(id, text, created_on, purged, org_id, schedule_id) VA
(3, 'base=>"not purged"'::hstore, '2017-08-12 19:11:59.890662+02:00', FALSE, 2, NULL),
(4, 'base=>"new"'::hstore, '2019-08-12 19:11:59.890662+02:00', FALSE, 2, NULL);

-INSERT INTO msgs_msg(id, broadcast_id, uuid, text, created_on, sent_on, modified_on, direction, status, visibility, msg_type, attachments, channel_id, contact_id, contact_urn_id, org_id, msg_count, error_count, next_attempt, response_to_id) VALUES
-(1, NULL, '2f969340-704a-4aa2-a1bd-2f832a21d257', 'message 1', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL),
-(2, NULL, 'abe87ac1-015c-4803-be29-1e89509fe682', 'message 2', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'D', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL),
-(3, NULL, 'a7e83a22-a6ff-4e18-82d0-19545640ccba', 'message 3', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'H', 'V', 'I', '{"image/png:https://foo.bar/image1.png", "image/png:https://foo.bar/image2.png"}', NULL, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL),
-(4, NULL, '1cad36af-5581-4c8a-81cd-83708398f61e', 'message 4', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-13 21:11:59.890662+00', NULL),
-(5, NULL, 'f557972e-2eb5-42fa-9b87-902116d18787', 'message 5', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', 'I', 'H', 'V', 'I', NULL, 3, 7, 8, 3, 1, 0, '2017-08-11 21:11:59.890662+02:00', NULL),
-(6, 2, '579d148c-0ab1-4afb-832f-afb1fe0e19b7', 'message 6', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-10-08 21:11:59.890662+00', NULL),
-(7, NULL, '7aeca469-2593-444e-afe4-4702317534c9', 'message 7', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2018-01-02 21:11:59.890662+00', 2),
-(9, NULL, 'e14ab466-0d3b-436d-a0f7-5851fd7d9b7d', 'message 9', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'S', 'V', 'F', NULL, NULL, 6, NULL, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL);
+INSERT INTO msgs_msg(id, broadcast_id, uuid, text, created_on, sent_on, modified_on, direction, status, visibility, msg_type, attachments, channel_id, contact_id, contact_urn_id, org_id, msg_count, error_count, next_attempt) VALUES
+(1, NULL, '2f969340-704a-4aa2-a1bd-2f832a21d257', 'message 1', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00'),
+(2, NULL, 'abe87ac1-015c-4803-be29-1e89509fe682', 'message 2', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'D', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00'),
+(3, NULL, 'a7e83a22-a6ff-4e18-82d0-19545640ccba', 'message 3', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'H', 'V', 'I', '{"image/png:https://foo.bar/image1.png", "image/png:https://foo.bar/image2.png"}', NULL, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00'),
+(4, NULL, '1cad36af-5581-4c8a-81cd-83708398f61e', 'message 4', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-13 21:11:59.890662+00'),
+(5, NULL, 'f557972e-2eb5-42fa-9b87-902116d18787', 'message 5', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', 'I', 'H', 'V', 'I', NULL, 3, 7, 8, 3, 1, 0, '2017-08-11 21:11:59.890662+02:00'),
+(6, 2, '579d148c-0ab1-4afb-832f-afb1fe0e19b7', 'message 6', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-10-08 21:11:59.890662+00'),
+(7, NULL, '7aeca469-2593-444e-afe4-4702317534c9', 'message 7', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2018-01-02 21:11:59.890662+00'),
+(9, NULL, 'e14ab466-0d3b-436d-a0f7-5851fd7d9b7d', 'message 9', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'S', 'V', 'F', NULL, NULL, 6, NULL, 2, 1, 0, '2017-08-12 21:11:59.890662+00');

INSERT INTO msgs_label(id, uuid, name) VALUES
(1, '1d9e3188-b74b-4ae0-a166-0de31aedb34a', 'Label 1'),