Skip to content

Commit

Permalink
Add analytics for total runs and messages archived
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed May 27, 2022
1 parent c9dfa92 commit 8078d37
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 73 deletions.
102 changes: 75 additions & 27 deletions archives/archives.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/nyaruka/gocommon/analytics"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -688,13 +689,6 @@ func DeleteArchiveFile(archive *Archive) error {

// CreateOrgArchives builds all the missing archives for the passed in org
func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) {
log := logrus.WithFields(logrus.Fields{
"org": org.Name,
"org_id": org.ID,
})
records := 0
start := time.Now()

archiveCount, err := GetCurrentArchiveCount(ctx, db, org, archiveType)
if err != nil {
return nil, errors.Wrapf(err, "error getting current archive count")
Expand Down Expand Up @@ -731,20 +725,6 @@ func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *s
archives = append(archives, daily...)
defer ctx.Done()

// sum all records in the archives
for _, archive := range archives {
records += archive.RecordCount
}

if len(archives) > 0 {
elapsed := time.Since(start)
rate := float32(records) / (float32(elapsed) / float32(time.Second))
log.WithFields(logrus.Fields{
"elapsed": elapsed,
"records_per_second": rate,
}).Info("completed archival for org")
}

return archives, nil
}

Expand Down Expand Up @@ -899,7 +879,7 @@ func executeInQuery(ctx context.Context, tx *sqlx.Tx, query string, ids []int64)

var deleteTransactionSize = 100

// DeleteArchivedOrgRecords deletes all the records for the passeg in org based on archives already created
// DeleteArchivedOrgRecords deletes all the records for the given org based on archives already created
func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) {
// get all the archives that haven't yet been deleted
archives, err := GetArchivesNeedingDeletion(ctx, db, org, archiveType)
Expand Down Expand Up @@ -949,13 +929,22 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config
}

// ArchiveOrg looks for any missing archives for the passed in org, creating and uploading them as necessary, returning the created archives
func ArchiveOrg(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) {
created, err := CreateOrgArchives(ctx, now, config, db, s3Client, org, archiveType)
func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) {
log := logrus.WithFields(logrus.Fields{"org": org.Name, "org_id": org.ID})
start := time.Now()

created, err := CreateOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating archives")
}

monthlies, err := RollupOrgArchives(ctx, now, config, db, s3Client, org, archiveType)
if len(created) > 0 {
elapsed := time.Since(start)
rate := float32(countRecords(created)) / (float32(elapsed) / float32(time.Second))
log.WithFields(logrus.Fields{"elapsed": elapsed, "records_per_second": rate}).Info("completed archival for org")
}

monthlies, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
if err != nil {
return nil, nil, errors.Wrapf(err, "error rolling up archives")
}
Expand All @@ -964,12 +953,71 @@ func ArchiveOrg(ctx context.Context, now time.Time, config *Config, db *sqlx.DB,

// finally delete any archives not yet actually archived
deleted := make([]*Archive, 0, 1)
if config.Delete {
deleted, err = DeleteArchivedOrgRecords(ctx, now, config, db, s3Client, org, archiveType)
if cfg.Delete {
deleted, err = DeleteArchivedOrgRecords(ctx, now, cfg, db, s3Client, org, archiveType)
if err != nil {
return created, deleted, errors.Wrapf(err, "error deleting archived records")
}
}

return created, deleted, nil
}

// ArchiveActiveOrgs fetches active orgs and archives messages and runs
func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client s3iface.S3API) error {
start := time.Now()

// get our active orgs
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
orgs, err := GetActiveOrgs(ctx, db, cfg)
cancel()

if err != nil {
return errors.Wrap(err, "error getting active orgs")
}

totalRunsArchived, totalMsgsArchived := 0, 0

// for each org, do our export
for _, org := range orgs {
// no single org should take more than 12 hours
ctx, cancel := context.WithTimeout(context.Background(), time.Hour*12)
log := logrus.WithField("org", org.Name).WithField("org_id", org.ID)

if cfg.ArchiveMessages {
created, _, err := ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, MessageType)
if err != nil {
log.WithError(err).WithField("archive_type", MessageType).Error("error archiving org messages")
}
totalMsgsArchived += countRecords(created)
}
if cfg.ArchiveRuns {
created, _, err := ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, RunType)
if err != nil {
log.WithError(err).WithField("archive_type", RunType).Error("error archiving org runs")
}
totalRunsArchived += countRecords(created)
}

cancel()
}

timeTaken := time.Since(start)
logrus.WithField("time_taken", timeTaken).WithField("num_orgs", len(orgs)).Info("archiving of active orgs complete")

analytics.Gauge("archiver.archive_elapsed", timeTaken.Seconds())
analytics.Gauge("archiver.orgs_archived", float64(len(orgs)))
analytics.Gauge("archiver.msgs_archived", float64(totalMsgsArchived))
analytics.Gauge("archiver.runs_archived", float64(totalRunsArchived))

return nil
}

// counts the records in the given archives
func countRecords(as []*Archive) int {
n := 0
for _, a := range as {
n += a.RecordCount
}
return n
}
47 changes: 1 addition & 46 deletions cmd/rp-archiver/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
"os"
"strings"
"sync"
Expand All @@ -14,7 +13,6 @@ import (
"github.com/nyaruka/ezconf"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/rp-archiver/archives"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -102,7 +100,7 @@ func main() {
}

// try to archive all active orgs, and if it fails, wait 5 minutes and try again
err = archiveActiveOrgs(db, config, s3Client)
err = archives.ArchiveActiveOrgs(db, config, s3Client)
if err != nil {
logrus.WithError(err).Error("error archiving, will retry in 5 minutes")
time.Sleep(time.Minute * 5)
Expand Down Expand Up @@ -136,46 +134,3 @@ func main() {
analytics.Stop()
wg.Wait()
}

func archiveActiveOrgs(db *sqlx.DB, cfg *archives.Config, s3Client s3iface.S3API) error {
start := time.Now()

// get our active orgs
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
orgs, err := archives.GetActiveOrgs(ctx, db, cfg)
cancel()

if err != nil {
return errors.Wrap(err, "error getting active orgs")
}

// for each org, do our export
for _, org := range orgs {
// no single org should take more than 12 hours
ctx, cancel := context.WithTimeout(context.Background(), time.Hour*12)
log := logrus.WithField("org", org.Name).WithField("org_id", org.ID)

if cfg.ArchiveMessages {
_, _, err = archives.ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, archives.MessageType)
if err != nil {
log.WithError(err).WithField("archive_type", archives.MessageType).Error("error archiving org messages")
}
}
if cfg.ArchiveRuns {
_, _, err = archives.ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, archives.RunType)
if err != nil {
log.WithError(err).WithField("archive_type", archives.RunType).Error("error archiving org runs")
}
}

cancel()
}

timeTaken := time.Since(start)
logrus.WithField("time_taken", timeTaken).WithField("num_orgs", len(orgs)).Info("archiving of active orgs complete")

analytics.Gauge("archiver.archive_elapsed", timeTaken.Seconds())
analytics.Gauge("archiver.archive_orgs", float64(len(orgs)))

return nil
}

0 comments on commit 8078d37

Please sign in to comment.