From c9dfa9201f14c0499eeadd5d10512e6ab013722f Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Fri, 27 May 2022 14:06:04 -0500 Subject: [PATCH 1/2] Add librato analytics for time elapsed and number of orgs --- archives/archives_test.go | 14 +++--- archives/config.go | 14 +++++- cmd/rp-archiver/main.go | 97 ++++++++++++++++++++++++++------------- go.mod | 2 + go.sum | 18 ++++---- 5 files changed, 96 insertions(+), 49 deletions(-) diff --git a/archives/archives_test.go b/archives/archives_test.go index 0fc6d01..daf91b0 100644 --- a/archives/archives_test.go +++ b/archives/archives_test.go @@ -34,7 +34,7 @@ func TestGetMissingDayArchives(t *testing.T) { // get the tasks for our org ctx := context.Background() - config := NewConfig() + config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) @@ -82,7 +82,7 @@ func TestGetMissingMonthArchives(t *testing.T) { // get the tasks for our org ctx := context.Background() - config := NewConfig() + config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) @@ -115,7 +115,7 @@ func TestCreateMsgArchive(t *testing.T) { err := EnsureTempArchiveDirectory("/tmp") assert.NoError(t, err) - config := NewConfig() + config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) @@ -192,7 +192,7 @@ func TestCreateRunArchive(t *testing.T) { err := EnsureTempArchiveDirectory("/tmp") assert.NoError(t, err) - config := NewConfig() + config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) @@ -249,7 +249,7 @@ func TestWriteArchiveToDB(t *testing.T) { db := setup(t) ctx := context.Background() - config := NewConfig() + config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) @@ -303,7 +303,7 @@ func TestArchiveOrgMessages(t *testing.T) { ctx := context.Background() deleteTransactionSize = 1 - config := NewConfig() + config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) @@ -444,7 +444,7 @@ func TestArchiveOrgRuns(t *testing.T) { db := setup(t) ctx := context.Background() - config := NewConfig() + config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) diff --git a/archives/config.go b/archives/config.go index 5ec635a..f95d1cf 100644 --- a/archives/config.go +++ b/archives/config.go @@ -1,5 +1,7 @@ package archives +import "os" + // Config is our top level configuration object type Config struct { DB string `help:"the connection string for our database"` @@ -25,10 +27,16 @@ type Config struct { Delete bool `help:"whether to delete messages and runs from the db after archival (default false)"` ExitOnCompletion bool `help:"whether archiver should exit after completing archiving job (default false)"` StartTime string `help:"what time archive jobs should run in UTC HH:MM "` + + LibratoUsername string `help:"the username that will be used to authenticate to Librato"` + LibratoToken string `help:"the token that will be used to authenticate to Librato"` + InstanceName string `help:"the unique name of this instance used for analytics"` } -// NewConfig returns a new default configuration object -func NewConfig() *Config { +// NewDefaultConfig returns a new default configuration object +func NewDefaultConfig() *Config { + hostname, _ := os.Hostname() + config := Config{ DB: "postgres://localhost/archiver_test?sslmode=disable", LogLevel: "info", @@ -52,6 +60,8 @@ func NewConfig() *Config { Delete: false, ExitOnCompletion: false, StartTime: "00:01", + + InstanceName: hostname, } return &config diff --git a/cmd/rp-archiver/main.go b/cmd/rp-archiver/main.go index c48f4d2..a897dd9 100644 --- a/cmd/rp-archiver/main.go +++ b/cmd/rp-archiver/main.go @@ -4,6 +4,7 @@ import ( "context" "os" "strings" + "sync" "time" "github.com/aws/aws-sdk-go/service/s3/s3iface" @@ -11,12 +12,14 @@ import ( "github.com/jmoiron/sqlx" _ "github.com/lib/pq" "github.com/nyaruka/ezconf" + "github.com/nyaruka/gocommon/analytics" "github.com/nyaruka/rp-archiver/archives" + "github.com/pkg/errors" "github.com/sirupsen/logrus" ) func main() { - config := archives.NewConfig() + config := archives.NewDefaultConfig() loader := ezconf.NewLoader(&config, "archiver", "Archives RapidPro runs and msgs to S3", []string{"archiver.toml"}) loader.MustLoad() @@ -73,6 +76,15 @@ func main() { } } + wg := &sync.WaitGroup{} + + // if we have a librato token, configure it + if config.LibratoToken != "" { + analytics.RegisterBackend(analytics.NewLibrato(config.LibratoUsername, config.LibratoToken, config.InstanceName, time.Second, wg)) + } + + analytics.Start() + // ensure that we can actually write to the temp directory err = archives.EnsureTempArchiveDirectory(config.TempDir) if err != nil { @@ -82,46 +94,21 @@ func main() { for { start := time.Now().In(time.UTC) - // convert the starttime to time.Time + // convert the start time to time.Time layout := "15:04" hour, err := time.Parse(layout, config.StartTime) if err != nil { logrus.WithError(err).Fatal("invalid start time supplied, format: HH:mm") } - // get our active orgs - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - orgs, err := archives.GetActiveOrgs(ctx, db, config) - cancel() - + // try to archive all active orgs, and if it fails, wait 5 minutes and try again + err = archiveActiveOrgs(db, config, s3Client) if err != nil { - logrus.WithError(err).Error("error getting active orgs") + logrus.WithError(err).Error("error archiving, will retry in 5 minutes") time.Sleep(time.Minute * 5) continue } - // for each org, do our export - for _, org := range orgs { - // no single org should take more than 12 hours - ctx, cancel := context.WithTimeout(context.Background(), time.Hour*12) - log := logrus.WithField("org", org.Name).WithField("org_id", org.ID) - - if config.ArchiveMessages { - _, _, err = archives.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archives.MessageType) - if err != nil { - log.WithError(err).WithField("archive_type", archives.MessageType).Error("error archiving org messages") - } - } - if config.ArchiveRuns { - _, _, err = archives.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archives.RunType) - if err != nil { - log.WithError(err).WithField("archive_type", archives.RunType).Error("error archiving org runs") - } - } - - cancel() - } - // ok, we did all our work for our orgs, quit if so configured or sleep until the next day if config.ExitOnCompletion { break @@ -139,10 +126,56 @@ func main() { napTime := nextDay.Sub(time.Now().In(time.UTC)) if napTime > time.Duration(0) { - logrus.WithField("time", napTime).WithField("next_start", nextDay).Info("Sleeping until next UTC day") + logrus.WithField("time", napTime).WithField("next_start", nextDay).Info("sleeping until next UTC day") time.Sleep(napTime) } else { - logrus.WithField("next_start", nextDay).Info("Rebuilding immediately without sleep") + logrus.WithField("next_start", nextDay).Info("rebuilding immediately without sleep") + } + } + + analytics.Stop() + wg.Wait() +} + +func archiveActiveOrgs(db *sqlx.DB, cfg *archives.Config, s3Client s3iface.S3API) error { + start := time.Now() + + // get our active orgs + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + orgs, err := archives.GetActiveOrgs(ctx, db, cfg) + cancel() + + if err != nil { + return errors.Wrap(err, "error getting active orgs") + } + + // for each org, do our export + for _, org := range orgs { + // no single org should take more than 12 hours + ctx, cancel := context.WithTimeout(context.Background(), time.Hour*12) + log := logrus.WithField("org", org.Name).WithField("org_id", org.ID) + + if cfg.ArchiveMessages { + _, _, err = archives.ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, archives.MessageType) + if err != nil { + log.WithError(err).WithField("archive_type", archives.MessageType).Error("error archiving org messages") + } } + if cfg.ArchiveRuns { + _, _, err = archives.ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, archives.RunType) + if err != nil { + log.WithError(err).WithField("archive_type", archives.RunType).Error("error archiving org runs") + } + } + + cancel() } + + timeTaken := time.Since(start) + logrus.WithField("time_taken", timeTaken).WithField("num_orgs", len(orgs)).Info("archiving of active orgs complete") + + analytics.Gauge("archiver.archive_elapsed", timeTaken.Seconds()) + analytics.Gauge("archiver.archive_orgs", float64(len(orgs))) + + return nil } diff --git a/go.mod b/go.mod index 1258266..4cecf0f 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/jmoiron/sqlx v1.3.5 github.com/lib/pq v1.10.6 github.com/nyaruka/ezconf v0.2.1 + github.com/nyaruka/gocommon v1.21.0 github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.7.1 @@ -22,6 +23,7 @@ require ( github.com/kylelemons/godebug v1.1.0 // indirect github.com/naoina/go-stringutil v0.1.0 // indirect github.com/naoina/toml v0.1.1 // indirect + github.com/nyaruka/librato v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect gopkg.in/yaml.v3 v3.0.0 // indirect diff --git a/go.sum b/go.sum index 705f890..a2e6a3c 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,5 @@ github.com/aws/aws-sdk-go v1.44.20 h1:nllTRN24EfhDSeKsNbIc6HoC8Ogd2NCJTRB8l84kDlM= github.com/aws/aws-sdk-go v1.44.20/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= -github.com/certifi/gocertifi v0.0.0-20180118203423-deb3ae2ef261 h1:6/yVvBsKeAw05IUj4AzvrxaCnDjN4nUqKjW9+w5wixg= -github.com/certifi/gocertifi v0.0.0-20180118203423-deb3ae2ef261/go.mod h1:GJKEexRPVJrBSOjoqN5VNOIKJ5Q3RViH6eu3puDRwx4= github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d h1:S2NE3iHSwP0XV47EEXL8mWmRdEfGscSJ+7EgePNgt0s= github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -9,12 +7,9 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/evalphobia/logrus_sentry v0.8.2 h1:dotxHq+YLZsT1Bb45bB5UQbfCh3gM/nFFetyN46VoDQ= github.com/evalphobia/logrus_sentry v0.8.2/go.mod h1:pKcp+vriitUqu9KiWj/VRFbRfFNUwz95/UkgG8a6MNc= -github.com/fatih/structs v1.0.0 h1:BrX964Rv5uQ3wwS+KRUAJCBBw5PQmgJfJ6v4yly5QwU= github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= -github.com/getsentry/raven-go v0.0.0-20180430182053-263040ce1a36 h1:i93kN7TI/4T5Wx9hsBct/G1yCcZ9tLyWESh7QA7neaI= -github.com/getsentry/raven-go v0.0.0-20180430182053-263040ce1a36/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ= github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs= github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= @@ -25,6 +20,7 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGw github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g= github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= @@ -38,23 +34,29 @@ github.com/naoina/toml v0.1.1 h1:PT/lllxVVN0gzzSqSlHEmP8MJB4MY2U7STGxiouV4X8= github.com/naoina/toml v0.1.1/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E= github.com/nyaruka/ezconf v0.2.1 h1:TDXWoqjqYya1uhou1mAJZg7rgFYL98EB0Tb3+BWtUh0= github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDYYxgnQw= +github.com/nyaruka/gocommon v1.21.0 h1:nu7M2cdSPrkqUPdGsEeWX047+neo69H4x+4g/OKpoLM= +github.com/nyaruka/gocommon v1.21.0/go.mod h1:cv9r6amof1gSktfPZROClZhLFzdSIH/N9KbW6Nny4g8= +github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0= +github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 h1:OH54vjqzRWmbJ62fjuhxy7AxFFgoHN0/DPc/UrL8cAs= -golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -63,9 +65,9 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA= gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 8078d370743c71f9d78a3e0e9922bddbf59d6914 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Fri, 27 May 2022 14:39:32 -0500 Subject: [PATCH 2/2] Add analytics for total runs and messages archived --- archives/archives.go | 102 +++++++++++++++++++++++++++++----------- cmd/rp-archiver/main.go | 47 +----------------- 2 files changed, 76 insertions(+), 73 deletions(-) diff --git a/archives/archives.go b/archives/archives.go index f72f9b5..60d3fcc 100644 --- a/archives/archives.go +++ b/archives/archives.go @@ -17,6 +17,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/jmoiron/sqlx" "github.com/lib/pq" + "github.com/nyaruka/gocommon/analytics" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -688,13 +689,6 @@ func DeleteArchiveFile(archive *Archive) error { // CreateOrgArchives builds all the missing archives for the passed in org func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) { - log := logrus.WithFields(logrus.Fields{ - "org": org.Name, - "org_id": org.ID, - }) - records := 0 - start := time.Now() - archiveCount, err := GetCurrentArchiveCount(ctx, db, org, archiveType) if err != nil { return nil, errors.Wrapf(err, "error getting current archive count") @@ -731,20 +725,6 @@ func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *s archives = append(archives, daily...) defer ctx.Done() - // sum all records in the archives - for _, archive := range archives { - records += archive.RecordCount - } - - if len(archives) > 0 { - elapsed := time.Since(start) - rate := float32(records) / (float32(elapsed) / float32(time.Second)) - log.WithFields(logrus.Fields{ - "elapsed": elapsed, - "records_per_second": rate, - }).Info("completed archival for org") - } - return archives, nil } @@ -899,7 +879,7 @@ func executeInQuery(ctx context.Context, tx *sqlx.Tx, query string, ids []int64) var deleteTransactionSize = 100 -// DeleteArchivedOrgRecords deletes all the records for the passeg in org based on archives already created +// DeleteArchivedOrgRecords deletes all the records for the given org based on archives already created func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) { // get all the archives that haven't yet been deleted archives, err := GetArchivesNeedingDeletion(ctx, db, org, archiveType) @@ -949,13 +929,22 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config } // ArchiveOrg looks for any missing archives for the passed in org, creating and uploading them as necessary, returning the created archives -func ArchiveOrg(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) { - created, err := CreateOrgArchives(ctx, now, config, db, s3Client, org, archiveType) +func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) { + log := logrus.WithFields(logrus.Fields{"org": org.Name, "org_id": org.ID}) + start := time.Now() + + created, err := CreateOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType) if err != nil { return nil, nil, errors.Wrapf(err, "error creating archives") } - monthlies, err := RollupOrgArchives(ctx, now, config, db, s3Client, org, archiveType) + if len(created) > 0 { + elapsed := time.Since(start) + rate := float32(countRecords(created)) / (float32(elapsed) / float32(time.Second)) + log.WithFields(logrus.Fields{"elapsed": elapsed, "records_per_second": rate}).Info("completed archival for org") + } + + monthlies, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType) if err != nil { return nil, nil, errors.Wrapf(err, "error rolling up archives") } @@ -964,8 +953,8 @@ func ArchiveOrg(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, // finally delete any archives not yet actually archived deleted := make([]*Archive, 0, 1) - if config.Delete { - deleted, err = DeleteArchivedOrgRecords(ctx, now, config, db, s3Client, org, archiveType) + if cfg.Delete { + deleted, err = DeleteArchivedOrgRecords(ctx, now, cfg, db, s3Client, org, archiveType) if err != nil { return created, deleted, errors.Wrapf(err, "error deleting archived records") } @@ -973,3 +962,62 @@ func ArchiveOrg(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, return created, deleted, nil } + +// ArchiveActiveOrgs fetches active orgs and archives messages and runs +func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client s3iface.S3API) error { + start := time.Now() + + // get our active orgs + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + orgs, err := GetActiveOrgs(ctx, db, cfg) + cancel() + + if err != nil { + return errors.Wrap(err, "error getting active orgs") + } + + totalRunsArchived, totalMsgsArchived := 0, 0 + + // for each org, do our export + for _, org := range orgs { + // no single org should take more than 12 hours + ctx, cancel := context.WithTimeout(context.Background(), time.Hour*12) + log := logrus.WithField("org", org.Name).WithField("org_id", org.ID) + + if cfg.ArchiveMessages { + created, _, err := ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, MessageType) + if err != nil { + log.WithError(err).WithField("archive_type", MessageType).Error("error archiving org messages") + } + totalMsgsArchived += countRecords(created) + } + if cfg.ArchiveRuns { + created, _, err := ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, RunType) + if err != nil { + log.WithError(err).WithField("archive_type", RunType).Error("error archiving org runs") + } + totalRunsArchived += countRecords(created) + } + + cancel() + } + + timeTaken := time.Since(start) + logrus.WithField("time_taken", timeTaken).WithField("num_orgs", len(orgs)).Info("archiving of active orgs complete") + + analytics.Gauge("archiver.archive_elapsed", timeTaken.Seconds()) + analytics.Gauge("archiver.orgs_archived", float64(len(orgs))) + analytics.Gauge("archiver.msgs_archived", float64(totalMsgsArchived)) + analytics.Gauge("archiver.runs_archived", float64(totalRunsArchived)) + + return nil +} + +// counts the records in the given archives +func countRecords(as []*Archive) int { + n := 0 + for _, a := range as { + n += a.RecordCount + } + return n +} diff --git a/cmd/rp-archiver/main.go b/cmd/rp-archiver/main.go index a897dd9..3c019e3 100644 --- a/cmd/rp-archiver/main.go +++ b/cmd/rp-archiver/main.go @@ -1,7 +1,6 @@ package main import ( - "context" "os" "strings" "sync" @@ -14,7 +13,6 @@ import ( "github.com/nyaruka/ezconf" "github.com/nyaruka/gocommon/analytics" "github.com/nyaruka/rp-archiver/archives" - "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -102,7 +100,7 @@ func main() { } // try to archive all active orgs, and if it fails, wait 5 minutes and try again - err = archiveActiveOrgs(db, config, s3Client) + err = archives.ArchiveActiveOrgs(db, config, s3Client) if err != nil { logrus.WithError(err).Error("error archiving, will retry in 5 minutes") time.Sleep(time.Minute * 5) @@ -136,46 +134,3 @@ func main() { analytics.Stop() wg.Wait() } - -func archiveActiveOrgs(db *sqlx.DB, cfg *archives.Config, s3Client s3iface.S3API) error { - start := time.Now() - - // get our active orgs - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - orgs, err := archives.GetActiveOrgs(ctx, db, cfg) - cancel() - - if err != nil { - return errors.Wrap(err, "error getting active orgs") - } - - // for each org, do our export - for _, org := range orgs { - // no single org should take more than 12 hours - ctx, cancel := context.WithTimeout(context.Background(), time.Hour*12) - log := logrus.WithField("org", org.Name).WithField("org_id", org.ID) - - if cfg.ArchiveMessages { - _, _, err = archives.ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, archives.MessageType) - if err != nil { - log.WithError(err).WithField("archive_type", archives.MessageType).Error("error archiving org messages") - } - } - if cfg.ArchiveRuns { - _, _, err = archives.ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, archives.RunType) - if err != nil { - log.WithError(err).WithField("archive_type", archives.RunType).Error("error archiving org runs") - } - } - - cancel() - } - - timeTaken := time.Since(start) - logrus.WithField("time_taken", timeTaken).WithField("num_orgs", len(orgs)).Info("archiving of active orgs complete") - - analytics.Gauge("archiver.archive_elapsed", timeTaken.Seconds()) - analytics.Gauge("archiver.archive_orgs", float64(len(orgs))) - - return nil -}