Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

delete broadcasts which no longer have any active messages #32

Merged
merged 2 commits into from
Apr 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -1280,6 +1281,121 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3
return nil
}

// DeleteBroadcasts deletes all broadcasts older than 90 days for the passed in org which have no active messages on them
func DeleteBroadcasts(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, org Org) error {
start := time.Now()
threshhold := now.AddDate(0, 0, -org.ActiveDays)

rows, err := db.QueryxContext(ctx, selectOldOrgBroadcasts, org.ID, threshhold)
if err != nil {
return err
}
defer rows.Close()

count := 0
for rows.Next() {
if count == 0 {
logrus.WithField("org_id", org.ID).Info("deleting broadcasts")
}

// been deleting this org more than an hour? thats enough for today, exit out
if time.Since(start) > time.Hour {
break
}

var broadcastID int64
err := rows.Scan(&broadcastID)
if err != nil {
return errors.Wrap(err, "unable to get broadcast id")
}

// make sure we have no active messages
var msgCount int64
err = db.Get(&msgCount, `SELECT count(*) FROM msgs_msg WHERE broadcast_id = $1`, broadcastID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe not so important but if you just want to know if any rows exist, don't need to full count

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya, felt the error message was more interesting with count. In theory we should never really get to this case anyways.

if err != nil {
return errors.Wrapf(err, "unable to select number of msgs for broadcast: %d", broadcastID)
}

if msgCount != 0 {
logrus.WithField("broadcast_id", broadcastID).WithField("org_id", org.ID).WithField("msg_count", msgCount).Warn("unable to delete broadcast, has messages still")
continue
}

// we delete broadcasts in a transaction per broadcast
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return errors.Wrapf(err, "error starting transaction while deleting broadcast: %d", broadcastID)
}

// delete contacts M2M
_, err = tx.Exec(`DELETE from msgs_broadcast_contacts WHERE broadcast_id = $1`, broadcastID)
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error deleting related contacts for broadcast: %d", broadcastID)
}

// delete groups M2M
_, err = tx.Exec(`DELETE from msgs_broadcast_groups WHERE broadcast_id = $1`, broadcastID)
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error deleting related groups for broadcast: %d", broadcastID)
}

// delete URNs M2M
_, err = tx.Exec(`DELETE from msgs_broadcast_urns WHERE broadcast_id = $1`, broadcastID)
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error deleting related urns for broadcast: %d", broadcastID)
}

// delete counts associated with this broadcast
_, err = tx.Exec(`DELETE from msgs_broadcastmsgcount WHERE broadcast_id = $1`, broadcastID)
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error deleting counts for broadcast: %d", broadcastID)
}

// finally, delete our broadcast
_, err = tx.Exec(`DELETE from msgs_broadcast WHERE id = $1`, broadcastID)
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error deleting broadcast: %d", broadcastID)
}

err = tx.Commit()
if err != nil {
return errors.Wrapf(err, "error deleting broadcast: %d", broadcastID)
}

count++
}

if count > 0 {
logrus.WithFields(logrus.Fields{
"elapsed": time.Since(start),
"count": count,
"org_id": org.ID,
}).Info("completed deleting broadcasts")
}

return nil
}

const selectOldOrgBroadcasts = `
SELECT
id
FROM
msgs_broadcast
WHERE
org_id = $1 AND
created_on < $2 AND
schedule_id IS NULL
ORDER BY
created_on ASC,
id ASC
LIMIT 1000000;
`

