Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add librato analytics #67

Merged
merged 2 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 75 additions & 27 deletions archives/archives.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/nyaruka/gocommon/analytics"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -688,13 +689,6 @@ func DeleteArchiveFile(archive *Archive) error {

// CreateOrgArchives builds all the missing archives for the passed in org
func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) {
log := logrus.WithFields(logrus.Fields{
"org": org.Name,
"org_id": org.ID,
})
records := 0
start := time.Now()

archiveCount, err := GetCurrentArchiveCount(ctx, db, org, archiveType)
if err != nil {
return nil, errors.Wrapf(err, "error getting current archive count")
Expand Down Expand Up @@ -731,20 +725,6 @@ func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *s
archives = append(archives, daily...)
defer ctx.Done()

// sum all records in the archives
for _, archive := range archives {
records += archive.RecordCount
}

if len(archives) > 0 {
elapsed := time.Since(start)
rate := float32(records) / (float32(elapsed) / float32(time.Second))
log.WithFields(logrus.Fields{
"elapsed": elapsed,
"records_per_second": rate,
}).Info("completed archival for org")
}

return archives, nil
}

Expand Down Expand Up @@ -899,7 +879,7 @@ func executeInQuery(ctx context.Context, tx *sqlx.Tx, query string, ids []int64)

var deleteTransactionSize = 100

// DeleteArchivedOrgRecords deletes all the records for the passeg in org based on archives already created
// DeleteArchivedOrgRecords deletes all the records for the given org based on archives already created
func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) {
// get all the archives that haven't yet been deleted
archives, err := GetArchivesNeedingDeletion(ctx, db, org, archiveType)
Expand Down Expand Up @@ -949,13 +929,22 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config
}

// ArchiveOrg looks for any missing archives for the passed in org, creating and uploading them as necessary, returning the created archives
func ArchiveOrg(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) {
created, err := CreateOrgArchives(ctx, now, config, db, s3Client, org, archiveType)
func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) {
log := logrus.WithFields(logrus.Fields{"org": org.Name, "org_id": org.ID})
start := time.Now()

created, err := CreateOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating archives")
}

monthlies, err := RollupOrgArchives(ctx, now, config, db, s3Client, org, archiveType)
if len(created) > 0 {
elapsed := time.Since(start)
rate := float32(countRecords(created)) / (float32(elapsed) / float32(time.Second))
log.WithFields(logrus.Fields{"elapsed": elapsed, "records_per_second": rate}).Info("completed archival for org")
}

monthlies, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
if err != nil {
return nil, nil, errors.Wrapf(err, "error rolling up archives")
}
Expand All @@ -964,12 +953,71 @@ func ArchiveOrg(ctx context.Context, now time.Time, config *Config, db *sqlx.DB,

// finally delete any archives not yet actually archived
deleted := make([]*Archive, 0, 1)
if config.Delete {
deleted, err = DeleteArchivedOrgRecords(ctx, now, config, db, s3Client, org, archiveType)
if cfg.Delete {
deleted, err = DeleteArchivedOrgRecords(ctx, now, cfg, db, s3Client, org, archiveType)
if err != nil {
return created, deleted, errors.Wrapf(err, "error deleting archived records")
}
}

return created, deleted, nil
}

// ArchiveActiveOrgs fetches active orgs and archives messages and runs
func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client s3iface.S3API) error {
start := time.Now()

// get our active orgs
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
orgs, err := GetActiveOrgs(ctx, db, cfg)
cancel()

if err != nil {
return errors.Wrap(err, "error getting active orgs")
}

totalRunsArchived, totalMsgsArchived := 0, 0

// for each org, do our export
for _, org := range orgs {
// no single org should take more than 12 hours
ctx, cancel := context.WithTimeout(context.Background(), time.Hour*12)
log := logrus.WithField("org", org.Name).WithField("org_id", org.ID)

if cfg.ArchiveMessages {
created, _, err := ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, MessageType)
if err != nil {
log.WithError(err).WithField("archive_type", MessageType).Error("error archiving org messages")
}
totalMsgsArchived += countRecords(created)
}
if cfg.ArchiveRuns {
created, _, err := ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, RunType)
if err != nil {
log.WithError(err).WithField("archive_type", RunType).Error("error archiving org runs")
}
totalRunsArchived += countRecords(created)
}

cancel()
}

timeTaken := time.Since(start)
logrus.WithField("time_taken", timeTaken).WithField("num_orgs", len(orgs)).Info("archiving of active orgs complete")

analytics.Gauge("archiver.archive_elapsed", timeTaken.Seconds())
analytics.Gauge("archiver.orgs_archived", float64(len(orgs)))
analytics.Gauge("archiver.msgs_archived", float64(totalMsgsArchived))
analytics.Gauge("archiver.runs_archived", float64(totalRunsArchived))

return nil
}

// counts the records in the given archives
func countRecords(as []*Archive) int {
n := 0
for _, a := range as {
n += a.RecordCount
}
return n
}
14 changes: 7 additions & 7 deletions archives/archives_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestGetMissingDayArchives(t *testing.T) {

// get the tasks for our org
ctx := context.Background()
config := NewConfig()
config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)

Expand Down Expand Up @@ -82,7 +82,7 @@ func TestGetMissingMonthArchives(t *testing.T) {

// get the tasks for our org
ctx := context.Background()
config := NewConfig()
config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)

