diff --git a/archives/archives.go b/archives/archives.go index d8a581e..f58e82b 100644 --- a/archives/archives.go +++ b/archives/archives.go @@ -688,44 +688,37 @@ func DeleteArchiveFile(archive *Archive) error { } // CreateOrgArchives builds all the missing archives for the passed in org -func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) { +func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, error) { archiveCount, err := GetCurrentArchiveCount(ctx, db, org, archiveType) if err != nil { - return nil, errors.Wrapf(err, "error getting current archive count") + return nil, nil, nil, nil, errors.Wrapf(err, "error getting current archive count") } - archives := make([]*Archive, 0) + var dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed []*Archive // no existing archives means this might be a backfill, figure out if there are full months we can build first if archiveCount == 0 { - archives, err = GetMissingMonthlyArchives(ctx, db, now, org, archiveType) + archives, err := GetMissingMonthlyArchives(ctx, db, now, org, archiveType) if err != nil { - return nil, errors.Wrapf(err, "error getting missing monthly archives") + return nil, nil, nil, nil, errors.Wrapf(err, "error getting missing monthly archives") } // we first create monthly archives - err = createArchives(ctx, db, config, s3Client, org, archives) - if err != nil { - return nil, errors.Wrapf(err, "error creating new monthly archives") - } + monthliesCreated, monthliesFailed = createArchives(ctx, db, config, s3Client, org, archives) } // then add in daily archives taking into account the monthly that have been built daily, err := GetMissingDailyArchives(ctx, db, now, org, archiveType) if err != nil { - return nil, errors.Wrapf(err, "error getting missing daily archives") + return nil, nil, nil, nil, errors.Wrapf(err, "error getting missing daily archives") } + // we then create missing daily archives - err = createArchives(ctx, db, config, s3Client, org, daily) - if err != nil { - return nil, errors.Wrapf(err, "error creating new daily archives") - } + dailiesCreated, dailiesFailed = createArchives(ctx, db, config, s3Client, org, daily) - // append daily archives to any monthly archives - archives = append(archives, daily...) defer ctx.Done() - return archives, nil + return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, nil } func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3iface.S3API, archive *Archive) error { @@ -758,9 +751,12 @@ func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3 return nil } -func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3iface.S3API, org Org, archives []*Archive) error { +func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3iface.S3API, org Org, archives []*Archive) ([]*Archive, []*Archive) { log := logrus.WithFields(logrus.Fields{"org_id": org.ID, "org_name": org.Name}) + created := make([]*Archive, 0, len(archives)) + failed := make([]*Archive, 0, 5) + for _, archive := range archives { log.WithFields(logrus.Fields{"start_date": archive.StartDate, "end_date": archive.endDate(), "period": archive.Period, "archive_type": archive.ArchiveType}).Info("starting archive") start := time.Now() @@ -768,13 +764,14 @@ func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client s err := createArchive(ctx, db, config, s3Client, archive) if err != nil { log.WithError(err).Error("error creating archive") - continue + failed = append(failed, archive) + } else { + log.WithFields(logrus.Fields{"id": archive.ID, "record_count": archive.RecordCount, "elapsed": time.Since(start)}).Info("archive complete") + created = append(created, archive) } - - log.WithFields(logrus.Fields{"id": archive.ID, "record_count": archive.RecordCount, "elapsed": time.Since(start)}).Info("archive complete") } - return nil + return created, failed } // RollupOrgArchives rolls up monthly archives from our daily archives @@ -895,47 +892,45 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config } deleted = append(deleted, a) - log.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - }).Info("deleted archive records") + log.WithFields(logrus.Fields{"elapsed": time.Since(start)}).Info("deleted archive records") } return deleted, nil } // ArchiveOrg looks for any missing archives for the passed in org, creating and uploading them as necessary, returning the created archives -func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) { +func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, []*Archive, error) { log := logrus.WithFields(logrus.Fields{"org_id": org.ID, "org_name": org.Name}) start := time.Now() - created, err := CreateOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType) + dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, err := CreateOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType) if err != nil { - return nil, nil, errors.Wrapf(err, "error creating archives") + return nil, nil, nil, nil, nil, errors.Wrapf(err, "error creating archives") } - if len(created) > 0 { + if len(dailiesCreated) > 0 { elapsed := time.Since(start) - rate := float32(countRecords(created)) / (float32(elapsed) / float32(time.Second)) + rate := float32(countRecords(dailiesCreated)) / (float32(elapsed) / float32(time.Second)) log.WithFields(logrus.Fields{"elapsed": elapsed, "records_per_second": rate}).Info("completed archival for org") } - monthlies, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType) + monthliesRolledUp, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType) if err != nil { - return nil, nil, errors.Wrapf(err, "error rolling up archives") + return nil, nil, nil, nil, nil, errors.Wrapf(err, "error rolling up archives") } - created = append(created, monthlies...) + monthliesCreated = append(monthliesCreated, monthliesRolledUp...) // finally delete any archives not yet actually archived - deleted := make([]*Archive, 0, 1) + var deleted []*Archive if cfg.Delete { deleted, err = DeleteArchivedOrgRecords(ctx, now, cfg, db, s3Client, org, archiveType) if err != nil { - return created, deleted, errors.Wrapf(err, "error deleting archived records") + return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, nil, errors.Wrapf(err, "error deleting archived records") } } - return created, deleted, nil + return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, deleted, nil } // ArchiveActiveOrgs fetches active orgs and archives messages and runs @@ -960,18 +955,18 @@ func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client s3iface.S3API) error { log := logrus.WithField("org_id", org.ID).WithField("org_name", org.Name) if cfg.ArchiveMessages { - created, _, err := ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, MessageType) + dailiesCreated, _, _, _, _, err := ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, MessageType) if err != nil { log.WithError(err).WithField("archive_type", MessageType).Error("error archiving org messages") } - totalMsgsArchived += countRecords(created) + totalMsgsArchived += countRecords(dailiesCreated) } if cfg.ArchiveRuns { - created, _, err := ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, RunType) + dailiesCreated, _, _, _, _, err := ArchiveOrg(ctx, time.Now(), cfg, db, s3Client, org, RunType) if err != nil { log.WithError(err).WithField("archive_type", RunType).Error("error archiving org runs") } - totalRunsArchived += countRecords(created) + totalRunsArchived += countRecords(dailiesCreated) } cancel() diff --git a/archives/archives_test.go b/archives/archives_test.go index daf91b0..a56425f 100644 --- a/archives/archives_test.go +++ b/archives/archives_test.go @@ -322,53 +322,24 @@ func TestArchiveOrgMessages(t *testing.T) { assertCount(t, db, 4, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2) - created, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], MessageType) + dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], MessageType) assert.NoError(t, err) - assert.Equal(t, 63, len(created)) - assert.Equal(t, time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), created[0].StartDate) - assert.Equal(t, DayPeriod, created[0].Period) - assert.Equal(t, 0, created[0].RecordCount) - assert.Equal(t, int64(23), created[0].Size) - assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", created[0].Hash) - - assert.Equal(t, time.Date(2017, 8, 11, 0, 0, 0, 0, time.UTC), created[1].StartDate) - assert.Equal(t, DayPeriod, created[1].Period) - assert.Equal(t, 0, created[1].RecordCount) - assert.Equal(t, int64(23), created[1].Size) - assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", created[1].Hash) - - assert.Equal(t, time.Date(2017, 8, 12, 0, 0, 0, 0, time.UTC), created[2].StartDate) - assert.Equal(t, DayPeriod, created[2].Period) - assert.Equal(t, 3, created[2].RecordCount) - assert.Equal(t, int64(528), created[2].Size) - assert.Equal(t, "b3bf00bf1234ea47f14ffd0171a8ead0", created[2].Hash) - - assert.Equal(t, time.Date(2017, 8, 13, 0, 0, 0, 0, time.UTC), created[3].StartDate) - assert.Equal(t, DayPeriod, created[3].Period) - assert.Equal(t, 1, created[3].RecordCount) - assert.Equal(t, int64(312), created[3].Size) - assert.Equal(t, "32e61b1431217b59fca0170f637d78a3", created[3].Hash) - - assert.Equal(t, time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), created[60].StartDate) - assert.Equal(t, DayPeriod, created[60].Period) - assert.Equal(t, 0, created[60].RecordCount) - assert.Equal(t, int64(23), created[60].Size) - assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", created[60].Hash) - - assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), created[61].StartDate) - assert.Equal(t, MonthPeriod, created[61].Period) - assert.Equal(t, 4, created[61].RecordCount) - assert.Equal(t, int64(553), created[61].Size) - assert.Equal(t, "156e45e29b6587cb85ccf75e03800b00", created[61].Hash) - - assert.Equal(t, time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), created[62].StartDate) - assert.Equal(t, MonthPeriod, created[62].Period) - assert.Equal(t, 0, created[62].RecordCount) - assert.Equal(t, int64(23), created[62].Size) - assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", created[62].Hash) + assert.Equal(t, 61, len(dailiesCreated)) + assertArchive(t, dailiesCreated[0], time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") + assertArchive(t, dailiesCreated[1], time.Date(2017, 8, 11, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") + assertArchive(t, dailiesCreated[2], time.Date(2017, 8, 12, 0, 0, 0, 0, time.UTC), DayPeriod, 3, 528, "b3bf00bf1234ea47f14ffd0171a8ead0") + assertArchive(t, dailiesCreated[3], time.Date(2017, 8, 13, 0, 0, 0, 0, time.UTC), DayPeriod, 1, 312, "32e61b1431217b59fca0170f637d78a3") + assertArchive(t, dailiesCreated[4], time.Date(2017, 8, 14, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") + + assert.Equal(t, 1, len(dailiesFailed)) + + assert.Equal(t, 2, len(monthliesCreated)) + assertArchive(t, monthliesCreated[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 4, 553, "156e45e29b6587cb85ccf75e03800b00") + assertArchive(t, monthliesCreated[1], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") // no rollup for october since that had one invalid daily archive + assert.Equal(t, 1, len(monthliesFailed)) assert.Equal(t, 63, len(deleted)) assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), deleted[0].StartDate) @@ -440,6 +411,14 @@ func assertCount(t *testing.T, db *sqlx.DB, expected int, query string, args ... assert.Equal(t, expected, count, "counts mismatch for query %s", query) } +func assertArchive(t *testing.T, a *Archive, startDate time.Time, period ArchivePeriod, recordCount int, size int64, hash string) { + assert.Equal(t, startDate, a.StartDate) + assert.Equal(t, period, a.Period) + assert.Equal(t, recordCount, a.RecordCount) + assert.Equal(t, size, a.Size) + assert.Equal(t, hash, a.Hash) +} + func TestArchiveOrgRuns(t *testing.T) { db := setup(t) ctx := context.Background() @@ -461,34 +440,16 @@ func TestArchiveOrgRuns(t *testing.T) { s3Client, err := NewS3Client(config) assert.NoError(t, err) - created, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[2], RunType) + dailiesCreated, _, monthliesCreated, _, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[2], RunType) assert.NoError(t, err) - assert.Equal(t, 12, len(created)) - - assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), created[0].StartDate) - assert.Equal(t, MonthPeriod, created[0].Period) - assert.Equal(t, 1, created[0].RecordCount) - assert.Equal(t, int64(490), created[0].Size) - assert.Equal(t, "c2138e3c3009a9c09fc55482903d93e4", created[0].Hash) - - assert.Equal(t, time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), created[1].StartDate) - assert.Equal(t, MonthPeriod, created[1].Period) - assert.Equal(t, 0, created[1].RecordCount) - assert.Equal(t, int64(23), created[1].Size) - assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", created[1].Hash) - - assert.Equal(t, time.Date(2017, 10, 1, 0, 0, 0, 0, time.UTC), created[2].StartDate) - assert.Equal(t, DayPeriod, created[2].Period) - assert.Equal(t, 0, created[2].RecordCount) - assert.Equal(t, int64(23), created[2].Size) - assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", created[2].Hash) - - assert.Equal(t, time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), created[11].StartDate) - assert.Equal(t, DayPeriod, created[11].Period) - assert.Equal(t, 2, created[11].RecordCount) - assert.Equal(t, int64(1984), created[11].Size) - assert.Equal(t, "869cc00ad4cca0371d07c88d8cf2bf26", created[11].Hash) + assert.Equal(t, 10, len(dailiesCreated)) + assertArchive(t, dailiesCreated[0], time.Date(2017, 10, 1, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") + assertArchive(t, dailiesCreated[9], time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 2, 1984, "869cc00ad4cca0371d07c88d8cf2bf26") + + assert.Equal(t, 2, len(monthliesCreated)) + assertArchive(t, monthliesCreated[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 1, 490, "c2138e3c3009a9c09fc55482903d93e4") + assertArchive(t, monthliesCreated[1], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") assert.Equal(t, 12, len(deleted))