const selectOrgRunsInRange = `
SELECT fr.id, fr.is_active, cc.is_test
FROM flows_flowrun fr
Expand Down Expand Up @@ -1527,6 +1643,10 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config
switch a.ArchiveType {
case MessageType:
err = DeleteArchivedMessages(ctx, config, db, s3Client, a)
if err == nil {
err = DeleteBroadcasts(ctx, now, config, db, org)
}

case RunType:
err = DeleteArchivedRuns(ctx, config, db, s3Client, a)
default:
Expand Down
12 changes: 12 additions & 0 deletions archiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ func TestArchiveOrgMessages(t *testing.T) {
s3Client, err := NewS3Client(config)
assert.NoError(t, err)

assertCount(t, db, 4, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2)

created, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], MessageType)
assert.NoError(t, err)

Expand Down Expand Up @@ -397,6 +399,9 @@ func TestArchiveOrgMessages(t *testing.T) {
)
assert.NoError(t, err)
assert.Equal(t, 1, count)

// one broadcast still exists because it has a schedule, the other because it still has msgs, the last because it is new
assertCount(t, db, 3, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2)
}
}

Expand All @@ -406,6 +411,13 @@ FROM flows_flowrun
WHERE org_id = $1 and modified_on >= $2 and modified_on < $3
`

func assertCount(t *testing.T, db *sqlx.DB, expected int, query string, args ...interface{}) {
var count int
err := db.Get(&count, query, args...)
assert.NoError(t, err, "error executing query: %s", query)
assert.Equal(t, expected, count, "counts mismatch for query %s", query)
}