Expand Down Expand Up @@ -115,7 +115,7 @@ func TestCreateMsgArchive(t *testing.T) {
err := EnsureTempArchiveDirectory("/tmp")
assert.NoError(t, err)

config := NewConfig()
config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)
Expand Down Expand Up @@ -192,7 +192,7 @@ func TestCreateRunArchive(t *testing.T) {
err := EnsureTempArchiveDirectory("/tmp")
assert.NoError(t, err)

config := NewConfig()
config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)
Expand Down Expand Up @@ -249,7 +249,7 @@ func TestWriteArchiveToDB(t *testing.T) {
db := setup(t)
ctx := context.Background()

config := NewConfig()
config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)
Expand Down Expand Up @@ -303,7 +303,7 @@ func TestArchiveOrgMessages(t *testing.T) {
ctx := context.Background()
deleteTransactionSize = 1

config := NewConfig()
config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)
Expand Down Expand Up @@ -444,7 +444,7 @@ func TestArchiveOrgRuns(t *testing.T) {
db := setup(t)
ctx := context.Background()

config := NewConfig()
config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)
Expand Down
14 changes: 12 additions & 2 deletions archives/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package archives

import "os"

// Config is our top level configuration object
type Config struct {
DB string `help:"the connection string for our database"`
Expand All @@ -25,10 +27,16 @@ type Config struct {
Delete bool `help:"whether to delete messages and runs from the db after archival (default false)"`
ExitOnCompletion bool `help:"whether archiver should exit after completing archiving job (default false)"`
StartTime string `help:"what time archive jobs should run in UTC HH:MM "`

LibratoUsername string `help:"the username that will be used to authenticate to Librato"`
LibratoToken string `help:"the token that will be used to authenticate to Librato"`
InstanceName string `help:"the unique name of this instance used for analytics"`
}

// NewConfig returns a new default configuration object
func NewConfig() *Config {
// NewDefaultConfig returns a new default configuration object
func NewDefaultConfig() *Config {
hostname, _ := os.Hostname()

config := Config{
DB: "postgres://localhost/archiver_test?sslmode=disable",
LogLevel: "info",
Expand All @@ -52,6 +60,8 @@ func NewConfig() *Config {
Delete: false,
ExitOnCompletion: false,
StartTime: "00:01",

InstanceName: hostname,
}

return &config
Expand Down
54 changes: 21 additions & 33 deletions cmd/rp-archiver/main.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
package main

import (
"context"
"os"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/evalphobia/logrus_sentry"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
"github.com/nyaruka/ezconf"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/rp-archiver/archives"
"github.com/sirupsen/logrus"
)

func main() {
config := archives.NewConfig()
config := archives.NewDefaultConfig()
loader := ezconf.NewLoader(&config, "archiver", "Archives RapidPro runs and msgs to S3", []string{"archiver.toml"})
loader.MustLoad()

Expand Down Expand Up @@ -73,6 +74,15 @@ func main() {
}
}

wg := &sync.WaitGroup{}

// if we have a librato token, configure it
if config.LibratoToken != "" {
analytics.RegisterBackend(analytics.NewLibrato(config.LibratoUsername, config.LibratoToken, config.InstanceName, time.Second, wg))
}

analytics.Start()

// ensure that we can actually write to the temp directory
err = archives.EnsureTempArchiveDirectory(config.TempDir)
if err != nil {
Expand All @@ -82,46 +92,21 @@ func main() {
for {
start := time.Now().In(time.UTC)

// convert the starttime to time.Time
// convert the start time to time.Time
layout := "15:04"
hour, err := time.Parse(layout, config.StartTime)
if err != nil {
logrus.WithError(err).Fatal("invalid start time supplied, format: HH:mm")
}

// get our active orgs
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
orgs, err := archives.GetActiveOrgs(ctx, db, config)
cancel()

// try to archive all active orgs, and if it fails, wait 5 minutes and try again
err = archives.ArchiveActiveOrgs(db, config, s3Client)
if err != nil {
logrus.WithError(err).Error("error getting active orgs")
logrus.WithError(err).Error("error archiving, will retry in 5 minutes")
time.Sleep(time.Minute * 5)
continue
}

// for each org, do our export
for _, org := range orgs {
// no single org should take more than 12 hours
ctx, cancel := context.WithTimeout(context.Background(), time.Hour*12)
log := logrus.WithField("org", org.Name).WithField("org_id", org.ID)

if config.ArchiveMessages {
_, _, err = archives.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archives.MessageType)
if err != nil {
log.WithError(err).WithField("archive_type", archives.MessageType).Error("error archiving org messages")
}
}
if config.ArchiveRuns {
_, _, err = archives.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archives.RunType)
if err != nil {
log.WithError(err).WithField("archive_type", archives.RunType).Error("error archiving org runs")
}
}

cancel()
}

// ok, we did all our work for our orgs, quit if so configured or sleep until the next day
if config.ExitOnCompletion {
break
Expand All @@ -139,10 +124,13 @@ func main() {
napTime := nextDay.Sub(time.Now().In(time.UTC))

if napTime > time.Duration(0) {
logrus.WithField("time", napTime).WithField("next_start", nextDay).Info("Sleeping until next UTC day")
logrus.WithField("time", napTime).WithField("next_start", nextDay).Info("sleeping until next UTC day")
time.Sleep(napTime)
} else {
logrus.WithField("next_start", nextDay).Info("Rebuilding immediately without sleep")
logrus.WithField("next_start", nextDay).Info("rebuilding immediately without sleep")
}
}

analytics.Stop()
wg.Wait()
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/jmoiron/sqlx v1.3.5
github.com/lib/pq v1.10.6
github.com/nyaruka/ezconf v0.2.1
github.com/nyaruka/gocommon v1.21.0
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.1
Expand All @@ -22,6 +23,7 @@ require (
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/naoina/go-stringutil v0.1.0 // indirect
github.com/naoina/toml v0.1.1 // indirect
github.com/nyaruka/librato v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
gopkg.in/yaml.v3 v3.0.0 // indirect
Expand Down
Loading