Skip to content

Commit

Permalink
Add librato analytics for time elapsed and number of orgs
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed May 27, 2022
1 parent 770112e commit c9dfa92
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 49 deletions.
14 changes: 7 additions & 7 deletions archives/archives_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestGetMissingDayArchives(t *testing.T) {

// get the tasks for our org
ctx := context.Background()
config := NewConfig()
config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)

Expand Down Expand Up @@ -82,7 +82,7 @@ func TestGetMissingMonthArchives(t *testing.T) {

// get the tasks for our org
ctx := context.Background()
config := NewConfig()
config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)

Expand Down Expand Up @@ -115,7 +115,7 @@ func TestCreateMsgArchive(t *testing.T) {
err := EnsureTempArchiveDirectory("/tmp")
assert.NoError(t, err)

config := NewConfig()
config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)
Expand Down Expand Up @@ -192,7 +192,7 @@ func TestCreateRunArchive(t *testing.T) {
err := EnsureTempArchiveDirectory("/tmp")
assert.NoError(t, err)

config := NewConfig()
config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)
Expand Down Expand Up @@ -249,7 +249,7 @@ func TestWriteArchiveToDB(t *testing.T) {
db := setup(t)
ctx := context.Background()

config := NewConfig()
config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)
Expand Down Expand Up @@ -303,7 +303,7 @@ func TestArchiveOrgMessages(t *testing.T) {
ctx := context.Background()
deleteTransactionSize = 1

config := NewConfig()
config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)
Expand Down Expand Up @@ -444,7 +444,7 @@ func TestArchiveOrgRuns(t *testing.T) {
db := setup(t)
ctx := context.Background()

config := NewConfig()
config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)
Expand Down
14 changes: 12 additions & 2 deletions archives/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package archives

import "os"

