diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0397357..e07e9f4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,62 +1,62 @@ name: CI on: [push, pull_request] env: - go-version: '1.17.x' + go-version: "1.18.x" jobs: test: name: Test strategy: matrix: - pg-version: ['12', '13'] + pg-version: ["12", "13"] runs-on: ubuntu-latest steps: - - name: Checkout code - uses: actions/checkout@v1 + - name: Checkout code + uses: actions/checkout@v1 - - name: Install PostgreSQL - uses: harmon758/postgresql-action@v1 - with: - postgresql version: ${{ matrix.pg-version }} - postgresql db: archiver_test - postgresql user: temba - postgresql password: temba + - name: Install PostgreSQL + uses: harmon758/postgresql-action@v1 + with: + postgresql version: ${{ matrix.pg-version }} + postgresql db: archiver_test + postgresql user: temba + postgresql password: temba - - name: Install Go - uses: actions/setup-go@v1 - with: - go-version: ${{ env.go-version }} + - name: Install Go + uses: actions/setup-go@v1 + with: + go-version: ${{ env.go-version }} - - name: Run tests - run: go test -p=1 -coverprofile=coverage.text -covermode=atomic ./... - env: - ARCHIVER_AWS_ACCESS_KEY_ID: ${{ secrets.ARCHIVER_AWS_ACCESS_KEY_ID }} - ARCHIVER_AWS_SECRET_ACCESS_KEY: ${{ secrets.ARCHIVER_AWS_SECRET_ACCESS_KEY }} + - name: Run tests + run: go test -p=1 -coverprofile=coverage.text -covermode=atomic ./... + env: + ARCHIVER_AWS_ACCESS_KEY_ID: ${{ secrets.ARCHIVER_AWS_ACCESS_KEY_ID }} + ARCHIVER_AWS_SECRET_ACCESS_KEY: ${{ secrets.ARCHIVER_AWS_SECRET_ACCESS_KEY }} + + - name: Upload coverage + if: success() + uses: codecov/codecov-action@v1 + with: + token: ${{ secrets.CODECOV_TOKEN }} - - name: Upload coverage - if: success() - uses: codecov/codecov-action@v1 - with: - token: ${{ secrets.CODECOV_TOKEN }} - release: name: Release needs: [test] if: startsWith(github.ref, 'refs/tags/') runs-on: ubuntu-latest steps: - - name: Checkout code - uses: actions/checkout@v1 + - name: Checkout code + uses: actions/checkout@v1 - - name: Install Go - uses: actions/setup-go@v1 - with: - go-version: ${{ env.go-version }} + - name: Install Go + uses: actions/setup-go@v1 + with: + go-version: ${{ env.go-version }} - - name: Publish release - uses: goreleaser/goreleaser-action@v1 - with: - version: v0.147.2 - args: release --rm-dist - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - fail_ci_if_error: true + - name: Publish release + uses: goreleaser/goreleaser-action@v1 + with: + version: v0.147.2 + args: release --rm-dist + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + fail_ci_if_error: true diff --git a/.gitignore b/.gitignore index 223874a..a40925b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,4 @@ # Binaries for programs and plugins -rp-archiver *.exe *.exe~ *.dll diff --git a/CHANGELOG.md b/CHANGELOG.md index f2af975..53075fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,70 @@ +v7.4.0 +---------- + * Include rollups in monthlies failed metric as well as monthlies created from scratch + +v7.3.7 +---------- + * Change query used to update rollup_id on dailies + * Remove temporary logging + +v7.3.6 +---------- + * Add temporary additional logging + * Replace ExitOnCompletion config option with Once which makes it run once and exit + +v7.3.5 +---------- + * Improve librato analytics and add tests + +v7.3.4 +---------- + * Rework stats reporting + * Log version at startup + +v7.3.3 +---------- + * Fix parsing start times after midday + +v7.3.2 +---------- + * Don't log entire run 
JSON on error, just UUID
+ * Make archival happen at configured start time even on first pass
+
+v7.3.1
+----------
+ * Add librato analytics for time elapsed and number of orgs, msgs and runs
+
+v7.3.0
+----------
+ * Update to go 1.18 and upgrade dependencies
+ * Add support for Msg.visibility=X (deleted by sender)
+ * Add arm64 as a build target
+
+v7.2.0
+----------
+ * Tweak README
+
+v7.1.6
+----------
+ * Stop setting delete_reason on runs before deletion
+
+v7.1.5
+----------
+ * Stop updating msgs_msg.delete_reason which is no longer needed
+
+v7.1.4
+----------
+ * Record flow on msgs
+
+v7.1.3
+----------
+ * Remove deletion of recent runs as these are no longer created
+
+v7.1.2
+----------
+ * Use run status instead of is_active and exit_type
+ * No longer include events in run archives
+
 v7.1.1
 ----------
  * Remove references to flowrun.parent_id which is no longer set by mailroom
diff --git a/README.md b/README.md
index c943915..3ee84cd 100644
--- a/README.md
+++ b/README.md
@@ -8,16 +8,17 @@ RP-Archiver is the [RapidPro](https://github.com/rapidpro/rapidpro) component responsible for archiving
old runs and messages. It interacts directly with the RapidPro database and writes archive files to an S3 compatible endpoint.

-# Deploying
+## Deploying

As Archiver is a Go application, it compiles to a binary and that binary along with the config file is all you need to run it on your server. You can find bundles for each platform in the [releases directory](https://github.com/nyaruka/rp-archiver/releases). You should only run a single archiver instance for a deployment.

-# Configuration
+## Configuration

Archiver uses a tiered configuration system, each option takes precedence over the ones above it:
+
1. The configuration file
2. Environment variables starting with `ARCHIVER_`
3. Command line parameters

@@ -26,7 +27,7 @@ We recommend running Archiver with no changes to the configuration and no parameters, using only
environment variables to configure it. You can use `% rp-archiver --help` to see a list of the environment variables and parameters and for more details on each option.

-# RapidPro Configuration
+### RapidPro Configuration

For use with RapidPro, you will want to configure these settings:

@@ -46,7 +47,7 @@ Recommended settings for error reporting:

* `ARCHIVER_SENTRY_DSN`: The DSN to use when logging errors to Sentry

-# Development
+## Development

Once you've checked out the code, you can build Archiver with:

```
$ createdb archiver_test
```

To run all of the tests:

```
-go test ./... -p=1
+go test -p=1 ./...
```

-## Usage
+## Reference

-```
-Archives RapidPro runs and msgs to S3
+These are the configuration options that can be provided as parameters or environment variables. If using environment
+variables, convert to uppercase, replace dashes with underscores and prefix the name with `ARCHIVER_`, e.g. `-archive-messages`
+becomes `ARCHIVER_ARCHIVE_MESSAGES`.
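Under this tiered scheme, options are resolved by the same ezconf loader that appears in the test changes later in this diff. As a minimal sketch of what loading looks like (illustrative only; the actual `cmd/rp-archiver` entrypoint is not part of this diff, and the import paths are assumed from the package layout):

```go
package main

import (
	"github.com/nyaruka/ezconf"
	"github.com/nyaruka/rp-archiver/archives"
)

func main() {
	// start from the compiled-in defaults...
	config := archives.NewDefaultConfig()

	// ...then let ezconf overlay ARCHIVER_* environment variables and command
	// line flags, mirroring the loader call shown in archives_test.go
	loader := ezconf.NewLoader(config, "archiver", "Archives RapidPro runs and msgs to S3", nil)
	loader.MustLoad()
}
```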
-Usage of archiver:
+```
 -archive-messages
-    whether we should archive messages (default true)
+    whether we should archive messages (default true)
 -archive-runs
-    whether we should archive runs (default true)
+    whether we should archive runs (default true)
 -aws-access-key-id string
-    the access key id to use when authenticating S3 (default "missing_aws_access_key_id")
+    the access key id to use when authenticating S3 (default "missing_aws_access_key_id")
 -aws-secret-access-key string
-    the secret access key id to use when authenticating S3 (default "missing_aws_secret_access_key")
+    the secret access key id to use when authenticating S3 (default "missing_aws_secret_access_key")
 -db string
-    the connection string for our database (default "postgres://localhost/archiver_test?sslmode=disable")
+    the connection string for our database (default "postgres://localhost/archiver_test?sslmode=disable")
 -debug-conf
-    print where config values are coming from
+    print where config values are coming from
 -delete
-    whether to delete messages and runs from the db after archival (default false)
+    whether to delete messages and runs from the db after archival (default false)
 -help
-    print usage information
+    print usage information
 -keep-files
-    whether we should keep local archive files after upload (default false)
+    whether we should keep local archive files after upload (default false)
+ -librato-username
+    the Librato username for metrics reporting
+ -librato-token
+    the Librato token for metrics reporting
 -log-level string
-    the log level, one of error, warn, info, debug (default "info")
+    the log level, one of error, warn, info, debug (default "info")
+ -once
+    run archiving immediately and then exit
 -retention-period int
-    the number of days to keep before archiving (default 90)
+    the number of days to keep before archiving (default 90)
 -s3-bucket string
-    the S3 bucket we will write archives to (default "dl-archiver-test")
+    the S3 bucket we will write archives to (default "dl-archiver-test")
 -s3-disable-ssl
-    whether we disable SSL when accessing S3. Should always be set to False unless you're hosting an S3 compatible service within a secure internal network
+    whether we disable SSL when accessing S3. Should always be set to False unless you're hosting an S3 compatible service within a secure internal network
 -s3-endpoint string
-    the S3 endpoint we will write archives to (default "https://s3.amazonaws.com")
+    the S3 endpoint we will write archives to (default "https://s3.amazonaws.com")
 -s3-force-path-style
-    whether we force S3 path style. Should generally need to default to False unless you're hosting an S3 compatible service
+    whether we force S3 path style.
Should generally need to default to False unless you're hosting an S3 compatible service -s3-region string - the S3 region we will write archives to (default "us-east-1") + the S3 region we will write archives to (default "us-east-1") -sentry-dsn string - the sentry configuration to log errors to, if any + the sentry configuration to log errors to, if any -temp-dir string - directory where temporary archive files are written (default "/tmp") + directory where temporary archive files are written (default "/tmp") -upload-to-s3 - whether we should upload archive to S3 (default true) - -Environment variables: - ARCHIVER_ARCHIVE_MESSAGES - bool - ARCHIVER_ARCHIVE_RUNS - bool - ARCHIVER_AWS_ACCESS_KEY_ID - string - ARCHIVER_AWS_SECRET_ACCESS_KEY - string - ARCHIVER_DB - string - ARCHIVER_DELETE - bool - ARCHIVER_KEEP_FILES - bool - ARCHIVER_LOG_LEVEL - string - ARCHIVER_RETENTION_PERIOD - int - ARCHIVER_S3_BUCKET - string - ARCHIVER_S3_DISABLE_SSL - bool - ARCHIVER_S3_ENDPOINT - string - ARCHIVER_S3_FORCE_PATH_STYLE - bool - ARCHIVER_S3_REGION - string - ARCHIVER_SENTRY_DSN - string - ARCHIVER_TEMP_DIR - string - ARCHIVER_UPLOAD_TO_S3 - bool + whether we should upload archive to S3 (default true) ``` diff --git a/WENI-CHANGELOG.md b/WENI-CHANGELOG.md index 3f5b3e0..03cbaa5 100644 --- a/WENI-CHANGELOG.md +++ b/WENI-CHANGELOG.md @@ -1,3 +1,7 @@ +1.2.3-archiver-7.4.0 +---------- + * Update to rp-archiver v7.4.0 + 1.2.2-archiver-7.1.1 ---------- * No longer consider retention time on delete operation diff --git a/archives/archives.go b/archives/archives.go index f72f9b5..35bda2e 100644 --- a/archives/archives.go +++ b/archives/archives.go @@ -17,6 +17,8 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/jmoiron/sqlx" "github.com/lib/pq" + "github.com/nyaruka/gocommon/analytics" + "github.com/nyaruka/gocommon/dates" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -322,7 +324,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client ctx, cancel := context.WithTimeout(ctx, time.Hour) defer cancel() - start := time.Now() + start := dates.Now() // figure out the first day in the monthlyArchive we'll archive startDate := monthlyArchive.StartDate @@ -339,7 +341,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client } if len(missingDailies) != 0 { - return fmt.Errorf("missing '%d' daily archives", len(missingDailies)) + return fmt.Errorf("missing %d daily archives", len(missingDailies)) } // great, we have all the dailies we need, download them @@ -423,7 +425,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client } monthlyArchive.Size = stat.Size() monthlyArchive.RecordCount = recordCount - monthlyArchive.BuildTime = int(time.Since(start) / time.Millisecond) + monthlyArchive.BuildTime = int(dates.Since(start) / time.Millisecond) monthlyArchive.Dailies = dailies monthlyArchive.NeedsDeletion = false @@ -466,7 +468,7 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi ctx, cancel := context.WithTimeout(ctx, time.Hour*3) defer cancel() - start := time.Now() + start := dates.Now() log := logrus.WithFields(logrus.Fields{ "org_id": archive.Org.ID, @@ -539,14 +541,14 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi archive.ArchiveFile = file.Name() archive.Size = stat.Size() archive.RecordCount = recordCount - archive.BuildTime = int(time.Since(start) / time.Millisecond) + 
archive.BuildTime = int(dates.Since(start) / time.Millisecond) log.WithFields(logrus.Fields{ "record_count": recordCount, "filename": file.Name(), "file_size": archive.Size, "file_hash": archive.Hash, - "elapsed": time.Since(start), + "elapsed": dates.Since(start), }).Debug("completed writing archive file") return nil @@ -598,19 +600,13 @@ VALUES(:archive_type, :org_id, :created_on, :start_date, :period, :record_count, RETURNING id ` -const updateRollups = ` -UPDATE archives_archive -SET rollup_id = $1 -WHERE ARRAY[id] <@ $2 -` - // WriteArchiveToDB write an archive to the Database func WriteArchiveToDB(ctx context.Context, db *sqlx.DB, archive *Archive) error { ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() archive.OrgID = archive.Org.ID - archive.CreatedOn = time.Now() + archive.CreatedOn = dates.Now() tx, err := db.BeginTxx(ctx, nil) if err != nil { @@ -639,7 +635,7 @@ func WriteArchiveToDB(ctx context.Context, db *sqlx.DB, archive *Archive) error childIDs = append(childIDs, c.ID) } - result, err := tx.ExecContext(ctx, updateRollups, archive.ID, pq.Array(childIDs)) + result, err := tx.ExecContext(ctx, `UPDATE archives_archive SET rollup_id = $1 WHERE id = ANY($2)`, archive.ID, pq.Array(childIDs)) if err != nil { tx.Rollback() return errors.Wrapf(err, "error updating rollup ids") @@ -687,65 +683,37 @@ func DeleteArchiveFile(archive *Archive) error { } // CreateOrgArchives builds all the missing archives for the passed in org -func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) { - log := logrus.WithFields(logrus.Fields{ - "org": org.Name, - "org_id": org.ID, - }) - records := 0 - start := time.Now() - +func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, error) { archiveCount, err := GetCurrentArchiveCount(ctx, db, org, archiveType) if err != nil { - return nil, errors.Wrapf(err, "error getting current archive count") + return nil, nil, nil, nil, errors.Wrapf(err, "error getting current archive count") } - archives := make([]*Archive, 0) + var dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed []*Archive // no existing archives means this might be a backfill, figure out if there are full months we can build first if archiveCount == 0 { - archives, err = GetMissingMonthlyArchives(ctx, db, now, org, archiveType) + archives, err := GetMissingMonthlyArchives(ctx, db, now, org, archiveType) if err != nil { - return nil, errors.Wrapf(err, "error getting missing monthly archives") + return nil, nil, nil, nil, errors.Wrapf(err, "error getting missing monthly archives") } // we first create monthly archives - err = createArchives(ctx, db, config, s3Client, org, archives) - if err != nil { - return nil, errors.Wrapf(err, "error creating new monthly archives") - } + monthliesCreated, monthliesFailed = createArchives(ctx, db, config, s3Client, org, archives) } // then add in daily archives taking into account the monthly that have been built daily, err := GetMissingDailyArchives(ctx, db, now, org, archiveType) if err != nil { - return nil, errors.Wrapf(err, "error getting missing daily archives") + return nil, nil, nil, nil, errors.Wrapf(err, "error getting missing daily archives") } + // we then create missing daily archives - err = createArchives(ctx, db, config, s3Client, org, daily) - if err != nil { - return 
nil, errors.Wrapf(err, "error creating new daily archives") - } + dailiesCreated, dailiesFailed = createArchives(ctx, db, config, s3Client, org, daily) - // append daily archives to any monthly archives - archives = append(archives, daily...) defer ctx.Done() - // sum all records in the archives - for _, archive := range archives { - records += archive.RecordCount - } - - if len(archives) > 0 { - elapsed := time.Since(start) - rate := float32(records) / (float32(elapsed) / float32(time.Second)) - log.WithFields(logrus.Fields{ - "elapsed": elapsed, - "records_per_second": rate, - }).Info("completed archival for org") - } - - return archives, nil + return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, nil } func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3iface.S3API, archive *Archive) error { @@ -778,68 +746,54 @@ func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3 return nil } -func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3iface.S3API, org Org, archives []*Archive) error { - log := logrus.WithFields(logrus.Fields{ - "org": org.Name, - "org_id": org.ID, - }) +func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3iface.S3API, org Org, archives []*Archive) ([]*Archive, []*Archive) { + log := logrus.WithFields(logrus.Fields{"org_id": org.ID, "org_name": org.Name}) - for _, archive := range archives { - log.WithFields(logrus.Fields{ - "start_date": archive.StartDate, - "end_date": archive.endDate(), - "period": archive.Period, - "archive_type": archive.ArchiveType, - }).Info("starting archive") + created := make([]*Archive, 0, len(archives)) + failed := make([]*Archive, 0, 5) - start := time.Now() + for _, archive := range archives { + log.WithFields(logrus.Fields{"start_date": archive.StartDate, "end_date": archive.endDate(), "period": archive.Period, "archive_type": archive.ArchiveType}).Debug("starting archive") + start := dates.Now() err := createArchive(ctx, db, config, s3Client, archive) if err != nil { log.WithError(err).Error("error creating archive") - continue + failed = append(failed, archive) + } else { + log.WithFields(logrus.Fields{"id": archive.ID, "record_count": archive.RecordCount, "elapsed": dates.Since(start)}).Debug("archive complete") + created = append(created, archive) } - - elapsed := time.Since(start) - log.WithFields(logrus.Fields{ - "id": archive.ID, - "record_count": archive.RecordCount, - "elapsed": elapsed, - }).Info("archive complete") } - return nil + return created, failed } // RollupOrgArchives rolls up monthly archives from our daily archives -func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) { +func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) { ctx, cancel := context.WithTimeout(ctx, time.Hour*3) defer cancel() - log := logrus.WithFields(logrus.Fields{ - "org": org.Name, - "org_id": org.ID, - }) - created := make([]*Archive, 0, 1) + log := logrus.WithFields(logrus.Fields{"org_id": org.ID, "org_name": org.Name, "archive_type": archiveType}) // get our missing monthly archives archives, err := GetMissingMonthlyArchives(ctx, db, now, org, archiveType) if err != nil { - return nil, err + return nil, nil, err } + created := make([]*Archive, 0, len(archives)) + failed := 
make([]*Archive, 0, 1) + // build them from rollups for _, archive := range archives { - log.WithFields(logrus.Fields{ - "start_date": archive.StartDate, - "archive_type": archive.ArchiveType, - }) - start := time.Now() - log.Info("starting rollup") + log := log.WithFields(logrus.Fields{"start_date": archive.StartDate}) + start := dates.Now() err = BuildRollupArchive(ctx, db, config, s3Client, archive, now, org, archiveType) if err != nil { log.WithError(err).Error("error building monthly archive") + failed = append(failed, archive) continue } @@ -847,6 +801,7 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s err = UploadArchive(ctx, s3Client, config.S3Bucket, archive) if err != nil { log.WithError(err).Error("error writing archive to s3") + failed = append(failed, archive) continue } } @@ -854,6 +809,7 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s err = WriteArchiveToDB(ctx, db, archive) if err != nil { log.WithError(err).Error("error writing record to db") + failed = append(failed, archive) continue } @@ -865,15 +821,11 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s } } - log.WithFields(logrus.Fields{ - "id": archive.ID, - "record_count": archive.RecordCount, - "elapsed": time.Since(start), - }).Info("rollup complete") + log.WithFields(logrus.Fields{"id": archive.ID, "record_count": archive.RecordCount, "elapsed": dates.Since(start)}).Info("rollup created") created = append(created, archive) } - return created, nil + return created, failed, nil } const setArchiveDeleted = ` @@ -882,24 +834,9 @@ SET needs_deletion = FALSE, deleted_on = $2 WHERE id = $1 ` -// helper method to safely execute an IN query in the passed in transaction -func executeInQuery(ctx context.Context, tx *sqlx.Tx, query string, ids []int64) error { - q, vs, err := sqlx.In(query, ids) - if err != nil { - return err - } - q = tx.Rebind(q) - - _, err = tx.ExecContext(ctx, q, vs...) 
- if err != nil { - tx.Rollback() - } - return err -} - var deleteTransactionSize = 100 -// DeleteArchivedOrgRecords deletes all the records for the passeg in org based on archives already created +// DeleteArchivedOrgRecords deletes all the records for the given org based on archives already created func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) { // get all the archives that haven't yet been deleted archives, err := GetArchivesNeedingDeletion(ctx, db, org, archiveType) @@ -919,7 +856,7 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config "period": a.Period, }) - start := time.Now() + start := dates.Now() switch a.ArchiveType { case MessageType: @@ -940,36 +877,115 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config } deleted = append(deleted, a) - log.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - }).Info("deleted archive records") + log.WithFields(logrus.Fields{"elapsed": dates.Since(start)}).Info("deleted archive records") } return deleted, nil } // ArchiveOrg looks for any missing archives for the passed in org, creating and uploading them as necessary, returning the created archives -func ArchiveOrg(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) { - created, err := CreateOrgArchives(ctx, now, config, db, s3Client, org, archiveType) +func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, []*Archive, error) { + log := logrus.WithFields(logrus.Fields{"org_id": org.ID, "org_name": org.Name}) + start := dates.Now() + + dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, err := CreateOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType) if err != nil { - return nil, nil, errors.Wrapf(err, "error creating archives") + return nil, nil, nil, nil, nil, errors.Wrapf(err, "error creating archives") } - monthlies, err := RollupOrgArchives(ctx, now, config, db, s3Client, org, archiveType) + if len(dailiesCreated) > 0 { + elapsed := dates.Since(start) + rate := float32(countRecords(dailiesCreated)) / (float32(elapsed) / float32(time.Second)) + log.WithFields(logrus.Fields{"elapsed": elapsed, "records_per_second": rate}).Info("completed archival for org") + } + + rollupsCreated, rollupsFailed, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType) if err != nil { - return nil, nil, errors.Wrapf(err, "error rolling up archives") + return nil, nil, nil, nil, nil, errors.Wrapf(err, "error rolling up archives") } - created = append(created, monthlies...) + monthliesCreated = append(monthliesCreated, rollupsCreated...) + monthliesFailed = append(monthliesFailed, rollupsFailed...) 
+ monthliesFailed = removeDuplicates(monthliesFailed) // don't double report monthlies that fail being built from db and rolled up from dailies // finally delete any archives not yet actually archived - deleted := make([]*Archive, 0, 1) - if config.Delete { - deleted, err = DeleteArchivedOrgRecords(ctx, now, config, db, s3Client, org, archiveType) + var deleted []*Archive + if cfg.Delete { + deleted, err = DeleteArchivedOrgRecords(ctx, now, cfg, db, s3Client, org, archiveType) if err != nil { - return created, deleted, errors.Wrapf(err, "error deleting archived records") + return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, nil, errors.Wrapf(err, "error deleting archived records") } } - return created, deleted, nil + return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, deleted, nil +} + +// ArchiveActiveOrgs fetches active orgs and archives messages and runs +func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client s3iface.S3API) error { + start := dates.Now() + + // get our active orgs + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + orgs, err := GetActiveOrgs(ctx, db, cfg) + cancel() + + if err != nil { + return errors.Wrap(err, "error getting active orgs") + } + + totalRunsRecordsArchived, totalMsgsRecordsArchived := 0, 0 + totalRunsArchivesCreated, totalMsgsArchivesCreated := 0, 0 + totalRunsArchivesFailed, totalMsgsArchivesFailed := 0, 0 + totalRunsRollupsCreated, totalMsgsRollupsCreated := 0, 0 + totalRunsRollupsFailed, totalMsgsRollupsFailed := 0, 0 + + // for each org, do our export + for _, org := range orgs { + // no single org should take more than 12 hours + ctx, cancel := context.WithTimeout(context.Background(), time.Hour*12) + log := logrus.WithField("org_id", org.ID).WithField("org_name", org.Name) + + if cfg.ArchiveMessages { + dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, start, cfg, db, s3Client, org, MessageType) + if err != nil { + log.WithError(err).WithField("archive_type", MessageType).Error("error archiving org messages") + } + totalMsgsRecordsArchived += countRecords(dailiesCreated) + totalMsgsArchivesCreated += len(dailiesCreated) + totalMsgsArchivesFailed += len(dailiesFailed) + totalMsgsRollupsCreated += len(monthliesCreated) + totalMsgsRollupsFailed += len(monthliesFailed) + } + if cfg.ArchiveRuns { + dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, start, cfg, db, s3Client, org, RunType) + if err != nil { + log.WithError(err).WithField("archive_type", RunType).Error("error archiving org runs") + } + totalRunsRecordsArchived += countRecords(dailiesCreated) + totalRunsArchivesCreated += len(dailiesCreated) + totalRunsArchivesFailed += len(dailiesFailed) + totalRunsRollupsCreated += len(monthliesCreated) + totalRunsRollupsFailed += len(monthliesFailed) + } + + cancel() + } + + timeTaken := dates.Now().Sub(start) + logrus.WithField("time_taken", timeTaken).WithField("num_orgs", len(orgs)).Info("archiving of active orgs complete") + + analytics.Gauge("archiver.archive_elapsed", timeTaken.Seconds()) + analytics.Gauge("archiver.orgs_archived", float64(len(orgs))) + analytics.Gauge("archiver.msgs_records_archived", float64(totalMsgsRecordsArchived)) + analytics.Gauge("archiver.msgs_archives_created", float64(totalMsgsArchivesCreated)) + analytics.Gauge("archiver.msgs_archives_failed", float64(totalMsgsArchivesFailed)) + analytics.Gauge("archiver.msgs_rollups_created", float64(totalMsgsRollupsCreated)) + 
analytics.Gauge("archiver.msgs_rollups_failed", float64(totalMsgsRollupsFailed)) + analytics.Gauge("archiver.runs_records_archived", float64(totalRunsRecordsArchived)) + analytics.Gauge("archiver.runs_archives_created", float64(totalRunsArchivesCreated)) + analytics.Gauge("archiver.runs_archives_failed", float64(totalRunsArchivesFailed)) + analytics.Gauge("archiver.runs_rollups_created", float64(totalRunsRollupsCreated)) + analytics.Gauge("archiver.runs_rollups_failed", float64(totalRunsRollupsFailed)) + + return nil } diff --git a/archives/archives_test.go b/archives/archives_test.go index 58aafaf..f017cef 100644 --- a/archives/archives_test.go +++ b/archives/archives_test.go @@ -11,6 +11,8 @@ import ( "github.com/jmoiron/sqlx" _ "github.com/lib/pq" "github.com/nyaruka/ezconf" + "github.com/nyaruka/gocommon/analytics" + "github.com/nyaruka/gocommon/dates" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" ) @@ -34,7 +36,7 @@ func TestGetMissingDayArchives(t *testing.T) { // get the tasks for our org ctx := context.Background() - config := NewConfig() + config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) @@ -82,7 +84,7 @@ func TestGetMissingMonthArchives(t *testing.T) { // get the tasks for our org ctx := context.Background() - config := NewConfig() + config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) @@ -115,7 +117,7 @@ func TestCreateMsgArchive(t *testing.T) { err := EnsureTempArchiveDirectory("/tmp") assert.NoError(t, err) - config := NewConfig() + config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) @@ -143,9 +145,9 @@ func TestCreateMsgArchive(t *testing.T) { // should have two records, second will have attachments assert.Equal(t, 3, task.RecordCount) - assert.Equal(t, int64(483), task.Size) + assert.Equal(t, int64(528), task.Size) assert.Equal(t, time.Date(2017, 8, 12, 0, 0, 0, 0, time.UTC), task.StartDate) - assert.Equal(t, "6fe9265860425cf1f9757ba3d91b1a05", task.Hash) + assert.Equal(t, "b3bf00bf1234ea47f14ffd0171a8ead0", task.Hash) assertArchiveFile(t, task, "messages1.jsonl") DeleteArchiveFile(task) @@ -163,8 +165,8 @@ func TestCreateMsgArchive(t *testing.T) { // should have one record assert.Equal(t, 1, task.RecordCount) - assert.Equal(t, int64(290), task.Size) - assert.Equal(t, "a719c7ec64c516a6e159d26a70cb4225", task.Hash) + assert.Equal(t, int64(294), task.Size) + assert.Equal(t, "bd163ead077774425aa559e30d48ca87", task.Hash) assertArchiveFile(t, task, "messages2.jsonl") DeleteArchiveFile(task) @@ -192,7 +194,7 @@ func TestCreateRunArchive(t *testing.T) { err := EnsureTempArchiveDirectory("/tmp") assert.NoError(t, err) - config := NewConfig() + config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) @@ -218,8 +220,8 @@ func TestCreateRunArchive(t *testing.T) { // should have two record assert.Equal(t, 2, task.RecordCount) - assert.Equal(t, int64(642), task.Size) - assert.Equal(t, "f793f863f5e060b9d67c5688a555da6a", task.Hash) + assert.Equal(t, int64(472), task.Size) + assert.Equal(t, "734d437e1c66d09e033d698c732178f8", task.Hash) assertArchiveFile(t, task, "runs1.jsonl") DeleteArchiveFile(task) @@ -238,8 +240,8 @@ func TestCreateRunArchive(t *testing.T) { // should have one record assert.Equal(t, 1, task.RecordCount) - 
assert.Equal(t, int64(497), task.Size) - assert.Equal(t, "074de71dfb619c78dbac5b6709dd66c2", task.Hash) + assert.Equal(t, int64(490), task.Size) + assert.Equal(t, "c2138e3c3009a9c09fc55482903d93e4", task.Hash) assertArchiveFile(t, task, "runs2.jsonl") DeleteArchiveFile(task) @@ -249,7 +251,7 @@ func TestWriteArchiveToDB(t *testing.T) { db := setup(t) ctx := context.Background() - config := NewConfig() + config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) @@ -303,7 +305,7 @@ func TestArchiveOrgMessages(t *testing.T) { ctx := context.Background() deleteTransactionSize = 1 - config := NewConfig() + config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) @@ -322,53 +324,23 @@ func TestArchiveOrgMessages(t *testing.T) { assertCount(t, db, 4, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2) - created, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], MessageType) + dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], MessageType) assert.NoError(t, err) - assert.Equal(t, 63, len(created)) - assert.Equal(t, time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), created[0].StartDate) - assert.Equal(t, DayPeriod, created[0].Period) - assert.Equal(t, 0, created[0].RecordCount) - assert.Equal(t, int64(23), created[0].Size) - assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", created[0].Hash) - - assert.Equal(t, time.Date(2017, 8, 11, 0, 0, 0, 0, time.UTC), created[1].StartDate) - assert.Equal(t, DayPeriod, created[1].Period) - assert.Equal(t, 0, created[1].RecordCount) - assert.Equal(t, int64(23), created[1].Size) - assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", created[1].Hash) - - assert.Equal(t, time.Date(2017, 8, 12, 0, 0, 0, 0, time.UTC), created[2].StartDate) - assert.Equal(t, DayPeriod, created[2].Period) - assert.Equal(t, 3, created[2].RecordCount) - assert.Equal(t, int64(483), created[2].Size) - assert.Equal(t, "6fe9265860425cf1f9757ba3d91b1a05", created[2].Hash) - - assert.Equal(t, time.Date(2017, 8, 13, 0, 0, 0, 0, time.UTC), created[3].StartDate) - assert.Equal(t, DayPeriod, created[3].Period) - assert.Equal(t, 1, created[3].RecordCount) - assert.Equal(t, int64(306), created[3].Size) - assert.Equal(t, "7ece4401d3afac9c08a913398f213ffa", created[3].Hash) - - assert.Equal(t, time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), created[60].StartDate) - assert.Equal(t, DayPeriod, created[60].Period) - assert.Equal(t, 0, created[60].RecordCount) - assert.Equal(t, int64(23), created[60].Size) - assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", created[60].Hash) - - assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), created[61].StartDate) - assert.Equal(t, MonthPeriod, created[61].Period) - assert.Equal(t, 4, created[61].RecordCount) - assert.Equal(t, int64(509), created[61].Size) - assert.Equal(t, "9e40be76913bf58655b70ee96dcac25d", created[61].Hash) - - assert.Equal(t, time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), created[62].StartDate) - assert.Equal(t, MonthPeriod, created[62].Period) - assert.Equal(t, 0, created[62].RecordCount) - assert.Equal(t, int64(23), created[62].Size) - assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", created[62].Hash) - - // no rollup for october since that had one invalid daily archive + assert.Equal(t, 61, len(dailiesCreated)) + assertArchive(t, dailiesCreated[0], 
time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") + assertArchive(t, dailiesCreated[1], time.Date(2017, 8, 11, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") + assertArchive(t, dailiesCreated[2], time.Date(2017, 8, 12, 0, 0, 0, 0, time.UTC), DayPeriod, 3, 528, "b3bf00bf1234ea47f14ffd0171a8ead0") + assertArchive(t, dailiesCreated[3], time.Date(2017, 8, 13, 0, 0, 0, 0, time.UTC), DayPeriod, 1, 312, "32e61b1431217b59fca0170f637d78a3") + assertArchive(t, dailiesCreated[4], time.Date(2017, 8, 14, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") + + assert.Equal(t, 0, len(dailiesFailed)) + + assert.Equal(t, 2, len(monthliesCreated)) + assertArchive(t, monthliesCreated[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 4, 553, "156e45e29b6587cb85ccf75e03800b00") + assertArchive(t, monthliesCreated[1], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") + + assert.Equal(t, 0, len(monthliesFailed)) assert.Equal(t, 63, len(deleted)) assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), deleted[0].StartDate) @@ -440,11 +412,19 @@ func assertCount(t *testing.T, db *sqlx.DB, expected int, query string, args ... assert.Equal(t, expected, count, "counts mismatch for query %s", query) } +func assertArchive(t *testing.T, a *Archive, startDate time.Time, period ArchivePeriod, recordCount int, size int64, hash string) { + assert.Equal(t, startDate, a.StartDate) + assert.Equal(t, period, a.Period) + assert.Equal(t, recordCount, a.RecordCount) + assert.Equal(t, size, a.Size) + assert.Equal(t, hash, a.Hash) +} + func TestArchiveOrgRuns(t *testing.T) { db := setup(t) ctx := context.Background() - config := NewConfig() + config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) @@ -461,34 +441,16 @@ func TestArchiveOrgRuns(t *testing.T) { s3Client, err := NewS3Client(config) assert.NoError(t, err) - created, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[2], RunType) + dailiesCreated, _, monthliesCreated, _, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[2], RunType) assert.NoError(t, err) - assert.Equal(t, 12, len(created)) - - assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), created[0].StartDate) - assert.Equal(t, MonthPeriod, created[0].Period) - assert.Equal(t, 1, created[0].RecordCount) - assert.Equal(t, int64(497), created[0].Size) - assert.Equal(t, "074de71dfb619c78dbac5b6709dd66c2", created[0].Hash) - - assert.Equal(t, time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), created[1].StartDate) - assert.Equal(t, MonthPeriod, created[1].Period) - assert.Equal(t, 0, created[1].RecordCount) - assert.Equal(t, int64(23), created[1].Size) - assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", created[1].Hash) - - assert.Equal(t, time.Date(2017, 10, 1, 0, 0, 0, 0, time.UTC), created[2].StartDate) - assert.Equal(t, DayPeriod, created[2].Period) - assert.Equal(t, 0, created[2].RecordCount) - assert.Equal(t, int64(23), created[2].Size) - assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", created[2].Hash) + assert.Equal(t, 10, len(dailiesCreated)) + assertArchive(t, dailiesCreated[0], time.Date(2017, 10, 1, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") + assertArchive(t, dailiesCreated[9], time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 2, 1984, 
"869cc00ad4cca0371d07c88d8cf2bf26") - assert.Equal(t, time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), created[11].StartDate) - assert.Equal(t, DayPeriod, created[11].Period) - assert.Equal(t, 2, created[11].RecordCount) - assert.Equal(t, int64(2002), created[11].Size) - assert.Equal(t, "b75d6ee33ce26b786f1b341e875ecd62", created[11].Hash) + assert.Equal(t, 2, len(monthliesCreated)) + assertArchive(t, monthliesCreated[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 1, 490, "c2138e3c3009a9c09fc55482903d93e4") + assertArchive(t, monthliesCreated[1], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") assert.Equal(t, 12, len(deleted)) @@ -517,7 +479,7 @@ func TestArchiveOrgRuns(t *testing.T) { time.Date(2020, 2, 1, 0, 0, 0, 0, time.UTC), ) assert.NoError(t, err) - assert.Equal(t, 2, count) + assert.Equal(t, 3, count) // more recent run unaffected (even though it was parent) count, err = getCountInRange( @@ -529,5 +491,63 @@ func TestArchiveOrgRuns(t *testing.T) { ) assert.NoError(t, err) assert.Equal(t, 1, count) + + // org 2 has a run that can't be archived because it's still active - as it has no existing archives + // this will manifest itself as a monthly which fails to save + dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], RunType) + assert.NoError(t, err) + + assert.Equal(t, 31, len(dailiesCreated)) + assertArchive(t, dailiesCreated[0], time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") + + assert.Equal(t, 1, len(dailiesFailed)) + assertArchive(t, dailiesFailed[0], time.Date(2017, 8, 14, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 0, "") + + assert.Equal(t, 1, len(monthliesCreated)) + assertArchive(t, monthliesCreated[0], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") + + assert.Equal(t, 1, len(monthliesFailed)) + assertArchive(t, monthliesFailed[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 0, "") + } +} + +func TestArchiveActiveOrgs(t *testing.T) { + db := setup(t) + config := NewDefaultConfig() + + os.Args = []string{"rp-archiver"} + loader := ezconf.NewLoader(&config, "archiver", "Archives RapidPro runs and msgs to S3", nil) + loader.MustLoad() + + mockAnalytics := analytics.NewMock() + analytics.RegisterBackend(mockAnalytics) + analytics.Start() + + dates.SetNowSource(dates.NewSequentialNowSource(time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC))) + defer dates.SetNowSource(dates.DefaultNowSource) + + if config.AWSAccessKeyID != "missing_aws_access_key_id" && config.AWSSecretAccessKey != "missing_aws_secret_access_key" { + s3Client, err := NewS3Client(config) + assert.NoError(t, err) + + err = ArchiveActiveOrgs(db, config, s3Client) + assert.NoError(t, err) + + assert.Equal(t, map[string][]float64{ + "archiver.archive_elapsed": {848.0}, + "archiver.orgs_archived": {3}, + "archiver.msgs_records_archived": {5}, + "archiver.msgs_archives_created": {92}, + "archiver.msgs_archives_failed": {0}, + "archiver.msgs_rollups_created": {3}, + "archiver.msgs_rollups_failed": {0}, + "archiver.runs_records_archived": {4}, + "archiver.runs_archives_created": {41}, + "archiver.runs_archives_failed": {1}, + "archiver.runs_rollups_created": {3}, + "archiver.runs_rollups_failed": {1}, + }, mockAnalytics.Gauges) } + + analytics.Stop() } diff --git a/archives/config.go b/archives/config.go index 5ec635a..c1f4a1c 100644 --- a/archives/config.go +++ 
b/archives/config.go @@ -1,5 +1,7 @@ package archives +import "os" + // Config is our top level configuration object type Config struct { DB string `help:"the connection string for our database"` @@ -19,16 +21,22 @@ type Config struct { KeepFiles bool `help:"whether we should keep local archive files after upload (default false)"` UploadToS3 bool `help:"whether we should upload archive to S3"` - ArchiveMessages bool `help:"whether we should archive messages"` - ArchiveRuns bool `help:"whether we should archive runs"` - RetentionPeriod int `help:"the number of days to keep before archiving"` - Delete bool `help:"whether to delete messages and runs from the db after archival (default false)"` - ExitOnCompletion bool `help:"whether archiver should exit after completing archiving job (default false)"` - StartTime string `help:"what time archive jobs should run in UTC HH:MM "` + ArchiveMessages bool `help:"whether we should archive messages"` + ArchiveRuns bool `help:"whether we should archive runs"` + RetentionPeriod int `help:"the number of days to keep before archiving"` + Delete bool `help:"whether to delete messages and runs from the db after archival (default false)"` + StartTime string `help:"what time archive jobs should run in UTC HH:MM "` + Once bool `help:"whether archiver should run once and exit (default false)"` + + LibratoUsername string `help:"the username that will be used to authenticate to Librato"` + LibratoToken string `help:"the token that will be used to authenticate to Librato"` + InstanceName string `help:"the unique name of this instance used for analytics"` } -// NewConfig returns a new default configuration object -func NewConfig() *Config { +// NewDefaultConfig returns a new default configuration object +func NewDefaultConfig() *Config { + hostname, _ := os.Hostname() + config := Config{ DB: "postgres://localhost/archiver_test?sslmode=disable", LogLevel: "info", @@ -46,12 +54,14 @@ func NewConfig() *Config { KeepFiles: false, UploadToS3: true, - ArchiveMessages: true, - ArchiveRuns: true, - RetentionPeriod: 90, - Delete: false, - ExitOnCompletion: false, - StartTime: "00:01", + ArchiveMessages: true, + ArchiveRuns: true, + RetentionPeriod: 90, + Delete: false, + StartTime: "00:01", + Once: false, + + InstanceName: hostname, } return &config diff --git a/archives/messages.go b/archives/messages.go index b957015..634b209 100644 --- a/archives/messages.go +++ b/archives/messages.go @@ -8,63 +8,57 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/jmoiron/sqlx" + "github.com/nyaruka/gocommon/dates" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) +const ( + visibilityDeletedByUser = "D" + visibilityDeletedBySender = "X" +) + const lookupMsgs = ` SELECT rec.visibility, row_to_json(rec) FROM ( SELECT - mm.id, - broadcast_id as broadcast, - row_to_json(contact) as contact, - CASE WHEN oo.is_anon = False THEN ccu.identity ELSE null END as urn, - row_to_json(channel) as channel, - CASE WHEN direction = 'I' THEN 'in' - WHEN direction = 'O' THEN 'out' - ELSE NULL - END as direction, - CASE WHEN msg_type = 'F' - THEN 'flow' - WHEN msg_type = 'V' - THEN 'ivr' - WHEN msg_type = 'I' - THEN 'inbox' - ELSE NULL - END as "type", - CASE when status = 'I' then 'initializing' - WHEN status = 'P' then 'queued' - WHEN status = 'Q' then 'queued' - WHEN status = 'W' then 'wired' - WHEN status = 'D' then 'delivered' - WHEN status = 'H' then 'handled' - WHEN status = 'E' then 'errored' - WHEN status = 'F' then 'failed' - WHEN 
status = 'S' then 'sent' - WHEN status = 'R' then 'resent' + mm.id, + broadcast_id as broadcast, + row_to_json(contact) as contact, + CASE WHEN oo.is_anon = FALSE THEN ccu.identity ELSE NULL END AS urn, + row_to_json(channel) as channel, + row_to_json(flow) as flow, + CASE WHEN direction = 'I' THEN 'in' WHEN direction = 'O' THEN 'out' ELSE NULL END AS direction, + CASE WHEN msg_type = 'F' THEN 'flow' WHEN msg_type = 'V' THEN 'ivr' WHEN msg_type = 'I' THEN 'inbox' ELSE NULL END AS "type", + CASE + WHEN status = 'I' THEN 'initializing' + WHEN status = 'P' THEN 'queued' + WHEN status = 'Q' THEN 'queued' + WHEN status = 'W' THEN 'wired' + WHEN status = 'D' THEN 'delivered' + WHEN status = 'H' THEN 'handled' + WHEN status = 'E' THEN 'errored' + WHEN status = 'F' THEN 'failed' + WHEN status = 'S' THEN 'sent' + WHEN status = 'R' THEN 'resent' ELSE NULL - END as status, - - CASE WHEN visibility = 'V' THEN 'visible' - WHEN visibility = 'A' THEN 'archived' - WHEN visibility = 'D' THEN 'deleted' - ELSE NULL - END as visibility, - text, - (select coalesce(jsonb_agg(attach_row), '[]'::jsonb) FROM (select attach_data.attachment[1] as content_type, attach_data.attachment[2] as url FROM (select regexp_matches(unnest(attachments), '^(.*?):(.*)$') attachment) as attach_data) as attach_row) as attachments, - labels_agg.data as labels, - mm.created_on as created_on, - sent_on, - mm.modified_on as modified_on + END as status, + CASE WHEN visibility = 'V' THEN 'visible' WHEN visibility = 'A' THEN 'archived' WHEN visibility = 'D' THEN 'deleted' WHEN visibility = 'X' THEN 'deleted' ELSE NULL END as visibility, + text, + (select coalesce(jsonb_agg(attach_row), '[]'::jsonb) FROM (select attach_data.attachment[1] as content_type, attach_data.attachment[2] as url FROM (select regexp_matches(unnest(attachments), '^(.*?):(.*)$') attachment) as attach_data) as attach_row) as attachments, + labels_agg.data as labels, + mm.created_on as created_on, + sent_on, + mm.modified_on as modified_on FROM msgs_msg mm - JOIN orgs_org oo ON mm.org_id = oo.id - JOIN LATERAL (select uuid, name from contacts_contact cc where cc.id = mm.contact_id) as contact ON True - LEFT JOIN contacts_contacturn ccu ON mm.contact_urn_id = ccu.id - LEFT JOIN LATERAL (select uuid, name from channels_channel ch where ch.id = mm.channel_id) as channel ON True - LEFT JOIN LATERAL (select coalesce(jsonb_agg(label_row), '[]'::jsonb) as data from (select uuid, name from msgs_label ml INNER JOIN msgs_msg_labels mml ON ml.id = mml.label_id AND mml.msg_id = mm.id) as label_row) as labels_agg ON True - - WHERE mm.org_id = $1 AND mm.created_on >= $2 AND mm.created_on < $3 - ORDER BY created_on ASC, id ASC) rec; + JOIN orgs_org oo ON mm.org_id = oo.id + JOIN LATERAL (select uuid, name from contacts_contact cc where cc.id = mm.contact_id) as contact ON True + LEFT JOIN contacts_contacturn ccu ON mm.contact_urn_id = ccu.id + LEFT JOIN LATERAL (select uuid, name from channels_channel ch where ch.id = mm.channel_id) as channel ON True + LEFT JOIN LATERAL (select uuid, name from flows_flow f where f.id = mm.flow_id) as flow ON True + LEFT JOIN LATERAL (select coalesce(jsonb_agg(label_row), '[]'::jsonb) as data from (select uuid, name from msgs_label ml INNER JOIN msgs_msg_labels mml ON ml.id = mml.label_id AND mml.msg_id = mm.id) as label_row) as labels_agg ON True + + WHERE mm.org_id = $1 AND mm.created_on >= $2 AND mm.created_on < $3 +ORDER BY created_on ASC, id ASC) rec; ` // writeMessageRecords writes the messages in the archive's date range to the passed in writer 
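The rewritten `lookupMsgs` pairs each `row_to_json(rec)` record with its `rec.visibility` column, so a consumer can act on visibility without re-parsing the JSON. The body of `writeMessageRecords` itself is untouched by this hunk; as a rough sketch of the shape such a consumer takes (names here are assumptions, modelled on the `writeRunRecords` pattern in `runs.go` below):

```go
// Illustrative sketch only, assuming the package context of messages.go
// (lookupMsgs, Archive, and the sqlx/errors imports shown above).
func writeMsgRecords(ctx context.Context, db *sqlx.DB, archive *Archive, writer *bufio.Writer) (int, error) {
	rows, err := db.QueryxContext(ctx, lookupMsgs, archive.Org.ID, archive.StartDate, archive.endDate())
	if err != nil {
		return 0, errors.Wrapf(err, "error querying msg records for org: %d", archive.Org.ID)
	}
	defer rows.Close()

	recordCount := 0
	var visibility, record string
	for rows.Next() {
		if err := rows.Scan(&visibility, &record); err != nil {
			return 0, errors.Wrapf(err, "error scanning msg record for org: %d", archive.Org.ID)
		}
		// each row_to_json(rec) value is one complete JSON object, so the
		// archive file is plain JSONL: one record per line
		writer.WriteString(record)
		writer.WriteString("\n")
		recordCount++
	}
	return recordCount, rows.Err()
}
```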
@@ -107,12 +101,6 @@ WHERE mm.org_id = $1 AND mm.created_on >= $2 AND mm.created_on < $3 ORDER BY mm.created_on ASC, mm.id ASC ` -const setMessageDeleteReason = ` -UPDATE msgs_msg -SET delete_reason = 'A' -WHERE id IN(?) -` - const deleteMessageLogs = ` DELETE FROM channels_channellog WHERE msg_id IN(?) @@ -136,7 +124,7 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 outer, cancel := context.WithTimeout(ctx, time.Hour*3) defer cancel() - start := time.Now() + start := dates.Now() log := logrus.WithFields(logrus.Fields{ "id": archive.ID, "org_id": archive.OrgID, @@ -177,7 +165,7 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 msgIDs = append(msgIDs, msgID) // keep track of the number of visible messages, ie, not deleted - if visibility != "D" { + if visibility != visibilityDeletedByUser && visibility != visibilityDeletedBySender { visibleCount++ } } @@ -196,7 +184,7 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 ctx, cancel := context.WithTimeout(ctx, time.Minute*15) defer cancel() - start := time.Now() + start := dates.Now() // start our transaction tx, err := db.BeginTxx(ctx, nil) @@ -204,13 +192,7 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 return err } - // first update our delete_reason - err = executeInQuery(ctx, tx, setMessageDeleteReason, idBatch) - if err != nil { - return errors.Wrap(err, "error updating delete reason") - } - - // now delete any channel logs + // first delete any channel logs err = executeInQuery(ctx, tx, deleteMessageLogs, idBatch) if err != nil { return errors.Wrap(err, "error removing channel logs") @@ -234,7 +216,7 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 return errors.Wrap(err, "error committing message delete transaction") } - log.WithField("elapsed", time.Since(start)).WithField("count", len(idBatch)).Debug("deleted batch of messages") + log.WithField("elapsed", dates.Since(start)).WithField("count", len(idBatch)).Debug("deleted batch of messages") cancel() } @@ -242,7 +224,7 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 outer, cancel = context.WithTimeout(ctx, time.Minute) defer cancel() - deletedOn := time.Now() + deletedOn := dates.Now() // all went well! mark our archive as no longer needing deletion _, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn) @@ -252,7 +234,7 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 archive.NeedsDeletion = false archive.DeletedOn = &deletedOn - logrus.WithField("elapsed", time.Since(start)).Info("completed deleting messages") + logrus.WithField("elapsed", dates.Since(start)).Info("completed deleting messages") return nil } @@ -274,7 +256,7 @@ LIMIT 1000000; // DeleteBroadcasts deletes all broadcasts older than 90 days for the passed in org which have no active messages on them func DeleteBroadcasts(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, org Org) error { - start := time.Now() + start := dates.Now() threshhold := now.AddDate(0, 0, -org.RetentionPeriod) rows, err := db.QueryxContext(ctx, selectOldOrgBroadcasts, org.ID, threshhold) @@ -290,7 +272,7 @@ func DeleteBroadcasts(ctx context.Context, now time.Time, config *Config, db *sq } // been deleting this org more than an hour? 
thats enough for today, exit out - if time.Since(start) > time.Hour { + if dates.Since(start) > time.Hour { break } @@ -363,7 +345,7 @@ func DeleteBroadcasts(ctx context.Context, now time.Time, config *Config, db *sq if count > 0 { logrus.WithFields(logrus.Fields{ - "elapsed": time.Since(start), + "elapsed": dates.Since(start), "count": count, "org_id": org.ID, }).Info("completed deleting broadcasts") diff --git a/archives/runs.go b/archives/runs.go index ee6ecfa..6cab301 100644 --- a/archives/runs.go +++ b/archives/runs.go @@ -8,12 +8,22 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/jmoiron/sqlx" + "github.com/nyaruka/gocommon/dates" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) +const ( + RunStatusActive = "A" + RunStatusWaiting = "W" + RunStatusCompleted = "C" + RunStatusExpired = "X" + RunStatusInterrupted = "I" + RunStatusFailed = "F" +) + const lookupFlowRuns = ` -SELECT rec.exited_on, row_to_json(rec) +SELECT rec.uuid, rec.exited_on, row_to_json(rec) FROM ( SELECT fr.id as id, @@ -29,24 +39,15 @@ FROM ( SELECT key, jsonb_build_object('name', value -> 'name', 'value', value -> 'value', 'input', value -> 'input', 'time', (value -> 'created_on')::text::timestamptz, 'category', value -> 'category', 'node', value -> 'node_uuid') as value FROM jsonb_each(fr.results::jsonb)) AS values_data ) as values, - CASE - WHEN $1 - THEN '[]'::jsonb - ELSE - coalesce(fr.events, '[]'::jsonb) - END AS events, fr.created_on, fr.modified_on, fr.exited_on, CASE - WHEN exit_type = 'C' - THEN 'completed' - WHEN exit_type = 'I' - THEN 'interrupted' - WHEN exit_type = 'E' - THEN 'expired' - ELSE - null + WHEN status = 'C' THEN 'completed' + WHEN status = 'I' THEN 'interrupted' + WHEN status = 'X' THEN 'expired' + WHEN status = 'F' THEN 'failed' + ELSE NULL END as exit_type, a.username as submitted_by @@ -55,7 +56,7 @@ FROM ( JOIN LATERAL (SELECT uuid, name FROM flows_flow WHERE flows_flow.id = fr.flow_id) AS flow_struct ON True JOIN LATERAL (SELECT uuid, name FROM contacts_contact cc WHERE cc.id = fr.contact_id) AS contact_struct ON True - WHERE fr.org_id = $2 AND fr.modified_on >= $3 AND fr.modified_on < $4 + WHERE fr.org_id = $1 AND fr.modified_on >= $2 AND fr.modified_on < $3 ORDER BY fr.modified_on ASC, id ASC ) as rec; ` @@ -63,27 +64,30 @@ FROM ( // writeRunRecords writes the runs in the archive's date range to the passed in writer func writeRunRecords(ctx context.Context, db *sqlx.DB, archive *Archive, writer *bufio.Writer) (int, error) { var rows *sqlx.Rows - rows, err := db.QueryxContext(ctx, lookupFlowRuns, archive.Org.IsAnon, archive.Org.ID, archive.StartDate, archive.endDate()) + rows, err := db.QueryxContext(ctx, lookupFlowRuns, archive.Org.ID, archive.StartDate, archive.endDate()) if err != nil { return 0, errors.Wrapf(err, "error querying run records for org: %d", archive.Org.ID) } defer rows.Close() recordCount := 0 + + var runUUID string + var runExitedOn *time.Time var record string - var exitedOn *time.Time - for rows.Next() { - err = rows.Scan(&exitedOn, &record) - // shouldn't be archiving an active run, that's an error - if exitedOn == nil { - return 0, fmt.Errorf("run still active, cannot archive: %s", record) - } + for rows.Next() { + err = rows.Scan(&runUUID, &runExitedOn, &record) if err != nil { return 0, errors.Wrapf(err, "error scanning run record for org: %d", archive.Org.ID) } + // shouldn't be archiving an active run, that's an error + if runExitedOn == nil { + return 0, fmt.Errorf("run %s still active, 
cannot archive", runUUID) + } + writer.WriteString(record) writer.WriteString("\n") recordCount++ @@ -93,24 +97,13 @@ func writeRunRecords(ctx context.Context, db *sqlx.DB, archive *Archive, writer } const selectOrgRunsInRange = ` -SELECT fr.id, fr.is_active +SELECT fr.id, fr.status FROM flows_flowrun fr LEFT JOIN contacts_contact cc ON cc.id = fr.contact_id WHERE fr.org_id = $1 AND fr.modified_on >= $2 AND fr.modified_on < $3 ORDER BY fr.modified_on ASC, fr.id ASC ` -const setRunDeleteReason = ` -UPDATE flows_flowrun -SET delete_reason = 'A' -WHERE id IN(?) -` - -const deleteRecentRuns = ` -DELETE FROM flows_flowpathrecentrun -WHERE run_id IN(?) -` - const deleteRuns = ` DELETE FROM flows_flowrun WHERE id IN(?) @@ -124,7 +117,7 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie outer, cancel := context.WithTimeout(ctx, time.Hour*3) defer cancel() - start := time.Now() + start := dates.Now() log := logrus.WithFields(logrus.Fields{ "id": archive.ID, "org_id": archive.OrgID, @@ -154,18 +147,18 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie defer rows.Close() var runID int64 - var isActive bool + var status string runCount := 0 runIDs := make([]int64, 0, archive.RecordCount) for rows.Next() { - err = rows.Scan(&runID, &isActive) + err = rows.Scan(&runID, &status) if err != nil { return err } // if this run is still active, something has gone wrong, throw an error - if isActive { - return fmt.Errorf("run %d in archive is still active", runID) + if status == RunStatusActive || status == RunStatusWaiting { + return fmt.Errorf("run #%d in archive hadn't exited", runID) } // increment our count @@ -187,7 +180,7 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie ctx, cancel := context.WithTimeout(ctx, time.Minute*15) defer cancel() - start := time.Now() + start := dates.Now() // start our transaction tx, err := db.BeginTxx(ctx, nil) @@ -195,19 +188,7 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie return err } - // first update our delete_reason - err = executeInQuery(ctx, tx, setRunDeleteReason, idBatch) - if err != nil { - return errors.Wrap(err, "error updating delete reason") - } - - // any recent runs - err = executeInQuery(ctx, tx, deleteRecentRuns, idBatch) - if err != nil { - return errors.Wrap(err, "error deleting recent runs") - } - - // finally, delete our runs + // delete our runs err = executeInQuery(ctx, tx, deleteRuns, idBatch) if err != nil { return errors.Wrap(err, "error deleting runs") @@ -219,7 +200,7 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie return errors.Wrap(err, "error committing run delete transaction") } - log.WithField("elapsed", time.Since(start)).WithField("count", len(idBatch)).Debug("deleted batch of runs") + log.WithField("elapsed", dates.Since(start)).WithField("count", len(idBatch)).Debug("deleted batch of runs") cancel() } @@ -227,7 +208,7 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie outer, cancel = context.WithTimeout(ctx, time.Minute) defer cancel() - deletedOn := time.Now() + deletedOn := dates.Now() // all went well! 
mark our archive as no longer needing deletion _, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn) @@ -237,7 +218,7 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie archive.NeedsDeletion = false archive.DeletedOn = &deletedOn - logrus.WithField("elapsed", time.Since(start)).Info("completed deleting runs") + logrus.WithField("elapsed", dates.Since(start)).Info("completed deleting runs") return nil } diff --git a/archives/s3.go b/archives/s3.go index 2a755e6..02c8d48 100644 --- a/archives/s3.go +++ b/archives/s3.go @@ -47,7 +47,6 @@ func NewS3Client(config *Config) (s3iface.S3API, error) { return nil, err } - logrus.Info("s3 bucket ok") return s3Client, nil } diff --git a/archives/testdata/messages1.jsonl b/archives/testdata/messages1.jsonl index f1c6bda..6b53f3e 100644 --- a/archives/testdata/messages1.jsonl +++ b/archives/testdata/messages1.jsonl @@ -1,3 +1,3 @@ -{"id":1,"broadcast":null,"contact":{"uuid":"3e814add-e614-41f7-8b5d-a07f670a698f","name":"Ajodinabiff Dane"},"urn":"tel:+12067797777","channel":{"uuid":"60f2ed5b-05f2-4156-9ff0-e44e90da1b85","name":"Channel 2"},"direction":"in","type":"inbox","status":"handled","visibility":"visible","text":"message 1","attachments":[],"labels":[{"name": "Label 1", "uuid": "1d9e3188-b74b-4ae0-a166-0de31aedb34a"}, {"name": "Label 2", "uuid": "c5a69101-8dc3-444f-8b0b-5ab816e46eec"}],"created_on":"2017-08-12T21:11:59.890662+00:00","sent_on":"2017-08-12T21:11:59.890662+00:00","modified_on":"2017-08-12T21:11:59.890662+00:00"} -{"id":3,"broadcast":null,"contact":{"uuid":"3e814add-e614-41f7-8b5d-a07f670a698f","name":"Ajodinabiff Dane"},"urn":"tel:+12067797777","channel":null,"direction":"out","type":"inbox","status":"handled","visibility":"visible","text":"message 3","attachments":[{"url": "https://foo.bar/image1.png", "content_type": "image/png"}, {"url": "https://foo.bar/image2.png", "content_type": "image/png"}],"labels":[{"name": "Label 2", "uuid": "c5a69101-8dc3-444f-8b0b-5ab816e46eec"}],"created_on":"2017-08-12T21:11:59.890662+00:00","sent_on":"2017-08-12T21:11:59.890662+00:00","modified_on":"2017-08-12T21:11:59.890662+00:00"} -{"id":9,"broadcast":null,"contact":{"uuid":"3e814add-e614-41f7-8b5d-a07f670a698f","name":"Ajodinabiff Dane"},"urn":null,"channel":null,"direction":"out","type":"flow","status":"sent","visibility":"visible","text":"message 9","attachments":[],"labels":[],"created_on":"2017-08-12T21:11:59.890662+00:00","sent_on":"2017-08-12T21:11:59.890662+00:00","modified_on":"2017-08-12T21:11:59.890662+00:00"} +{"id":1,"broadcast":null,"contact":{"uuid":"3e814add-e614-41f7-8b5d-a07f670a698f","name":"Ajodinabiff Dane"},"urn":"tel:+12067797777","channel":{"uuid":"60f2ed5b-05f2-4156-9ff0-e44e90da1b85","name":"Channel 2"},"flow":null,"direction":"in","type":"inbox","status":"handled","visibility":"visible","text":"message 1","attachments":[],"labels":[{"name": "Label 1", "uuid": "1d9e3188-b74b-4ae0-a166-0de31aedb34a"}, {"name": "Label 2", "uuid": "c5a69101-8dc3-444f-8b0b-5ab816e46eec"}],"created_on":"2017-08-12T21:11:59.890662+00:00","sent_on":"2017-08-12T21:11:59.890662+00:00","modified_on":"2017-08-12T21:11:59.890662+00:00"} +{"id":3,"broadcast":null,"contact":{"uuid":"3e814add-e614-41f7-8b5d-a07f670a698f","name":"Ajodinabiff Dane"},"urn":"tel:+12067797777","channel":null,"flow":null,"direction":"out","type":"inbox","status":"handled","visibility":"visible","text":"message 3","attachments":[{"url": "https://foo.bar/image1.png", "content_type": "image/png"}, {"url": 
"https://foo.bar/image2.png", "content_type": "image/png"}],"labels":[{"name": "Label 2", "uuid": "c5a69101-8dc3-444f-8b0b-5ab816e46eec"}],"created_on":"2017-08-12T21:11:59.890662+00:00","sent_on":"2017-08-12T21:11:59.890662+00:00","modified_on":"2017-08-12T21:11:59.890662+00:00"} +{"id":9,"broadcast":null,"contact":{"uuid":"3e814add-e614-41f7-8b5d-a07f670a698f","name":"Ajodinabiff Dane"},"urn":null,"channel":null,"flow":{"uuid":"3914b88e-625b-4603-bd9f-9319dc331c6b","name":"Flow 3"},"direction":"out","type":"flow","status":"sent","visibility":"visible","text":"message 9","attachments":[],"labels":[],"created_on":"2017-08-12T21:11:59.890662+00:00","sent_on":"2017-08-12T21:11:59.890662+00:00","modified_on":"2017-08-12T21:11:59.890662+00:00"} diff --git a/archives/testdata/messages2.jsonl b/archives/testdata/messages2.jsonl index a01bd44..01f5bc6 100644 --- a/archives/testdata/messages2.jsonl +++ b/archives/testdata/messages2.jsonl @@ -1 +1 @@ -{"id":5,"broadcast":null,"contact":{"uuid":"7051dff0-0a27-49d7-af1f-4494239139e6","name":"Joanne Stone"},"urn":null,"channel":{"uuid":"b79e0054-068f-4928-a5f4-339d10a7ad5a","name":"Channel 3"},"direction":"in","type":"inbox","status":"handled","visibility":"visible","text":"message 5","attachments":[],"labels":[],"created_on":"2017-08-11T19:11:59.890662+00:00","sent_on":"2017-08-11T19:11:59.890662+00:00","modified_on":"2017-08-11T19:11:59.890662+00:00"} +{"id":5,"broadcast":null,"contact":{"uuid":"7051dff0-0a27-49d7-af1f-4494239139e6","name":"Joanne Stone"},"urn":null,"channel":{"uuid":"b79e0054-068f-4928-a5f4-339d10a7ad5a","name":"Channel 3"},"flow":null,"direction":"in","type":"inbox","status":"handled","visibility":"visible","text":"message 5","attachments":[],"labels":[],"created_on":"2017-08-11T19:11:59.890662+00:00","sent_on":"2017-08-11T19:11:59.890662+00:00","modified_on":"2017-08-11T19:11:59.890662+00:00"} diff --git a/archives/testdata/runs1.jsonl b/archives/testdata/runs1.jsonl index 5ca0964..3e7f8ea 100644 --- a/archives/testdata/runs1.jsonl +++ b/archives/testdata/runs1.jsonl @@ -1,2 +1,2 @@ -{"id":1,"uuid":"4ced1260-9cfe-4b7f-81dd-b637108f15b9","flow":{"uuid":"6639286a-9120-45d4-aa39-03ae3942a4a6","name":"Flow 1"},"contact":{"uuid":"3e814add-e614-41f7-8b5d-a07f670a698f","name":"Ajodinabiff Dane"},"responded":true,"path":[],"values":{},"events":[],"created_on":"2017-08-12T19:11:59.890662+00:00","modified_on":"2017-08-12T19:11:59.890662+00:00","exited_on":"2017-08-12T19:11:59.890662+00:00","exit_type":"completed","submitted_by":null} -{"id":2,"uuid":"7d68469c-0494-498a-bdf3-bac68321fd6d","flow":{"uuid":"6639286a-9120-45d4-aa39-03ae3942a4a6","name":"Flow 1"},"contact":{"uuid":"3e814add-e614-41f7-8b5d-a07f670a698f","name":"Ajodinabiff Dane"},"responded":true,"path":[{"node": "10896d63-8df7-4022-88dd-a9d93edf355b", "time": "2017-08-12T13:07:24.049815+00:00"}],"values":{"agree": {"name": "Do you agree?", "node": "a0434c54-3e26-4eb0-bafc-46cdeaf435ac", "time": "2017-05-03T12:25:21.714339+00:00", "input": "A", "value": "A", "category": "Strongly agree"}},"events":[{"msg": {"urn": "tel:+12076661212", "text": "hola", "uuid": "cf05c58f-31fb-4ce8-9e65-4ecc9fd47cbe", "channel": {"name": "1223", "uuid": "bbfe2e9c-cf69-4d0a-b42e-00ac3dc0b0b8"}}, "type": "msg_created", "step_uuid": "659cdae5-1f29-4a58-9437-10421f724268", "created_on": 
"2018-01-22T15:06:47.357682+00:00"}],"created_on":"2017-08-12T19:11:59.890662+00:00","modified_on":"2017-08-12T19:11:59.890662+00:00","exited_on":"2017-08-12T19:11:59.890662+00:00","exit_type":"completed","submitted_by":null} +{"id":1,"uuid":"4ced1260-9cfe-4b7f-81dd-b637108f15b9","flow":{"uuid":"6639286a-9120-45d4-aa39-03ae3942a4a6","name":"Flow 1"},"contact":{"uuid":"3e814add-e614-41f7-8b5d-a07f670a698f","name":"Ajodinabiff Dane"},"responded":true,"path":[],"values":{},"created_on":"2017-08-12T19:11:59.890662+00:00","modified_on":"2017-08-12T19:11:59.890662+00:00","exited_on":"2017-08-12T19:11:59.890662+00:00","exit_type":"completed","submitted_by":null} +{"id":2,"uuid":"7d68469c-0494-498a-bdf3-bac68321fd6d","flow":{"uuid":"6639286a-9120-45d4-aa39-03ae3942a4a6","name":"Flow 1"},"contact":{"uuid":"3e814add-e614-41f7-8b5d-a07f670a698f","name":"Ajodinabiff Dane"},"responded":true,"path":[{"node": "10896d63-8df7-4022-88dd-a9d93edf355b", "time": "2017-08-12T13:07:24.049815+00:00"}],"values":{"agree": {"name": "Do you agree?", "node": "a0434c54-3e26-4eb0-bafc-46cdeaf435ac", "time": "2017-05-03T12:25:21.714339+00:00", "input": "A", "value": "A", "category": "Strongly agree"}},"created_on":"2017-08-12T19:11:59.890662+00:00","modified_on":"2017-08-12T19:11:59.890662+00:00","exited_on":"2017-08-12T19:11:59.890662+00:00","exit_type":"completed","submitted_by":null} diff --git a/archives/testdata/runs2.jsonl b/archives/testdata/runs2.jsonl index 2875b6e..3c82b3c 100644 --- a/archives/testdata/runs2.jsonl +++ b/archives/testdata/runs2.jsonl @@ -1 +1 @@ -{"id":3,"uuid":"de782b35-a398-46ed-8550-34c66053841b","flow":{"uuid":"629db399-a5fb-4fa0-88e6-f479957b63d2","name":"Flow 2"},"contact":{"uuid":"7051dff0-0a27-49d7-af1f-4494239139e6","name":"Joanne Stone"},"responded":true,"path":[{"node": "accbc6e2-b0df-46cd-9a76-bff0fdf4d753", "time": "2017-08-12T13:07:24.049815+00:00"}],"values":{"agree": {"name": "Agree", "node": "084c8cf1-715d-4d0a-b38d-a616ed74e638", "time": "2017-05-03T12:25:21.714339+00:00", "input": "A", "value": "A", "category": "Strongly agree"}, "confirm_agree": {"name": "Do you agree?", "node": "a0434c54-3e26-4eb0-bafc-46cdeaf435ab", "time": "2017-05-03T12:25:21.714339+00:00", "input": "A", "value": "A", "category": "Confirmed Strongly agree"}},"events":[],"created_on":"2017-08-10T19:11:59.890662+00:00","modified_on":"2017-08-10T19:11:59.890662+00:00","exited_on":"2017-08-10T19:11:59.890662+00:00","exit_type":"completed","submitted_by":"greg@gmail.com"} +{"id":3,"uuid":"de782b35-a398-46ed-8550-34c66053841b","flow":{"uuid":"629db399-a5fb-4fa0-88e6-f479957b63d2","name":"Flow 2"},"contact":{"uuid":"7051dff0-0a27-49d7-af1f-4494239139e6","name":"Joanne Stone"},"responded":true,"path":[{"node": "accbc6e2-b0df-46cd-9a76-bff0fdf4d753", "time": "2017-08-12T13:07:24.049815+00:00"}],"values":{"agree": {"name": "Agree", "node": "084c8cf1-715d-4d0a-b38d-a616ed74e638", "time": "2017-05-03T12:25:21.714339+00:00", "input": "A", "value": "A", "category": "Strongly agree"}, "confirm_agree": {"name": "Do you agree?", "node": "a0434c54-3e26-4eb0-bafc-46cdeaf435ab", "time": "2017-05-03T12:25:21.714339+00:00", "input": "A", "value": "A", "category": "Confirmed Strongly agree"}},"created_on":"2017-08-10T19:11:59.890662+00:00","modified_on":"2017-08-10T19:11:59.890662+00:00","exited_on":"2017-08-10T19:11:59.890662+00:00","exit_type":"completed","submitted_by":"greg@gmail.com"} diff --git a/archives/utils.go b/archives/utils.go index d50db23..0a853c0 100644 --- a/archives/utils.go +++ b/archives/utils.go @@ -1,5 
+1,52 @@ package archives +import ( + "context" + "fmt" + "time" + + "github.com/jmoiron/sqlx" +) + +// helper method to safely execute an IN query in the passed in transaction +func executeInQuery(ctx context.Context, tx *sqlx.Tx, query string, ids []int64) error { + q, vs, err := sqlx.In(query, ids) + if err != nil { + return err + } + q = tx.Rebind(q) + + _, err = tx.ExecContext(ctx, q, vs...) + if err != nil { + tx.Rollback() + } + return err +} + +// counts the records in the given archives +func countRecords(as []*Archive) int { + n := 0 + for _, a := range as { + n += a.RecordCount + } + return n +} + +// removes duplicates from a slice of archives +func removeDuplicates(as []*Archive) []*Archive { + unique := make([]*Archive, 0, len(as)) + seen := make(map[string]bool) + + for _, a := range as { + key := fmt.Sprintf("%s:%s:%s", a.ArchiveType, a.Period, a.StartDate.Format(time.RFC3339)) + if !seen[key] { + unique = append(unique, a) + seen[key] = true + } + } + return unique +} + // chunks a slice of int64 IDs func chunkIDs(ids []int64, size int) [][]int64 { chunks := make([][]int64, 0, len(ids)/size+1) diff --git a/cmd/rp-archiver/main.go b/cmd/rp-archiver/main.go index c48f4d2..585cc38 100644 --- a/cmd/rp-archiver/main.go +++ b/cmd/rp-archiver/main.go @@ -1,9 +1,9 @@ package main import ( - "context" "os" "strings" + "sync" "time" "github.com/aws/aws-sdk-go/service/s3/s3iface" @@ -11,12 +11,20 @@ import ( "github.com/jmoiron/sqlx" _ "github.com/lib/pq" "github.com/nyaruka/ezconf" + "github.com/nyaruka/gocommon/analytics" + "github.com/nyaruka/gocommon/dates" "github.com/nyaruka/rp-archiver/archives" "github.com/sirupsen/logrus" ) +var ( + // https://goreleaser.com/cookbooks/using-main.version + version = "dev" + date = "unknown" +) + func main() { - config := archives.NewConfig() + config := archives.NewDefaultConfig() loader := ezconf.NewLoader(&config, "archiver", "Archives RapidPro runs and msgs to S3", []string{"archiver.toml"}) loader.MustLoad() @@ -24,15 +32,15 @@ func main() { logrus.Fatal("cannot delete archives and also not upload to s3") } - // configure our logger - logrus.SetOutput(os.Stdout) - logrus.SetFormatter(&logrus.TextFormatter{}) - level, err := logrus.ParseLevel(config.LogLevel) if err != nil { logrus.Fatalf("Invalid log level '%s'", config.LogLevel) } + logrus.SetLevel(level) + logrus.SetOutput(os.Stdout) + logrus.SetFormatter(&logrus.TextFormatter{}) + logrus.WithField("version", version).WithField("released", date).Info("starting archiver") // if we have a DSN entry, try to initialize it if config.SentryDSN != "" { @@ -62,87 +70,82 @@ func main() { db, err := sqlx.Open("postgres", config.DB) if err != nil { logrus.Fatal(err) + } else { + db.SetMaxOpenConns(2) + logrus.WithField("state", "starting").Info("db ok") } - db.SetMaxOpenConns(2) var s3Client s3iface.S3API if config.UploadToS3 { s3Client, err = archives.NewS3Client(config) if err != nil { logrus.WithError(err).Fatal("unable to initialize s3 client") + } else { + logrus.WithField("state", "starting").Info("s3 bucket ok") } } + wg := &sync.WaitGroup{} + // ensure that we can actually write to the temp directory err = archives.EnsureTempArchiveDirectory(config.TempDir) if err != nil { logrus.WithError(err).Fatal("cannot write to temp directory") + } else { + logrus.WithField("state", "starting").Info("tmp file access ok") } - for { - start := time.Now().In(time.UTC) + // parse our start time + timeOfDay, err := dates.ParseTimeOfDay("tt:mm", 
config.StartTime) + if err != nil { + logrus.WithError(err).Fatal("invalid start time supplied, format: HH:MM") + } - // convert the starttime to time.Time - layout := "15:04" - hour, err := time.Parse(layout, config.StartTime) - if err != nil { - logrus.WithError(err).Fatal("invalid start time supplied, format: HH:mm") - } + // if we have a librato token, configure it + if config.LibratoToken != "" { + analytics.RegisterBackend(analytics.NewLibrato(config.LibratoUsername, config.LibratoToken, config.InstanceName, time.Second, wg)) + } - // get our active orgs - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - orgs, err := archives.GetActiveOrgs(ctx, db, config) - cancel() + analytics.Start() - if err != nil { - logrus.WithError(err).Error("error getting active orgs") - time.Sleep(time.Minute * 5) - continue - } + if config.Once { + doArchival(db, config, s3Client) + } else { + for { + nextArchival := getNextArchivalTime(timeOfDay) + napTime := time.Until(nextArchival) - // for each org, do our export - for _, org := range orgs { - // no single org should take more than 12 hours - ctx, cancel := context.WithTimeout(context.Background(), time.Hour*12) - log := logrus.WithField("org", org.Name).WithField("org_id", org.ID) - - if config.ArchiveMessages { - _, _, err = archives.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archives.MessageType) - if err != nil { - log.WithError(err).WithField("archive_type", archives.MessageType).Error("error archiving org messages") - } - } - if config.ArchiveRuns { - _, _, err = archives.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archives.RunType) - if err != nil { - log.WithError(err).WithField("archive_type", archives.RunType).Error("error archiving org runs") - } - } - - cancel() - } + logrus.WithField("sleep_time", napTime).WithField("next_archival", nextArchival).Info("sleeping until next archival") + time.Sleep(napTime) - // ok, we did all our work for our orgs, quit if so configured or sleep until the next day - if config.ExitOnCompletion { - break + doArchival(db, config, s3Client) } + } - // build up our next start - now := time.Now().In(time.UTC) - nextDay := time.Date(now.Year(), now.Month(), now.Day(), hour.Hour(), hour.Minute(), 0, 0, time.UTC) + analytics.Stop() + wg.Wait() +} - // if this time is before our actual start, add a day - if nextDay.Before(start) { - nextDay = nextDay.AddDate(0, 0, 1) +func doArchival(db *sqlx.DB, cfg *archives.Config, s3Client s3iface.S3API) { + for { + // try to archive all active orgs, and if it fails, wait 5 minutes and try again + err := archives.ArchiveActiveOrgs(db, cfg, s3Client) + if err != nil { + logrus.WithError(err).Error("error archiving, will retry in 5 minutes") + time.Sleep(time.Minute * 5) + continue + } else { + break } + } +} - napTime := nextDay.Sub(time.Now().In(time.UTC)) +func getNextArchivalTime(tod dates.TimeOfDay) time.Time { + t := dates.ExtractDate(dates.Now().In(time.UTC)).Combine(tod, time.UTC) - if napTime > time.Duration(0) { - logrus.WithField("time", napTime).WithField("next_start", nextDay).Info("Sleeping until next UTC day") - time.Sleep(napTime) - } else { - logrus.WithField("next_start", nextDay).Info("Rebuilding immediately without sleep") - } + // if this time is in the past, add a day + if t.Before(dates.Now()) { + t = t.Add(time.Hour * 24) } + return t } diff --git a/go.mod b/go.mod index 686190b..bd08b0f 100644 --- a/go.mod +++ b/go.mod @@ -1,35 +1,30 @@ module github.com/nyaruka/rp-archiver +go 1.18 + require ( - 
github.com/aws/aws-sdk-go v1.13.47 - github.com/certifi/gocertifi v0.0.0-20180118203423-deb3ae2ef261 // indirect - github.com/davecgh/go-spew v1.1.0 // indirect - github.com/evalphobia/logrus_sentry v0.4.5 - github.com/getsentry/raven-go v0.0.0-20180430182053-263040ce1a36 // indirect - github.com/go-ini/ini v1.36.0 // indirect - github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 // indirect - github.com/jmoiron/sqlx v1.2.0 - github.com/kylelemons/godebug v1.1.0 // indirect - github.com/lib/pq v1.0.0 + github.com/aws/aws-sdk-go v1.44.44 + github.com/evalphobia/logrus_sentry v0.8.2 + github.com/jmoiron/sqlx v1.3.5 + github.com/lib/pq v1.10.6 github.com/nyaruka/ezconf v0.2.1 - github.com/pkg/errors v0.8.1 - github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/sirupsen/logrus v1.0.5 - github.com/smartystreets/goconvey v1.6.4 // indirect - github.com/stretchr/testify v1.2.1 - golang.org/x/net v0.0.0-20191119073136-fc4aabc6c914 // indirect - google.golang.org/appengine v1.6.5 // indirect - gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect - gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect - gopkg.in/ini.v1 v1.51.0 // indirect + github.com/nyaruka/gocommon v1.22.4 + github.com/pkg/errors v0.9.1 + github.com/sirupsen/logrus v1.8.1 + github.com/stretchr/testify v1.8.0 ) require ( - github.com/fatih/structs v1.0.0 // indirect + github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/fatih/structs v1.1.0 // indirect + github.com/getsentry/raven-go v0.2.0 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect github.com/naoina/go-stringutil v0.1.0 // indirect github.com/naoina/toml v0.1.1 // indirect - golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 // indirect - golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a // indirect + github.com/nyaruka/librato v1.0.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) - -go 1.17 diff --git a/go.sum b/go.sum index 4b700ac..f04c336 100644 --- a/go.sum +++ b/go.sum @@ -1,70 +1,75 @@ -github.com/aws/aws-sdk-go v1.13.47 h1:sht0j3Vg76sftGWhMMPa9j0QnJbYGIe/327+ALltkgQ= -github.com/aws/aws-sdk-go v1.13.47/go.mod h1:ZRmQr0FajVIyZ4ZzBYKG5P3ZqPz9IHG41ZoMu1ADI3k= -github.com/certifi/gocertifi v0.0.0-20180118203423-deb3ae2ef261 h1:6/yVvBsKeAw05IUj4AzvrxaCnDjN4nUqKjW9+w5wixg= -github.com/certifi/gocertifi v0.0.0-20180118203423-deb3ae2ef261/go.mod h1:GJKEexRPVJrBSOjoqN5VNOIKJ5Q3RViH6eu3puDRwx4= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/aws/aws-sdk-go v1.44.44 h1:XLEcUxILvVBYO/frO+TTCd8NIxklX/ZOzSJSBZ+b7B8= +github.com/aws/aws-sdk-go v1.44.44/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= +github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d h1:S2NE3iHSwP0XV47EEXL8mWmRdEfGscSJ+7EgePNgt0s= +github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/evalphobia/logrus_sentry v0.4.5 h1:weRoBjojMYPp57TLDjPEkP58JVHHSiqNrxG+h3ODdPM= -github.com/evalphobia/logrus_sentry v0.4.5/go.mod h1:pKcp+vriitUqu9KiWj/VRFbRfFNUwz95/UkgG8a6MNc= -github.com/fatih/structs v1.0.0 h1:BrX964Rv5uQ3wwS+KRUAJCBBw5PQmgJfJ6v4yly5QwU= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/evalphobia/logrus_sentry v0.8.2 h1:dotxHq+YLZsT1Bb45bB5UQbfCh3gM/nFFetyN46VoDQ= +github.com/evalphobia/logrus_sentry v0.8.2/go.mod h1:pKcp+vriitUqu9KiWj/VRFbRfFNUwz95/UkgG8a6MNc= github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= -github.com/getsentry/raven-go v0.0.0-20180430182053-263040ce1a36 h1:i93kN7TI/4T5Wx9hsBct/G1yCcZ9tLyWESh7QA7neaI= -github.com/getsentry/raven-go v0.0.0-20180430182053-263040ce1a36/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ= -github.com/go-ini/ini v1.36.0 h1:63En8accP8FKkFZ77ztSfvQf9kGRJN3qBIdItP46RRk= -github.com/go-ini/ini v1.36.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= -github.com/go-sql-driver/mysql v1.4.0 h1:7LxgVwFb2hIQtMm87NdgAVfXjnt4OePseqT1tKx+opk= -github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= -github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= -github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= -github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 h1:12VvqtR6Aowv3l/EQUlocDHW2Cp4G9WJVH7uyH8QFJE= -github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= -github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA= -github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= -github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= -github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= +github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= +github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs= +github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ= +github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= +github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g= 
+github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= -github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A= -github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/mattn/go-sqlite3 v1.9.0 h1:pDRiWfl+++eC2FEFRy6jXmQlvp4Yh3z1MJKg4UeYM/4= -github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.10.6 h1:jbk+ZieJ0D7EVGJYpL9QTz7/YW6UHbmdnZWYyK5cdBs= +github.com/lib/pq v1.10.6/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg= +github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/naoina/go-stringutil v0.1.0 h1:rCUeRUHjBjGTSHl0VC00jUPLz8/F9dDzYI70Hzifhks= github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0= github.com/naoina/toml v0.1.1 h1:PT/lllxVVN0gzzSqSlHEmP8MJB4MY2U7STGxiouV4X8= github.com/naoina/toml v0.1.1/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E= github.com/nyaruka/ezconf v0.2.1 h1:TDXWoqjqYya1uhou1mAJZg7rgFYL98EB0Tb3+BWtUh0= github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDYYxgnQw= -github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= -github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/nyaruka/gocommon v1.22.4 h1:NCAItnrQbXlipDeOszoYbjXEFa1J1M+alS8VSk/uero= +github.com/nyaruka/gocommon v1.22.4/go.mod h1:g6/d9drZXDUrtRSPe2Kf8lTUS+baHt/0G0dwHq3qeIU= +github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0= +github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/sirupsen/logrus v1.0.5 h1:8c8b5uO0zS4X6RPl/sd1ENwSkIc0/H2PaHxE3udaE8I= -github.com/sirupsen/logrus v1.0.5/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= -github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= -github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= -github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= -github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= -github.com/stretchr/testify v1.2.1 h1:52QO5WkIUcHGIR7EnGagH88x1bUzqGXTC5/1bDTUQ7U= -github.com/stretchr/testify v1.2.1/go.mod 
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= -golang.org/x/net v0.0.0-20191119073136-fc4aabc6c914 h1:MlY3mEfbnWGmUi4rtHOtNnnnN4UJRGSyLPx+DXA5Sq4= -golang.org/x/net v0.0.0-20191119073136-fc4aabc6c914/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= -golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= +github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220614195744-fb05da6f9022 h1:0qjDla5xICC2suMtyRH/QqX3B1btXTfNsIt/i4LFgO0= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b h1:2n253B2r0pYSmEV+UNCQoPfU/FiaizQEK5Gu4Bq4JE8= +golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= 
-google.golang.org/appengine v1.6.5 h1:tycE03LOZYQNhDpS27tcQdAzLCVMaj7QT2SXxebnpCM= -google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= -gopkg.in/airbrake/gobrake.v2 v2.0.9 h1:7z2uVWwn7oVeeugY1DtlPAy5H+KYgB1KeKTnqjNatLo= -gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= -gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 h1:OAj3g0cR6Dx/R07QgQe8wkA9RNjB2u4i700xBkIT4e0= -gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo= -gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno= -gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/goreleaser.yml b/goreleaser.yml index 18a9749..698c2cf 100644 --- a/goreleaser.yml +++ b/goreleaser.yml @@ -7,6 +7,7 @@ build: - linux goarch: - amd64 + - arm64 archives: - id: main diff --git a/testdb.sql b/testdb.sql index 27d78f4..4bacc63 100644 --- a/testdb.sql +++ b/testdb.sql @@ -62,6 +62,13 @@ CREATE TABLE contacts_contactgroup_contacts ( contact_id integer NOT NULL ); +DROP TABLE IF EXISTS flows_flow CASCADE; +CREATE TABLE flows_flow ( + id serial primary key, + uuid character varying(36) NOT NULL, + name character varying(128) NOT NULL +); + DROP TABLE IF EXISTS channels_channellog CASCADE; DROP TABLE IF EXISTS msgs_msg_labels CASCADE; DROP TABLE IF EXISTS msgs_msg CASCADE; @@ -88,9 +95,9 @@ CREATE TABLE msgs_msg ( contact_id integer NOT NULL references contacts_contact(id) on delete cascade, contact_urn_id integer NULL references contacts_contacturn(id) on delete cascade, org_id integer NOT NULL references orgs_org(id) on delete cascade, + flow_id integer NULL references flows_flow(id) on delete cascade, metadata text, - topup_id integer, - delete_reason char(1) NULL + topup_id integer ); DROP TABLE IF EXISTS msgs_broadcast_recipients; @@ -145,13 +152,6 @@ CREATE TABLE msgs_msg_labels ( label_id integer NOT NULL ); -DROP TABLE IF EXISTS flows_flow CASCADE; -CREATE TABLE flows_flow ( - id serial primary key, - uuid character varying(36) NOT NULL, - name character varying(128) NOT NULL -); - DROP TABLE IF EXISTS auth_user CASCADE; CREATE TABLE auth_user ( id serial primary key, @@ -159,12 +159,10 @@ CREATE TABLE auth_user ( ); DROP TABLE IF EXISTS api_webhookevent CASCADE; -DROP TABLE IF EXISTS flows_flowpathrecentrun CASCADE; DROP TABLE IF EXISTS flows_actionlog CASCADE; DROP TABLE IF EXISTS flows_flowrun CASCADE; CREATE TABLE flows_flowrun ( id serial primary key, - is_active boolean NOT NULL DEFAULT FALSE, uuid character varying(36) NOT NULL UNIQUE, responded boolean NOT NULL, contact_id integer NOT NULL references contacts_contact(id), @@ -172,15 +170,12 @@ CREATE TABLE flows_flowrun ( org_id integer NOT NULL references orgs_org(id), results text NOT NULL, path text NOT NULL, - 
events jsonb NOT NULL, - parent_id integer NULL references flows_flowrun(id), created_on timestamp with time zone NOT NULL, modified_on timestamp with time zone NOT NULL, exited_on timestamp with time zone NULL, submitted_by_id integer NULL references auth_user(id), status varchar(1) NOT NULL, - exit_type varchar(1) NULL, - delete_reason char(1) NULL + delete_from_results boolean ); DROP TABLE IF EXISTS archives_archive CASCADE; @@ -206,11 +201,6 @@ CREATE TABLE channels_channellog ( msg_id integer NOT NULL references msgs_msg(id) ); -CREATE TABLE flows_flowpathrecentrun ( - id serial primary key, - run_id integer NOT NULL references flows_flowrun(id) DEFERRABLE INITIALLY DEFERRED -); - INSERT INTO orgs_org(id, name, is_active, is_anon, created_on) VALUES (1, 'Org 1', TRUE, FALSE, '2017-11-10 21:11:59.890662+00'), (2, 'Org 2', TRUE, FALSE, '2017-08-10 21:11:59.890662+00'), @@ -262,21 +252,27 @@ INSERT INTO contacts_contactgroup_contacts(id, contact_id, contactgroup_id) VALU (3, 1, 4), (4, 3, 4); +INSERT INTO flows_flow(id, uuid, name) VALUES +(1, '6639286a-9120-45d4-aa39-03ae3942a4a6', 'Flow 1'), +(2, '629db399-a5fb-4fa0-88e6-f479957b63d2', 'Flow 2'), +(3, '3914b88e-625b-4603-bd9f-9319dc331c6b', 'Flow 3'), +(4, 'cfa2371d-2f06-481d-84b2-d974f3803bb0', 'Flow 4'); + INSERT INTO msgs_broadcast(id, text, created_on, purged, org_id, schedule_id) VALUES (1, 'eng=>"hello",fre=>"bonjour"'::hstore, '2017-08-12 22:11:59.890662+02:00', TRUE, 2, 1), (2, 'base=>"hola"'::hstore, '2017-08-12 22:11:59.890662+02:00', TRUE, 2, NULL), (3, 'base=>"not purged"'::hstore, '2017-08-12 19:11:59.890662+02:00', FALSE, 2, NULL), (4, 'base=>"new"'::hstore, '2019-08-12 19:11:59.890662+02:00', FALSE, 2, NULL); -INSERT INTO msgs_msg(id, broadcast_id, uuid, text, created_on, sent_on, modified_on, direction, status, visibility, msg_type, attachments, channel_id, contact_id, contact_urn_id, org_id, msg_count, error_count, next_attempt) VALUES -(1, NULL, '2f969340-704a-4aa2-a1bd-2f832a21d257', 'message 1', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00'), -(2, NULL, 'abe87ac1-015c-4803-be29-1e89509fe682', 'message 2', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'D', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00'), -(3, NULL, 'a7e83a22-a6ff-4e18-82d0-19545640ccba', 'message 3', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'H', 'V', 'I', '{"image/png:https://foo.bar/image1.png", "image/png:https://foo.bar/image2.png"}', NULL, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00'), -(4, NULL, '1cad36af-5581-4c8a-81cd-83708398f61e', 'message 4', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-13 21:11:59.890662+00'), -(5, NULL, 'f557972e-2eb5-42fa-9b87-902116d18787', 'message 5', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', 'I', 'H', 'V', 'I', NULL, 3, 7, 8, 3, 1, 0, '2017-08-11 21:11:59.890662+02:00'), -(6, 2, '579d148c-0ab1-4afb-832f-afb1fe0e19b7', 'message 6', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-10-08 21:11:59.890662+00'), -(7, NULL, '7aeca469-2593-444e-afe4-4702317534c9', 'message 7', '2018-01-02 
21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2018-01-02 21:11:59.890662+00'), -(9, NULL, 'e14ab466-0d3b-436d-a0f7-5851fd7d9b7d', 'message 9', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'S', 'V', 'F', NULL, NULL, 6, NULL, 2, 1, 0, '2017-08-12 21:11:59.890662+00'); +INSERT INTO msgs_msg(id, broadcast_id, uuid, text, created_on, sent_on, modified_on, direction, status, visibility, msg_type, attachments, channel_id, contact_id, contact_urn_id, org_id, flow_id, msg_count, error_count, next_attempt) VALUES +(1, NULL, '2f969340-704a-4aa2-a1bd-2f832a21d257', 'message 1', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, NULL, 1, 0, '2017-08-12 21:11:59.890662+00'), +(2, NULL, 'abe87ac1-015c-4803-be29-1e89509fe682', 'message 2', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'D', 'I', NULL, 2, 6, 7, 2, NULL, 1, 0, '2017-08-12 21:11:59.890662+00'), +(3, NULL, 'a7e83a22-a6ff-4e18-82d0-19545640ccba', 'message 3', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'H', 'V', 'I', '{"image/png:https://foo.bar/image1.png", "image/png:https://foo.bar/image2.png"}', NULL, 6, 7, 2, NULL, 1, 0, '2017-08-12 21:11:59.890662+00'), +(4, NULL, '1cad36af-5581-4c8a-81cd-83708398f61e', 'message 4', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, NULL, 1, 0, '2017-08-13 21:11:59.890662+00'), +(5, NULL, 'f557972e-2eb5-42fa-9b87-902116d18787', 'message 5', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', 'I', 'H', 'V', 'I', NULL, 3, 7, 8, 3, NULL, 1, 0, '2017-08-11 21:11:59.890662+02:00'), +(6, 2, '579d148c-0ab1-4afb-832f-afb1fe0e19b7', 'message 6', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, NULL, 1, 0, '2017-10-08 21:11:59.890662+00'), +(7, NULL, '7aeca469-2593-444e-afe4-4702317534c9', 'message 7', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', 'I', 'H', 'X', 'F', NULL, 2, 6, 7, 2, 2, 1, 0, '2018-01-02 21:11:59.890662+00'), +(9, NULL, 'e14ab466-0d3b-436d-a0f7-5851fd7d9b7d', 'message 9', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'S', 'V', 'F', NULL, NULL, 6, NULL, 2, 3, 1, 0, '2017-08-12 21:11:59.890662+00'); INSERT INTO msgs_label(id, uuid, name) VALUES (1, '1d9e3188-b74b-4ae0-a166-0de31aedb34a', 'Label 1'), @@ -297,37 +293,26 @@ INSERT INTO channels_channellog(id, msg_id) VALUES (5, 5), (6, 6); -INSERT INTO flows_flow(id, uuid, name) VALUES -(1, '6639286a-9120-45d4-aa39-03ae3942a4a6', 'Flow 1'), -(2, '629db399-a5fb-4fa0-88e6-f479957b63d2', 'Flow 2'), -(3, '3914b88e-625b-4603-bd9f-9319dc331c6b', 'Flow 3'), -(4, 'cfa2371d-2f06-481d-84b2-d974f3803bb0', 'Flow 4'); - INSERT INTO auth_user(id, username) VALUES (1, 'greg@gmail.com'); -INSERT INTO flows_flowrun(id, uuid, responded, contact_id, flow_id, org_id, results, path, events, created_on, modified_on, exited_on, status, exit_type, submitted_by_id) VALUES -(1, '4ced1260-9cfe-4b7f-81dd-b637108f15b9', TRUE, 6, 1, 2, '{}', '[]', '[]', '2017-08-12 21:11:59.890662+02:00','2017-08-12 
21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00', 'C', 'C', NULL), +INSERT INTO flows_flowrun(id, uuid, responded, contact_id, flow_id, org_id, results, path, created_on, modified_on, exited_on, status, submitted_by_id) VALUES +(1, '4ced1260-9cfe-4b7f-81dd-b637108f15b9', TRUE, 6, 1, 2, '{}', '[]', '2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00', 'C', NULL), (2, '7d68469c-0494-498a-bdf3-bac68321fd6d', TRUE, 6, 1, 2, '{"agree": {"category": "Strongly agree", "node_uuid": "a0434c54-3e26-4eb0-bafc-46cdeaf435ac", "name": "Do you agree?", "value": "A", "created_on": "2017-05-03T12:25:21.714339+00:00", "input": "A"}}', '[{"uuid": "c3d0b417-db75-417c-8050-33776ec8f620", "node_uuid": "10896d63-8df7-4022-88dd-a9d93edf355b", "arrived_on": "2017-08-12T15:07:24.049815+02:00", "exit_uuid": "2f890507-2ad2-4bd1-92fc-0ca031155fca"}]', -'[{"msg": {"urn": "tel:+12076661212", "text": "hola", "uuid": "cf05c58f-31fb-4ce8-9e65-4ecc9fd47cbe", "channel": {"name": "1223", "uuid": "bbfe2e9c-cf69-4d0a-b42e-00ac3dc0b0b8"}}, "type": "msg_created", "step_uuid": "659cdae5-1f29-4a58-9437-10421f724268", "created_on": "2018-01-22T15:06:47.357682+00:00"}]', -'2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00', 'C', 'C', NULL), +'2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00', 'C', NULL), (3, 'de782b35-a398-46ed-8550-34c66053841b', TRUE, 7, 2, 3, '{"agree": {"category": "Strongly agree", "node_uuid": "084c8cf1-715d-4d0a-b38d-a616ed74e638", "name": "Agree", "value": "A", "created_on": "2017-05-03T12:25:21.714339+00:00", "input": "A"}, "confirm_agree": {"category": "Confirmed Strongly agree", "node_uuid": "a0434c54-3e26-4eb0-bafc-46cdeaf435ab", "name": "Do you agree?", "value": "A", "created_on": "2017-05-03T12:25:21.714339+00:00", "input": "A"}}', '[{"uuid": "600ac5b4-4895-4161-ad97-6e2f1bb48bcb", "node_uuid": "accbc6e2-b0df-46cd-9a76-bff0fdf4d753", "arrived_on": "2017-08-12T15:07:24.049815+02:00", "exit_uuid": "8249e2dc-c893-4200-b6d2-398d07a459bc"}]', -'[{"msg": {"urn": "tel:+12076661212", "text": "hola", "uuid": "9ea50923-0888-4596-9a9d-4890994934a9", "channel": {"name": "1223", "uuid": "d6597e08-8285-428c-8e7e-97c68adfa073"}}, "type": "msg_created", "step_uuid": "ae067248-df92-41c8-bb29-92506e984259", "created_on": "2018-01-22T15:06:47.357682+00:00"}]', -'2017-08-10 21:11:59.890662+02:00','2017-08-10 21:11:59.890662+02:00','2017-08-10 21:11:59.890662+02:00', 'C', 'C', 1), +'2017-08-10 21:11:59.890662+02:00','2017-08-10 21:11:59.890662+02:00','2017-08-10 21:11:59.890662+02:00', 'C', 1), (4, '329a5d24-64fc-479c-8d24-9674c9b46530', TRUE, 7, 2, 3, '{"agree": {"category": "Disagree", "node_uuid": "084c8cf1-715d-4d0a-b38d-a616ed74e638", "name": "Agree", "value": "B", "created_on": "2017-10-10T12:25:21.714339+00:00", "input": "B"}}', '[{"uuid": "babf4fc8-e12c-4bb9-a9dd-61178a118b5a", "node_uuid": "accbc6e2-b0df-46cd-9a76-bff0fdf4d753", "arrived_on": "2017-10-12T15:07:24.049815+02:00", "exit_uuid": "8249e2dc-c893-4200-b6d2-398d07a459bc"}]', -'[{"msg": {"urn": "tel:+12076661212", "text": "hi hi", "uuid": "543d2c4b-ff0b-4b87-a9a4-b2d6745cf470", "channel": {"name": "1223", "uuid": "d6597e08-8285-428c-8e7e-97c68adfa073"}}, "type": "msg_created", "step_uuid": "3a5014dd-7b14-4b7a-be52-0419c09340a6", "created_on": "2018-10-12T15:06:47.357682+00:00"}]', -'2017-10-10 21:11:59.890662+02:00','2017-10-10 21:11:59.890662+02:00','2017-10-10 21:11:59.890662+02:00', 'C', 
'C', NULL), -(5, 'abed67d2-06b8-4749-8bb9-ecda037b673b', TRUE, 7, 2, 3, '{}', '[]', '[]', '2017-10-10 21:11:59.890663+02:00','2017-10-10 21:11:59.890662+02:00','2017-10-10 21:11:59.890662+02:00', 'C', 'C', NULL), -(6, '6262eefe-a6e9-4201-9b76-a7f25e3b7f29', TRUE, 7, 2, 3, '{}', '[]', '[]', '2017-12-12 21:11:59.890662+02:00','2017-12-12 21:11:59.890662+02:00','2017-12-12 21:11:59.890662+02:00', 'C', 'C', NULL); - -INSERT INTO flows_flowpathrecentrun(id, run_id) VALUES -(1, 3); +'2017-10-10 21:11:59.890662+02:00','2017-10-10 21:11:59.890662+02:00','2017-10-10 21:11:59.890662+02:00', 'C', NULL), +(5, 'abed67d2-06b8-4749-8bb9-ecda037b673b', TRUE, 7, 2, 3, '{}', '[]', '2017-10-10 21:11:59.890663+02:00','2017-10-10 21:11:59.890662+02:00','2017-10-10 21:11:59.890662+02:00', 'C', NULL), +(6, '6262eefe-a6e9-4201-9b76-a7f25e3b7f29', TRUE, 7, 2, 3, '{}', '[]', '2017-12-12 21:11:59.890662+02:00','2017-12-12 21:11:59.890662+02:00','2017-12-12 21:11:59.890662+02:00', 'C', NULL), +(7, '6c0d7db9-076b-4edc-ab4b-38576ae394fc', TRUE, 7, 2, 2, '{}', '[]', '2017-08-13 13:11:59.890662+02:00','2017-08-14 16:11:59.890662+02:00', NULL, 'W', NULL); -- update run #5 to have a path longer than 500 steps UPDATE flows_flowrun SET path = s.path FROM (