Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include rollups in monthlies failed metric #74

Merged
merged 1 commit into from
Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 13 additions & 30 deletions archives/archives.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,19 +770,21 @@ func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client s
}

// RollupOrgArchives rolls up monthly archives from our daily archives
func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) {
func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) {
ctx, cancel := context.WithTimeout(ctx, time.Hour*3)
defer cancel()

log := logrus.WithFields(logrus.Fields{"org_id": org.ID, "org_name": org.Name, "archive_type": archiveType})
created := make([]*Archive, 0, 1)

// get our missing monthly archives
archives, err := GetMissingMonthlyArchives(ctx, db, now, org, archiveType)
if err != nil {
return nil, err
return nil, nil, err
}

created := make([]*Archive, 0, len(archives))
failed := make([]*Archive, 0, 1)

// build them from rollups
for _, archive := range archives {
log := log.WithFields(logrus.Fields{"start_date": archive.StartDate})
Expand All @@ -791,20 +793,23 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s
err = BuildRollupArchive(ctx, db, config, s3Client, archive, now, org, archiveType)
if err != nil {
log.WithError(err).Error("error building monthly archive")
failed = append(failed, archive)
continue
}

if config.UploadToS3 {
err = UploadArchive(ctx, s3Client, config.S3Bucket, archive)
if err != nil {
log.WithError(err).Error("error writing archive to s3")
failed = append(failed, archive)
continue
}
}

err = WriteArchiveToDB(ctx, db, archive)
if err != nil {
log.WithError(err).Error("error writing record to db")
failed = append(failed, archive)
continue
}

Expand All @@ -820,7 +825,7 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s
created = append(created, archive)
}

return created, nil
return created, failed, nil
}

const setArchiveDeleted = `
Expand All @@ -829,21 +834,6 @@ SET needs_deletion = FALSE, deleted_on = $2
WHERE id = $1
`

// helper method to safely execute an IN query in the passed in transaction
func executeInQuery(ctx context.Context, tx *sqlx.Tx, query string, ids []int64) error {
q, vs, err := sqlx.In(query, ids)
if err != nil {
return err
}
q = tx.Rebind(q)

_, err = tx.ExecContext(ctx, q, vs...)
if err != nil {
tx.Rollback()
}
return err
}

var deleteTransactionSize = 100

// DeleteArchivedOrgRecords deletes all the records for the given org based on archives already created
Expand Down Expand Up @@ -909,12 +899,14 @@ func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3
log.WithFields(logrus.Fields{"elapsed": elapsed, "records_per_second": rate}).Info("completed archival for org")
}

monthliesRolledUp, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
rollupsCreated, rollupsFailed, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
if err != nil {
return nil, nil, nil, nil, nil, errors.Wrapf(err, "error rolling up archives")
}

monthliesCreated = append(monthliesCreated, monthliesRolledUp...)
monthliesCreated = append(monthliesCreated, rollupsCreated...)
monthliesFailed = append(monthliesFailed, rollupsFailed...)
monthliesFailed = removeDuplicates(monthliesFailed) // don't double report monthlies that fail being built from db and rolled up from dailies

// finally delete any archives not yet actually archived
var deleted []*Archive
Expand Down Expand Up @@ -997,12 +989,3 @@ func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client s3iface.S3API) error {

return nil
}

// counts the records in the given archives
func countRecords(as []*Archive) int {
n := 0
for _, a := range as {
n += a.RecordCount
}
return n
}
47 changes: 47 additions & 0 deletions archives/utils.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,52 @@
package archives

import (
"context"
"fmt"
"time"

"github.com/jmoiron/sqlx"
)

// helper method to safely execute an IN query in the passed in transaction
func executeInQuery(ctx context.Context, tx *sqlx.Tx, query string, ids []int64) error {
q, vs, err := sqlx.In(query, ids)
if err != nil {
return err
}
q = tx.Rebind(q)

_, err = tx.ExecContext(ctx, q, vs...)
if err != nil {
tx.Rollback()
}
return err
}

// counts the records in the given archives
func countRecords(as []*Archive) int {
n := 0
for _, a := range as {
n += a.RecordCount
}
return n
}

// removes duplicates from a slice of archives
func removeDuplicates(as []*Archive) []*Archive {
unique := make([]*Archive, 0, len(as))
seen := make(map[string]bool)

for _, a := range as {
key := fmt.Sprintf("%s:%s:%s", a.ArchiveType, a.Period, a.StartDate.Format(time.RFC3339))
if !seen[key] {
unique = append(unique, a)
seen[key] = true
}
}
return unique
}

// chunks a slice of in64 IDs
func chunkIDs(ids []int64, size int) [][]int64 {
chunks := make([][]int64, 0, len(ids)/size+1)
Expand Down