func TestArchiveOrgRuns(t *testing.T) {
db := setup(t)
ctx := context.Background()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/naoina/go-stringutil v0.1.0
github.com/naoina/toml v0.1.1
github.com/nyaruka/ezconf v0.2.1
github.com/pkg/errors v0.8.0
github.com/pkg/errors v0.8.1
github.com/pmezard/go-difflib v1.0.0
github.com/sirupsen/logrus v1.0.5
github.com/stretchr/testify v1.2.1
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ github.com/getsentry/raven-go v0.0.0-20180430182053-263040ce1a36 h1:i93kN7TI/4T5
github.com/getsentry/raven-go v0.0.0-20180430182053-263040ce1a36/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
github.com/go-ini/ini v1.36.0 h1:63En8accP8FKkFZ77ztSfvQf9kGRJN3qBIdItP46RRk=
github.com/go-ini/ini v1.36.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-sql-driver/mysql v1.4.0 h1:7LxgVwFb2hIQtMm87NdgAVfXjnt4OePseqT1tKx+opk=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 h1:12VvqtR6Aowv3l/EQUlocDHW2Cp4G9WJVH7uyH8QFJE=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
Expand All @@ -23,6 +24,7 @@ github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2 h1:hRGSmZu7j271trc9sneMrpOW
github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/mattn/go-sqlite3 v1.9.0 h1:pDRiWfl+++eC2FEFRy6jXmQlvp4Yh3z1MJKg4UeYM/4=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/naoina/go-stringutil v0.1.0 h1:rCUeRUHjBjGTSHl0VC00jUPLz8/F9dDzYI70Hzifhks=
github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0=
Expand All @@ -32,6 +34,8 @@ github.com/nyaruka/ezconf v0.2.1 h1:TDXWoqjqYya1uhou1mAJZg7rgFYL98EB0Tb3+BWtUh0=
github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDYYxgnQw=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.0.5 h1:8c8b5uO0zS4X6RPl/sd1ENwSkIc0/H2PaHxE3udaE8I=
Expand Down
42 changes: 36 additions & 6 deletions testdb.sql
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,38 @@ CREATE TABLE msgs_broadcast (
"text" hstore NOT NULL,
purged BOOLEAN NOT NULL,
created_on timestamp with time zone NOT NULL,
schedule_id int NULL,
org_id integer NOT NULL references orgs_org(id) on delete cascade
);

DROP TABLE IF EXISTS msgs_broadcast_contacts;
CREATE TABLE msgs_broadcast_contacts (
id serial primary key,
broadcast_id integer NOT NULL,
contact_id integer NOT NULL
);

DROP TABLE IF EXISTS msgs_broadcast_groups;
CREATE TABLE msgs_broadcast_groups (
id serial primary key,
broadcast_id integer NOT NULL,
group_id integer NOT NULL
);

DROP TABLE IF EXISTS msgs_broadcast_urns;
CREATE TABLE msgs_broadcast_urns (
id serial primary key,
broadcast_id integer NOT NULL,
contacturn_id integer NOT NULL
);

DROP TABLE IF EXISTS msgs_broadcastmsgcount;
CREATE TABLE msgs_broadcastmsgcount (
id serial primary key,
"count" integer NOT NULL,
broadcast_id integer NOT NULL
);

DROP TABLE IF EXISTS msgs_label CASCADE;
CREATE TABLE msgs_label (
id serial primary key,
Expand Down Expand Up @@ -266,13 +295,19 @@ INSERT INTO contacts_contactgroup_contacts(id, contact_id, contactgroup_id) VALU
(3, 1, 4),
(4, 3, 4);

INSERT INTO msgs_broadcast(id, text, created_on, purged, org_id, schedule_id) VALUES
(1, 'eng=>"hello",fre=>"bonjour"'::hstore, '2017-08-12 22:11:59.890662+02:00', TRUE, 2, 1),
(2, 'base=>"hola"'::hstore, '2017-08-12 22:11:59.890662+02:00', TRUE, 2, NULL),
(3, 'base=>"not purged"'::hstore, '2017-08-12 19:11:59.890662+02:00', FALSE, 2, NULL),
(4, 'base=>"new"'::hstore, '2019-08-12 19:11:59.890662+02:00', FALSE, 2, NULL);

INSERT INTO msgs_msg(id, broadcast_id, uuid, text, created_on, sent_on, modified_on, direction, status, visibility, msg_type, attachments, channel_id, contact_id, contact_urn_id, org_id, msg_count, error_count, next_attempt, response_to_id) VALUES
(1, NULL, '2f969340-704a-4aa2-a1bd-2f832a21d257', 'message 1', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL),
(2, NULL, 'abe87ac1-015c-4803-be29-1e89509fe682', 'message 2', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'D', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL),
(3, NULL, 'a7e83a22-a6ff-4e18-82d0-19545640ccba', 'message 3', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'H', 'V', 'I', '{"image/png:https://foo.bar/image1.png", "image/png:https://foo.bar/image2.png"}', NULL, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL),
(4, NULL, '1cad36af-5581-4c8a-81cd-83708398f61e', 'message 4', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-13 21:11:59.890662+00', NULL),
(5, NULL, 'f557972e-2eb5-42fa-9b87-902116d18787', 'message 5', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', 'I', 'H', 'V', 'I', NULL, 3, 7, 8, 3, 1, 0, '2017-08-11 21:11:59.890662+02:00', NULL),
(6, NULL, '579d148c-0ab1-4afb-832f-afb1fe0e19b7', 'message 6', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-10-08 21:11:59.890662+00', NULL),
(6, 2, '579d148c-0ab1-4afb-832f-afb1fe0e19b7', 'message 6', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-10-08 21:11:59.890662+00', NULL),
(7, NULL, '7aeca469-2593-444e-afe4-4702317534c9', 'message 7', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2018-01-02 21:11:59.890662+00', 2),
(8, NULL, '48fab92b-6d75-45c8-9121-81176d97bdbf', 'message 8', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 11, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL),
(9, NULL, 'e14ab466-0d3b-436d-a0f7-5851fd7d9b7d', 'message 9', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'S', 'V', 'F', NULL, NULL, 6, NULL, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL);
Expand All @@ -296,11 +331,6 @@ INSERT INTO channels_channellog(id, msg_id) VALUES
(5, 5),
(6, 6);

INSERT INTO msgs_broadcast(id, text, created_on, purged, org_id) VALUES
(1, 'eng=>"hello",fre=>"bonjour"'::hstore, '2017-08-12 22:11:59.890662+02:00', TRUE, 2),
(2, 'base=>"hola"'::hstore, '2017-08-12 22:11:59.890662+02:00', TRUE, 2),
(3, 'base=>"not purged"'::hstore, '2017-08-12 19:11:59.890662+02:00', FALSE, 2);

INSERT INTO flows_flow(id, uuid, name) VALUES
(1, '6639286a-9120-45d4-aa39-03ae3942a4a6', 'Flow 1'),
(2, '629db399-a5fb-4fa0-88e6-f479957b63d2', 'Flow 2'),
Expand Down