// Config is our top level configuration object
type Config struct {
DB string `help:"the connection string for our database"`
Expand All @@ -25,10 +27,16 @@ type Config struct {
Delete bool `help:"whether to delete messages and runs from the db after archival (default false)"`
ExitOnCompletion bool `help:"whether archiver should exit after completing archiving job (default false)"`
StartTime string `help:"what time archive jobs should run in UTC HH:MM "`

LibratoUsername string `help:"the username that will be used to authenticate to Librato"`
LibratoToken string `help:"the token that will be used to authenticate to Librato"`
InstanceName string `help:"the unique name of this instance used for analytics"`
}

// NewConfig returns a new default configuration object
func NewConfig() *Config {
// NewDefaultConfig returns a new default configuration object
func NewDefaultConfig() *Config {
hostname, _ := os.Hostname()

config := Config{
DB: "postgres://localhost/archiver_test?sslmode=disable",
LogLevel: "info",
Expand All @@ -52,6 +60,8 @@ func NewConfig() *Config {
Delete: false,
ExitOnCompletion: false,
StartTime: "00:01",

InstanceName: hostname,
}

return &config
Expand Down
97 changes: 65 additions & 32 deletions cmd/rp-archiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@ import (
"context"
"os"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/evalphobia/logrus_sentry"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
"github.com/nyaruka/ezconf"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/rp-archiver/archives"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

func main() {
config := archives.NewConfig()
config := archives.NewDefaultConfig()
loader := ezconf.NewLoader(&config, "archiver", "Archives RapidPro runs and msgs to S3", []string{"archiver.toml"})
loader.MustLoad()

Expand Down Expand Up @@ -73,6 +76,15 @@ func main() {
}
}

wg := &sync.WaitGroup{}

// if we have a librato token, configure it
if config.LibratoToken != "" {
analytics.RegisterBackend(analytics.NewLibrato(config.LibratoUsername, config.LibratoToken, config.InstanceName, time.Second, wg))
}

analytics.Start()

// ensure that we can actually write to the temp directory
err = archives.EnsureTempArchiveDirectory(config.TempDir)
if err != nil {
Expand All @@ -82,46 +94,21 @@ func main() {
for {
start := time.Now().In(time.UTC)

// convert the starttime to time.Time
// convert the start time to time.Time
layout := "15:04"
hour, err := time.Parse(layout, config.StartTime)
if err != nil {
logrus.WithError(err).Fatal("invalid start time supplied, format: HH:mm")
}

// get our active orgs
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
orgs, err := archives.GetActiveOrgs(ctx, db, config)
cancel()

// try to archive all active orgs, and if it fails, wait 5 minutes and try again
err = archiveActiveOrgs(db, config, s3Client)
if err != nil {
logrus.WithError(err).Error("error getting active orgs")
logrus.WithError(err).Error("error archiving, will retry in 5 minutes")
time.Sleep(time.Minute * 5)
continue
}

// for each org, do our export
for _, org := range orgs {
// no single org should take more than 12 hours
ctx, cancel := context.WithTimeout(context.Background(), time.Hour*12)
log := logrus.WithField("org", org.Name).WithField("org_id", org.ID)

if config.ArchiveMessages {
_, _, err = archives.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archives.MessageType)
if err != nil {
log.WithError(err).WithField("archive_type", archives.MessageType).Error("error archiving org messages")
}
}
if config.ArchiveRuns {
_, _, err = archives.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archives.RunType)
if err != nil {
log.WithError(err).WithField("archive_type", archives.RunType).Error("error archiving org runs")
}
}

cancel()
}

// ok, we did all our work for our orgs, quit if so configured or sleep until the next day
if config.ExitOnCompletion {
break
Expand All @@ -139,10 +126,56 @@ func main() {
napTime := nextDay.Sub(time.Now().In(time.UTC))

if napTime > time.Duration(0) {
logrus.WithField("time", napTime).WithField("next_start", nextDay).Info("Sleeping until next UTC day")
logrus.WithField("time", napTime).WithField("next_start", nextDay).Info("sleeping until next UTC day")
time.Sleep(napTime)
} else {
logrus.WithField("next_start", nextDay).Info("Rebuilding immediately without sleep")
logrus.WithField("next_start", nextDay).Info("rebuilding immediately without sleep")
}
}

analytics.Stop()
wg.Wait()
}

func archiveActiveOrgs(db *sqlx.DB, cfg *archives.Config, s3Client s3iface.S3API) error {
start := time.Now()

// get our active orgs
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
orgs, err := archives.GetActiveOrgs(ctx, db, cfg)
cancel()

if err != nil {
return errors.Wrap(err, "error getting active orgs")
}

// for each org, do our export
for _, org := range orgs {
// no single org should take more than 12 hours
ctx, cancel := context.WithTimeout(context.Background(), time.Hour*12)
log := logrus.WithField("org", org.Name).WithField("org_id", org.ID)

if cfg.ArchiveMessages {
_, _, err = archives.ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, archives.MessageType)
if err != nil {
log.WithError(err).WithField("archive_type", archives.MessageType).Error("error archiving org messages")
}
}
if cfg.ArchiveRuns {
_, _, err = archives.ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, archives.RunType)
if err != nil {
log.WithError(err).WithField("archive_type", archives.RunType).Error("error archiving org runs")
}
}

cancel()
}

timeTaken := time.Since(start)
logrus.WithField("time_taken", timeTaken).WithField("num_orgs", len(orgs)).Info("archiving of active orgs complete")

analytics.Gauge("archiver.archive_elapsed", timeTaken.Seconds())
analytics.Gauge("archiver.archive_orgs", float64(len(orgs)))

return nil
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/jmoiron/sqlx v1.3.5
github.com/lib/pq v1.10.6
github.com/nyaruka/ezconf v0.2.1
github.com/nyaruka/gocommon v1.21.0
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.1
Expand All @@ -22,6 +23,7 @@ require (
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/naoina/go-stringutil v0.1.0 // indirect
github.com/naoina/toml v0.1.1 // indirect
github.com/nyaruka/librato v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
gopkg.in/yaml.v3 v3.0.0 // indirect
Expand Down
18 changes: 10 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
github.com/aws/aws-sdk-go v1.44.20 h1:nllTRN24EfhDSeKsNbIc6HoC8Ogd2NCJTRB8l84kDlM=
github.com/aws/aws-sdk-go v1.44.20/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/certifi/gocertifi v0.0.0-20180118203423-deb3ae2ef261 h1:6/yVvBsKeAw05IUj4AzvrxaCnDjN4nUqKjW9+w5wixg=
github.com/certifi/gocertifi v0.0.0-20180118203423-deb3ae2ef261/go.mod h1:GJKEexRPVJrBSOjoqN5VNOIKJ5Q3RViH6eu3puDRwx4=
github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d h1:S2NE3iHSwP0XV47EEXL8mWmRdEfGscSJ+7EgePNgt0s=
github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/evalphobia/logrus_sentry v0.8.2 h1:dotxHq+YLZsT1Bb45bB5UQbfCh3gM/nFFetyN46VoDQ=
github.com/evalphobia/logrus_sentry v0.8.2/go.mod h1:pKcp+vriitUqu9KiWj/VRFbRfFNUwz95/UkgG8a6MNc=
github.com/fatih/structs v1.0.0 h1:BrX964Rv5uQ3wwS+KRUAJCBBw5PQmgJfJ6v4yly5QwU=
github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/getsentry/raven-go v0.0.0-20180430182053-263040ce1a36 h1:i93kN7TI/4T5Wx9hsBct/G1yCcZ9tLyWESh7QA7neaI=
github.com/getsentry/raven-go v0.0.0-20180430182053-263040ce1a36/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs=
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
Expand All @@ -25,6 +20,7 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGw
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
Expand All @@ -38,23 +34,29 @@ github.com/naoina/toml v0.1.1 h1:PT/lllxVVN0gzzSqSlHEmP8MJB4MY2U7STGxiouV4X8=
github.com/naoina/toml v0.1.1/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E=
github.com/nyaruka/ezconf v0.2.1 h1:TDXWoqjqYya1uhou1mAJZg7rgFYL98EB0Tb3+BWtUh0=
github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDYYxgnQw=
github.com/nyaruka/gocommon v1.21.0 h1:nu7M2cdSPrkqUPdGsEeWX047+neo69H4x+4g/OKpoLM=
github.com/nyaruka/gocommon v1.21.0/go.mod h1:cv9r6amof1gSktfPZROClZhLFzdSIH/N9KbW6Nny4g8=
github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0=
github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 h1:OH54vjqzRWmbJ62fjuhxy7AxFFgoHN0/DPc/UrL8cAs=
golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand All @@ -63,9 +65,9 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

0 comments on commit c9dfa92

Please sign in to comment.