diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d7440a3..0397357 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,11 +1,13 @@ name: CI on: [push, pull_request] +env: + go-version: '1.17.x' jobs: test: name: Test strategy: matrix: - pg-version: ['11', '12'] + pg-version: ['12', '13'] runs-on: ubuntu-latest steps: - name: Checkout code @@ -22,7 +24,7 @@ jobs: - name: Install Go uses: actions/setup-go@v1 with: - go-version: 1.15.x + go-version: ${{ env.go-version }} - name: Run tests run: go test -p=1 -coverprofile=coverage.text -covermode=atomic ./... @@ -48,7 +50,7 @@ jobs: - name: Install Go uses: actions/setup-go@v1 with: - go-version: 1.15.x + go-version: ${{ env.go-version }} - name: Publish release uses: goreleaser/goreleaser-action@v1 diff --git a/CHANGELOG.md b/CHANGELOG.md index 89bf411..d6f00d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +v7.0.0 +---------- + * Test on PG12 and 13 + +v6.5.0 +---------- + * Limit paths in archived runs to first 500 steps + * Use go 1.17 + v6.4.0 ---------- * 6.4.0 Release Candidate diff --git a/README.md b/README.md index 402a0b7..c943915 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ S3 compatible endpoint. # Deploying -As Archiver is a go application, it compiles to a binary and that binary along with the config file is all +As Archiver is a Go application, it compiles to a binary and that binary along with the config file is all you need to run it on your server. You can find bundles for each platform in the [releases directory](https://github.com/nyaruka/rp-archiver/releases). You should only run a single archiver instance for a deployment. diff --git a/archiver.go b/archives/archives.go similarity index 61% rename from archiver.go rename to archives/archives.go index af10024..54ba31d 100644 --- a/archiver.go +++ b/archives/archives.go @@ -1,4 +1,4 @@ -package archiver +package archives import ( "bufio" @@ -10,10 +10,9 @@ import ( "fmt" "io" "io/ioutil" - "time" - "os" "path/filepath" + "time" "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/jmoiron/sqlx" @@ -91,11 +90,6 @@ func (a *Archive) endDate() time.Time { return endDate } -func (a *Archive) coversDate(d time.Time) bool { - end := a.endDate() - return !a.StartDate.After(d) && end.After(d) -} - const lookupActiveOrgs = ` SELECT o.id, o.name, o.created_on, o.is_anon FROM orgs_org o @@ -460,183 +454,16 @@ func EnsureTempArchiveDirectory(path string) error { testFilePath := filepath.Join(path, ".test_file") testFile, err := os.Create(testFilePath) - defer testFile.Close() - if err != nil { return fmt.Errorf("directory '%s' is not writable", path) } + defer testFile.Close() + err = os.Remove(testFilePath) return err } -const lookupMsgs = ` -SELECT rec.visibility, row_to_json(rec) FROM ( - SELECT - mm.id, - broadcast_id as broadcast, - row_to_json(contact) as contact, - CASE WHEN oo.is_anon = False THEN ccu.identity ELSE null END as urn, - row_to_json(channel) as channel, - CASE WHEN direction = 'I' THEN 'in' - WHEN direction = 'O' THEN 'out' - ELSE NULL - END as direction, - CASE WHEN msg_type = 'F' - THEN 'flow' - WHEN msg_type = 'V' - THEN 'ivr' - WHEN msg_type = 'I' - THEN 'inbox' - ELSE NULL - END as "type", - CASE when status = 'I' then 'initializing' - WHEN status = 'P' then 'queued' - WHEN status = 'Q' then 'queued' - WHEN status = 'W' then 'wired' - WHEN status = 'D' then 'delivered' - WHEN status = 'H' then 'handled' - WHEN status = 'E' then 'errored' - WHEN status = 'F' then 
'failed' - WHEN status = 'S' then 'sent' - WHEN status = 'R' then 'resent' - ELSE NULL - END as status, - - CASE WHEN visibility = 'V' THEN 'visible' - WHEN visibility = 'A' THEN 'archived' - WHEN visibility = 'D' THEN 'deleted' - ELSE NULL - END as visibility, - text, - (select coalesce(jsonb_agg(attach_row), '[]'::jsonb) FROM (select attach_data.attachment[1] as content_type, attach_data.attachment[2] as url FROM (select regexp_matches(unnest(attachments), '^(.*?):(.*)$') attachment) as attach_data) as attach_row) as attachments, - labels_agg.data as labels, - mm.created_on as created_on, - sent_on, - mm.modified_on as modified_on - FROM msgs_msg mm - JOIN orgs_org oo ON mm.org_id = oo.id - JOIN LATERAL (select uuid, name from contacts_contact cc where cc.id = mm.contact_id) as contact ON True - LEFT JOIN contacts_contacturn ccu ON mm.contact_urn_id = ccu.id - LEFT JOIN LATERAL (select uuid, name from channels_channel ch where ch.id = mm.channel_id) as channel ON True - LEFT JOIN LATERAL (select coalesce(jsonb_agg(label_row), '[]'::jsonb) as data from (select uuid, name from msgs_label ml INNER JOIN msgs_msg_labels mml ON ml.id = mml.label_id AND mml.msg_id = mm.id) as label_row) as labels_agg ON True - - WHERE mm.org_id = $1 AND mm.created_on >= $2 AND mm.created_on < $3 - ORDER BY created_on ASC, id ASC) rec; -` - -// writeMessageRecords writes the messages in the archive's date range to the passed in writer -func writeMessageRecords(ctx context.Context, db *sqlx.DB, archive *Archive, writer *bufio.Writer) (int, error) { - var rows *sqlx.Rows - recordCount := 0 - - // first write our normal records - var record, visibility string - - rows, err := db.QueryxContext(ctx, lookupMsgs, archive.Org.ID, archive.StartDate, archive.endDate()) - if err != nil { - return 0, errors.Wrapf(err, "error querying messages for org: %d", archive.Org.ID) - } - defer rows.Close() - - for rows.Next() { - err = rows.Scan(&visibility, &record) - if err != nil { - return 0, errors.Wrapf(err, "error scanning message row for org: %d", archive.Org.ID) - } - - if visibility == "deleted" { - continue - } - writer.WriteString(record) - writer.WriteString("\n") - recordCount++ - } - - logrus.WithField("record_count", recordCount).Debug("Done Writing") - return recordCount, nil -} - -const lookupFlowRuns = ` -SELECT rec.exited_on, row_to_json(rec) -FROM ( - SELECT - fr.id as id, - fr.uuid as uuid, - row_to_json(flow_struct) AS flow, - row_to_json(contact_struct) AS contact, - fr.responded, - (SELECT coalesce(jsonb_agg(path_data), '[]'::jsonb) from ( - SELECT path_row ->> 'node_uuid' AS node, (path_row ->> 'arrived_on')::timestamptz as time - FROM jsonb_array_elements(fr.path::jsonb) AS path_row) as path_data - ) as path, - (SELECT coalesce(jsonb_object_agg(values_data.key, values_data.value), '{}'::jsonb) from ( - SELECT key, jsonb_build_object('name', value -> 'name', 'value', value -> 'value', 'input', value -> 'input', 'time', (value -> 'created_on')::text::timestamptz, 'category', value -> 'category', 'node', value -> 'node_uuid') as value - FROM jsonb_each(fr.results::jsonb)) AS values_data - ) as values, - CASE - WHEN $1 - THEN '[]'::jsonb - ELSE - coalesce(fr.events, '[]'::jsonb) - END AS events, - fr.created_on, - fr.modified_on, - fr.exited_on, - CASE - WHEN exit_type = 'C' - THEN 'completed' - WHEN exit_type = 'I' - THEN 'interrupted' - WHEN exit_type = 'E' - THEN 'expired' - ELSE - null - END as exit_type, - a.username as submitted_by - - FROM flows_flowrun fr - LEFT JOIN auth_user a ON a.id = 
fr.submitted_by_id - JOIN LATERAL (SELECT uuid, name FROM flows_flow WHERE flows_flow.id = fr.flow_id) AS flow_struct ON True - JOIN LATERAL (SELECT uuid, name FROM contacts_contact cc WHERE cc.id = fr.contact_id) AS contact_struct ON True - - WHERE fr.org_id = $2 AND fr.modified_on >= $3 AND fr.modified_on < $4 - ORDER BY fr.modified_on ASC, id ASC -) as rec; -` - -// writeRunRecords writes the runs in the archive's date range to the passed in writer -func writeRunRecords(ctx context.Context, db *sqlx.DB, archive *Archive, writer *bufio.Writer) (int, error) { - var rows *sqlx.Rows - rows, err := db.QueryxContext(ctx, lookupFlowRuns, archive.Org.IsAnon, archive.Org.ID, archive.StartDate, archive.endDate()) - if err != nil { - return 0, errors.Wrapf(err, "error querying run records for org: %d", archive.Org.ID) - } - defer rows.Close() - - recordCount := 0 - var record string - var exitedOn *time.Time - for rows.Next() { - err = rows.Scan(&exitedOn, &record) - - // shouldn't be archiving an active run, that's an error - if exitedOn == nil { - return 0, fmt.Errorf("run still active, cannot archive: %s", record) - } - - if err != nil { - return 0, errors.Wrapf(err, "error scanning run record for org: %d", archive.Org.ID) - } - - writer.WriteString(record) - writer.WriteString("\n") - recordCount++ - } - - return recordCount, nil -} - // CreateArchiveFile is responsible for writing an archive file for the passed in archive from our database func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archivePath string) error { ctx, cancel := context.WithTimeout(ctx, time.Hour*3) @@ -961,13 +788,13 @@ func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client s }) for _, archive := range archives { - log = logrus.WithFields(logrus.Fields{ + log.WithFields(logrus.Fields{ "start_date": archive.StartDate, "end_date": archive.endDate(), "period": archive.Period, "archive_type": archive.ArchiveType, - }) - log.Info("starting archive") + }).Info("starting archive") + start := time.Now() err := createArchive(ctx, db, config, s3Client, archive) @@ -1052,41 +879,6 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s return created, nil } -const selectOrgMessagesInRange = ` -SELECT mm.id, mm.visibility -FROM msgs_msg mm -LEFT JOIN contacts_contact cc ON cc.id = mm.contact_id -WHERE mm.org_id = $1 AND mm.created_on >= $2 AND mm.created_on < $3 -ORDER BY mm.created_on ASC, mm.id ASC -` - -const setMessageDeleteReason = ` -UPDATE msgs_msg -SET delete_reason = 'A' -WHERE id IN(?) -` - -const deleteMessageLogs = ` -DELETE FROM channels_channellog -WHERE msg_id IN(?) -` - -const deleteMessageLabels = ` -DELETE FROM msgs_msg_labels -WHERE msg_id IN(?) -` - -const unlinkResponses = ` -UPDATE msgs_msg -SET response_to_id = NULL -WHERE response_to_id IN(?) -` - -const deleteMessages = ` -DELETE FROM msgs_msg -WHERE id IN(?) 
-` - const setArchiveDeleted = ` UPDATE archives_archive SET needs_deletion = FALSE, deleted_on = $2 @@ -1110,444 +902,6 @@ func executeInQuery(ctx context.Context, tx *sqlx.Tx, query string, ids []int64) var deleteTransactionSize = 100 -// DeleteArchivedMessages takes the passed in archive, verifies the S3 file is still present (and correct), then selects -// all the messages in the archive date range, and if equal or fewer than the number archived, deletes them 100 at a time -// -// Upon completion it updates the needs_deletion flag on the archive -func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3Client s3iface.S3API, archive *Archive) error { - outer, cancel := context.WithTimeout(ctx, time.Hour*3) - defer cancel() - - start := time.Now() - log := logrus.WithFields(logrus.Fields{ - "id": archive.ID, - "org_id": archive.OrgID, - "start_date": archive.StartDate, - "end_date": archive.endDate(), - "archive_type": archive.ArchiveType, - "total_count": archive.RecordCount, - }) - log.Info("deleting messages") - - // first things first, make sure our file is present on S3 - md5, err := GetS3FileETAG(outer, s3Client, archive.URL) - if err != nil { - return err - } - - // if our etag and archive md5 don't match, that's an error, return - if md5 != archive.Hash { - return fmt.Errorf("archive md5: %s and s3 etag: %s do not match", archive.Hash, md5) - } - - // ok, archive file looks good, let's build up our list of message ids, this may be big but we are int64s so shouldn't be too big - rows, err := db.QueryxContext(outer, selectOrgMessagesInRange, archive.OrgID, archive.StartDate, archive.endDate()) - if err != nil { - return err - } - defer rows.Close() - - visibleCount := 0 - var msgID int64 - var visibility string - msgIDs := make([]int64, 0, archive.RecordCount) - for rows.Next() { - err = rows.Scan(&msgID, &visibility) - if err != nil { - return err - } - msgIDs = append(msgIDs, msgID) - - // keep track of the number of visible messages, ie, not deleted - if visibility != "D" { - visibleCount++ - } - } - rows.Close() - - log.WithFields(logrus.Fields{ - "msg_count": len(msgIDs), - }).Debug("found messages") - - // verify we don't see more messages than there are in our archive (fewer is ok) - if visibleCount > archive.RecordCount { - return fmt.Errorf("more messages in the database: %d than in archive: %d", visibleCount, archive.RecordCount) - } - - // ok, delete our messages in batches, we do this in transactions as it spans a few different queries - for startIdx := 0; startIdx < len(msgIDs); startIdx += deleteTransactionSize { - // no single batch should take more than a few minutes - ctx, cancel := context.WithTimeout(ctx, time.Minute*15) - defer cancel() - - start := time.Now() - - endIdx := startIdx + deleteTransactionSize - if endIdx > len(msgIDs) { - endIdx = len(msgIDs) - } - batchIDs := msgIDs[startIdx:endIdx] - - // start our transaction - tx, err := db.BeginTxx(ctx, nil) - if err != nil { - return err - } - - // first update our delete_reason - err = executeInQuery(ctx, tx, setMessageDeleteReason, batchIDs) - if err != nil { - return fmt.Errorf("error updating delete reason: %s", err.Error()) - } - - // now delete any channel logs - err = executeInQuery(ctx, tx, deleteMessageLogs, batchIDs) - if err != nil { - return fmt.Errorf("error removing channel logs: %s", err.Error()) - } - - // then any labels - err = executeInQuery(ctx, tx, deleteMessageLabels, batchIDs) - if err != nil { - return fmt.Errorf("error removing message labels: %s", err.Error()) 
- } - - // unlink any responses - err = executeInQuery(ctx, tx, unlinkResponses, batchIDs) - if err != nil { - return fmt.Errorf("error unlinking responses: %s", err.Error()) - } - - // finally, delete our messages - err = executeInQuery(ctx, tx, deleteMessages, batchIDs) - if err != nil { - return fmt.Errorf("error deleting messages: %s", err.Error()) - } - - // commit our transaction - err = tx.Commit() - if err != nil { - return fmt.Errorf("error committing message delete transaction: %s", err.Error()) - } - - log.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - "count": len(batchIDs), - }).Debug("deleted batch of messages") - - cancel() - } - - outer, cancel = context.WithTimeout(ctx, time.Minute) - defer cancel() - - deletedOn := time.Now() - - // all went well! mark our archive as no longer needing deletion - _, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn) - if err != nil { - return fmt.Errorf("error setting archive as deleted: %s", err.Error()) - } - archive.NeedsDeletion = false - archive.DeletedOn = &deletedOn - - logrus.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - }).Info("completed deleting messages") - - return nil -} - -// DeleteBroadcasts deletes all broadcasts older than 90 days for the passed in org which have no active messages on them -func DeleteBroadcasts(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, org Org) error { - start := time.Now() - threshhold := now.AddDate(0, 0, -org.RetentionPeriod) - - rows, err := db.QueryxContext(ctx, selectOldOrgBroadcasts, org.ID, threshhold) - if err != nil { - return err - } - defer rows.Close() - - count := 0 - for rows.Next() { - if count == 0 { - logrus.WithField("org_id", org.ID).Info("deleting broadcasts") - } - - // been deleting this org more than an hour? 
thats enough for today, exit out - if time.Since(start) > time.Hour { - break - } - - var broadcastID int64 - err := rows.Scan(&broadcastID) - if err != nil { - return errors.Wrap(err, "unable to get broadcast id") - } - - // make sure we have no active messages - var msgCount int64 - err = db.Get(&msgCount, `SELECT count(*) FROM msgs_msg WHERE broadcast_id = $1`, broadcastID) - if err != nil { - return errors.Wrapf(err, "unable to select number of msgs for broadcast: %d", broadcastID) - } - - if msgCount != 0 { - logrus.WithField("broadcast_id", broadcastID).WithField("org_id", org.ID).WithField("msg_count", msgCount).Warn("unable to delete broadcast, has messages still") - continue - } - - // we delete broadcasts in a transaction per broadcast - tx, err := db.BeginTx(ctx, nil) - if err != nil { - return errors.Wrapf(err, "error starting transaction while deleting broadcast: %d", broadcastID) - } - - // delete contacts M2M - _, err = tx.Exec(`DELETE from msgs_broadcast_contacts WHERE broadcast_id = $1`, broadcastID) - if err != nil { - tx.Rollback() - return errors.Wrapf(err, "error deleting related contacts for broadcast: %d", broadcastID) - } - - // delete groups M2M - _, err = tx.Exec(`DELETE from msgs_broadcast_groups WHERE broadcast_id = $1`, broadcastID) - if err != nil { - tx.Rollback() - return errors.Wrapf(err, "error deleting related groups for broadcast: %d", broadcastID) - } - - // delete URNs M2M - _, err = tx.Exec(`DELETE from msgs_broadcast_urns WHERE broadcast_id = $1`, broadcastID) - if err != nil { - tx.Rollback() - return errors.Wrapf(err, "error deleting related urns for broadcast: %d", broadcastID) - } - - // delete counts associated with this broadcast - _, err = tx.Exec(`DELETE from msgs_broadcastmsgcount WHERE broadcast_id = $1`, broadcastID) - if err != nil { - tx.Rollback() - return errors.Wrapf(err, "error deleting counts for broadcast: %d", broadcastID) - } - - // finally, delete our broadcast - _, err = tx.Exec(`DELETE from msgs_broadcast WHERE id = $1`, broadcastID) - if err != nil { - tx.Rollback() - return errors.Wrapf(err, "error deleting broadcast: %d", broadcastID) - } - - err = tx.Commit() - if err != nil { - return errors.Wrapf(err, "error deleting broadcast: %d", broadcastID) - } - - count++ - } - - if count > 0 { - logrus.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - "count": count, - "org_id": org.ID, - }).Info("completed deleting broadcasts") - } - - return nil -} - -const selectOldOrgBroadcasts = ` -SELECT - id -FROM - msgs_broadcast -WHERE - org_id = $1 AND - created_on < $2 AND - schedule_id IS NULL -ORDER BY - created_on ASC, - id ASC -LIMIT 1000000; -` - -const selectOrgRunsInRange = ` -SELECT fr.id, fr.is_active -FROM flows_flowrun fr -LEFT JOIN contacts_contact cc ON cc.id = fr.contact_id -WHERE fr.org_id = $1 AND fr.modified_on >= $2 AND fr.modified_on < $3 -ORDER BY fr.modified_on ASC, fr.id ASC -` - -const setRunDeleteReason = ` -UPDATE flows_flowrun -SET delete_reason = 'A' -WHERE id IN(?) -` - -const deleteRecentRuns = ` -DELETE FROM flows_flowpathrecentrun -WHERE run_id IN(?) -` - -const unlinkParents = ` -UPDATE flows_flowrun -SET parent_id = NULL -WHERE parent_id IN(?) -` - -const deleteRuns = ` -DELETE FROM flows_flowrun -WHERE id IN(?) 
-` - -// DeleteArchivedRuns takes the passed in archive, verifies the S3 file is still present (and correct), then selects -// all the runs in the archive date range, and if equal or fewer than the number archived, deletes them 100 at a time -// -// Upon completion it updates the needs_deletion flag on the archive -func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Client s3iface.S3API, archive *Archive) error { - outer, cancel := context.WithTimeout(ctx, time.Hour*3) - defer cancel() - - start := time.Now() - log := logrus.WithFields(logrus.Fields{ - "id": archive.ID, - "org_id": archive.OrgID, - "start_date": archive.StartDate, - "end_date": archive.endDate(), - "archive_type": archive.ArchiveType, - "total_count": archive.RecordCount, - }) - log.Info("deleting runs") - - // first things first, make sure our file is present on S3 - md5, err := GetS3FileETAG(outer, s3Client, archive.URL) - if err != nil { - return err - } - - // if our etag and archive md5 don't match, that's an error, return - if md5 != archive.Hash { - return fmt.Errorf("archive md5: %s and s3 etag: %s do not match", archive.Hash, md5) - } - - // ok, archive file looks good, let's build up our list of run ids, this may be big but we are int64s so shouldn't be too big - rows, err := db.QueryxContext(outer, selectOrgRunsInRange, archive.OrgID, archive.StartDate, archive.endDate()) - if err != nil { - return err - } - defer rows.Close() - - var runID int64 - var isActive bool - runCount := 0 - runIDs := make([]int64, 0, archive.RecordCount) - for rows.Next() { - err = rows.Scan(&runID, &isActive) - if err != nil { - return err - } - - // if this run is still active, something has gone wrong, throw an error - if isActive { - return fmt.Errorf("run %d in archive is still active", runID) - } - - // increment our count - runCount++ - runIDs = append(runIDs, runID) - } - rows.Close() - - log.WithFields(logrus.Fields{ - "run_count": len(runIDs), - }).Debug("found runs") - - // verify we don't see more runs than there are in our archive (fewer is ok) - if runCount > archive.RecordCount { - return fmt.Errorf("more runs in the database: %d than in archive: %d", runCount, archive.RecordCount) - } - - // ok, delete our runs in batches, we do this in transactions as it spans a few different queries - for startIdx := 0; startIdx < len(runIDs); startIdx += deleteTransactionSize { - // no single batch should take more than a few minutes - ctx, cancel := context.WithTimeout(ctx, time.Minute*15) - defer cancel() - - start := time.Now() - - endIdx := startIdx + deleteTransactionSize - if endIdx > len(runIDs) { - endIdx = len(runIDs) - } - batchIDs := runIDs[startIdx:endIdx] - - // start our transaction - tx, err := db.BeginTxx(ctx, nil) - if err != nil { - return err - } - - // first update our delete_reason - err = executeInQuery(ctx, tx, setRunDeleteReason, batchIDs) - if err != nil { - return fmt.Errorf("error updating delete reason: %s", err.Error()) - } - - // any recent runs - err = executeInQuery(ctx, tx, deleteRecentRuns, batchIDs) - if err != nil { - return fmt.Errorf("error deleting recent runs: %s", err.Error()) - } - - // unlink any parents - err = executeInQuery(ctx, tx, unlinkParents, batchIDs) - if err != nil { - return fmt.Errorf("error unliking parent runs: %s", err.Error()) - } - - // finally, delete our runs - err = executeInQuery(ctx, tx, deleteRuns, batchIDs) - if err != nil { - return fmt.Errorf("error deleting runs: %s", err.Error()) - } - - // commit our transaction - err = tx.Commit() - if 
err != nil { - return fmt.Errorf("error committing run delete transaction: %s", err.Error()) - } - - log.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - "count": len(batchIDs), - }).Debug("deleted batch of runs") - - cancel() - } - - outer, cancel = context.WithTimeout(ctx, time.Minute) - defer cancel() - - deletedOn := time.Now() - - // all went well! mark our archive as no longer needing deletion - _, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn) - if err != nil { - return fmt.Errorf("error setting archive as deleted: %s", err.Error()) - } - archive.NeedsDeletion = false - archive.DeletedOn = &deletedOn - - logrus.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - }).Info("completed deleting runs") - - return nil -} - // DeleteArchivedOrgRecords deletes all the records for the passeg in org based on archives already created func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) { // get all the archives that haven't yet been deleted @@ -1609,9 +963,7 @@ func ArchiveOrg(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, return nil, nil, errors.Wrapf(err, "error rolling up archives") } - for _, m := range monthlies { - created = append(created, m) - } + created = append(created, monthlies...) // finally delete any archives not yet actually archived deleted := make([]*Archive, 0, 1) diff --git a/archiver_test.go b/archives/archives_test.go similarity index 98% rename from archiver_test.go rename to archives/archives_test.go index 87cbf84..58aafaf 100644 --- a/archiver_test.go +++ b/archives/archives_test.go @@ -1,4 +1,4 @@ -package archiver +package archives import ( "compress/gzip" @@ -16,7 +16,7 @@ import ( ) func setup(t *testing.T) *sqlx.DB { - testDB, err := ioutil.ReadFile("testdb.sql") + testDB, err := ioutil.ReadFile("../testdb.sql") assert.NoError(t, err) db, err := sqlx.Open("postgres", "postgres://temba:temba@localhost:5432/archiver_test?sslmode=disable&TimeZone=UTC") @@ -486,9 +486,9 @@ func TestArchiveOrgRuns(t *testing.T) { assert.Equal(t, time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), created[11].StartDate) assert.Equal(t, DayPeriod, created[11].Period) - assert.Equal(t, 1, created[11].RecordCount) - assert.Equal(t, int64(427), created[11].Size) - assert.Equal(t, "bf08041cef314492fee2910357ec4189", created[11].Hash) + assert.Equal(t, 2, created[11].RecordCount) + assert.Equal(t, int64(2002), created[11].Size) + assert.Equal(t, "b75d6ee33ce26b786f1b341e875ecd62", created[11].Hash) assert.Equal(t, 12, len(deleted)) diff --git a/config.go b/archives/config.go similarity index 99% rename from config.go rename to archives/config.go index 6fe9af3..5ec635a 100644 --- a/config.go +++ b/archives/config.go @@ -1,4 +1,4 @@ -package archiver +package archives // Config is our top level configuration object type Config struct { diff --git a/archives/messages.go b/archives/messages.go new file mode 100644 index 0000000..a3a5119 --- /dev/null +++ b/archives/messages.go @@ -0,0 +1,398 @@ +package archives + +import ( + "bufio" + "context" + "fmt" + "time" + + "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +const lookupMsgs = ` +SELECT rec.visibility, row_to_json(rec) FROM ( + SELECT + mm.id, + broadcast_id as broadcast, + row_to_json(contact) as contact, + CASE WHEN oo.is_anon = False THEN 
ccu.identity ELSE null END as urn, + row_to_json(channel) as channel, + CASE WHEN direction = 'I' THEN 'in' + WHEN direction = 'O' THEN 'out' + ELSE NULL + END as direction, + CASE WHEN msg_type = 'F' + THEN 'flow' + WHEN msg_type = 'V' + THEN 'ivr' + WHEN msg_type = 'I' + THEN 'inbox' + ELSE NULL + END as "type", + CASE when status = 'I' then 'initializing' + WHEN status = 'P' then 'queued' + WHEN status = 'Q' then 'queued' + WHEN status = 'W' then 'wired' + WHEN status = 'D' then 'delivered' + WHEN status = 'H' then 'handled' + WHEN status = 'E' then 'errored' + WHEN status = 'F' then 'failed' + WHEN status = 'S' then 'sent' + WHEN status = 'R' then 'resent' + ELSE NULL + END as status, + + CASE WHEN visibility = 'V' THEN 'visible' + WHEN visibility = 'A' THEN 'archived' + WHEN visibility = 'D' THEN 'deleted' + ELSE NULL + END as visibility, + text, + (select coalesce(jsonb_agg(attach_row), '[]'::jsonb) FROM (select attach_data.attachment[1] as content_type, attach_data.attachment[2] as url FROM (select regexp_matches(unnest(attachments), '^(.*?):(.*)$') attachment) as attach_data) as attach_row) as attachments, + labels_agg.data as labels, + mm.created_on as created_on, + sent_on, + mm.modified_on as modified_on + FROM msgs_msg mm + JOIN orgs_org oo ON mm.org_id = oo.id + JOIN LATERAL (select uuid, name from contacts_contact cc where cc.id = mm.contact_id) as contact ON True + LEFT JOIN contacts_contacturn ccu ON mm.contact_urn_id = ccu.id + LEFT JOIN LATERAL (select uuid, name from channels_channel ch where ch.id = mm.channel_id) as channel ON True + LEFT JOIN LATERAL (select coalesce(jsonb_agg(label_row), '[]'::jsonb) as data from (select uuid, name from msgs_label ml INNER JOIN msgs_msg_labels mml ON ml.id = mml.label_id AND mml.msg_id = mm.id) as label_row) as labels_agg ON True + + WHERE mm.org_id = $1 AND mm.created_on >= $2 AND mm.created_on < $3 + ORDER BY created_on ASC, id ASC) rec; +` + +// writeMessageRecords writes the messages in the archive's date range to the passed in writer +func writeMessageRecords(ctx context.Context, db *sqlx.DB, archive *Archive, writer *bufio.Writer) (int, error) { + var rows *sqlx.Rows + recordCount := 0 + + // first write our normal records + var record, visibility string + + rows, err := db.QueryxContext(ctx, lookupMsgs, archive.Org.ID, archive.StartDate, archive.endDate()) + if err != nil { + return 0, errors.Wrapf(err, "error querying messages for org: %d", archive.Org.ID) + } + defer rows.Close() + + for rows.Next() { + err = rows.Scan(&visibility, &record) + if err != nil { + return 0, errors.Wrapf(err, "error scanning message row for org: %d", archive.Org.ID) + } + + if visibility == "deleted" { + continue + } + writer.WriteString(record) + writer.WriteString("\n") + recordCount++ + } + + logrus.WithField("record_count", recordCount).Debug("Done Writing") + return recordCount, nil +} + +const selectOrgMessagesInRange = ` +SELECT mm.id, mm.visibility +FROM msgs_msg mm +LEFT JOIN contacts_contact cc ON cc.id = mm.contact_id +WHERE mm.org_id = $1 AND mm.created_on >= $2 AND mm.created_on < $3 +ORDER BY mm.created_on ASC, mm.id ASC +` + +const setMessageDeleteReason = ` +UPDATE msgs_msg +SET delete_reason = 'A' +WHERE id IN(?) +` + +const deleteMessageLogs = ` +DELETE FROM channels_channellog +WHERE msg_id IN(?) +` + +const deleteMessageLabels = ` +DELETE FROM msgs_msg_labels +WHERE msg_id IN(?) +` + +const unlinkResponses = ` +UPDATE msgs_msg +SET response_to_id = NULL +WHERE response_to_id IN(?) 
+` + +const deleteMessages = ` +DELETE FROM msgs_msg +WHERE id IN(?) +` + +// DeleteArchivedMessages takes the passed in archive, verifies the S3 file is still present (and correct), then selects +// all the messages in the archive date range, and if equal or fewer than the number archived, deletes them 100 at a time +// +// Upon completion it updates the needs_deletion flag on the archive +func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3Client s3iface.S3API, archive *Archive) error { + outer, cancel := context.WithTimeout(ctx, time.Hour*3) + defer cancel() + + start := time.Now() + log := logrus.WithFields(logrus.Fields{ + "id": archive.ID, + "org_id": archive.OrgID, + "start_date": archive.StartDate, + "end_date": archive.endDate(), + "archive_type": archive.ArchiveType, + "total_count": archive.RecordCount, + }) + log.Info("deleting messages") + + // first things first, make sure our file is present on S3 + md5, err := GetS3FileETAG(outer, s3Client, archive.URL) + if err != nil { + return err + } + + // if our etag and archive md5 don't match, that's an error, return + if md5 != archive.Hash { + return fmt.Errorf("archive md5: %s and s3 etag: %s do not match", archive.Hash, md5) + } + + // ok, archive file looks good, let's build up our list of message ids, this may be big but we are int64s so shouldn't be too big + rows, err := db.QueryxContext(outer, selectOrgMessagesInRange, archive.OrgID, archive.StartDate, archive.endDate()) + if err != nil { + return err + } + defer rows.Close() + + visibleCount := 0 + var msgID int64 + var visibility string + msgIDs := make([]int64, 0, archive.RecordCount) + for rows.Next() { + err = rows.Scan(&msgID, &visibility) + if err != nil { + return err + } + msgIDs = append(msgIDs, msgID) + + // keep track of the number of visible messages, ie, not deleted + if visibility != "D" { + visibleCount++ + } + } + rows.Close() + + log.WithFields(logrus.Fields{ + "msg_count": len(msgIDs), + }).Debug("found messages") + + // verify we don't see more messages than there are in our archive (fewer is ok) + if visibleCount > archive.RecordCount { + return fmt.Errorf("more messages in the database: %d than in archive: %d", visibleCount, archive.RecordCount) + } + + // ok, delete our messages in batches, we do this in transactions as it spans a few different queries + for startIdx := 0; startIdx < len(msgIDs); startIdx += deleteTransactionSize { + // no single batch should take more than a few minutes + ctx, cancel := context.WithTimeout(ctx, time.Minute*15) + defer cancel() + + start := time.Now() + + endIdx := startIdx + deleteTransactionSize + if endIdx > len(msgIDs) { + endIdx = len(msgIDs) + } + batchIDs := msgIDs[startIdx:endIdx] + + // start our transaction + tx, err := db.BeginTxx(ctx, nil) + if err != nil { + return err + } + + // first update our delete_reason + err = executeInQuery(ctx, tx, setMessageDeleteReason, batchIDs) + if err != nil { + return fmt.Errorf("error updating delete reason: %s", err.Error()) + } + + // now delete any channel logs + err = executeInQuery(ctx, tx, deleteMessageLogs, batchIDs) + if err != nil { + return fmt.Errorf("error removing channel logs: %s", err.Error()) + } + + // then any labels + err = executeInQuery(ctx, tx, deleteMessageLabels, batchIDs) + if err != nil { + return fmt.Errorf("error removing message labels: %s", err.Error()) + } + + // unlink any responses + err = executeInQuery(ctx, tx, unlinkResponses, batchIDs) + if err != nil { + return fmt.Errorf("error unlinking responses: %s", 
err.Error()) + } + + // finally, delete our messages + err = executeInQuery(ctx, tx, deleteMessages, batchIDs) + if err != nil { + return fmt.Errorf("error deleting messages: %s", err.Error()) + } + + // commit our transaction + err = tx.Commit() + if err != nil { + return fmt.Errorf("error committing message delete transaction: %s", err.Error()) + } + + log.WithFields(logrus.Fields{ + "elapsed": time.Since(start), + "count": len(batchIDs), + }).Debug("deleted batch of messages") + + cancel() + } + + outer, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() + + deletedOn := time.Now() + + // all went well! mark our archive as no longer needing deletion + _, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn) + if err != nil { + return fmt.Errorf("error setting archive as deleted: %s", err.Error()) + } + archive.NeedsDeletion = false + archive.DeletedOn = &deletedOn + + logrus.WithFields(logrus.Fields{ + "elapsed": time.Since(start), + }).Info("completed deleting messages") + + return nil +} + +const selectOldOrgBroadcasts = ` +SELECT + id +FROM + msgs_broadcast +WHERE + org_id = $1 AND + created_on < $2 AND + schedule_id IS NULL +ORDER BY + created_on ASC, + id ASC +LIMIT 1000000; +` + +// DeleteBroadcasts deletes all broadcasts older than 90 days for the passed in org which have no active messages on them +func DeleteBroadcasts(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, org Org) error { + start := time.Now() + threshhold := now.AddDate(0, 0, -org.RetentionPeriod) + + rows, err := db.QueryxContext(ctx, selectOldOrgBroadcasts, org.ID, threshhold) + if err != nil { + return err + } + defer rows.Close() + + count := 0 + for rows.Next() { + if count == 0 { + logrus.WithField("org_id", org.ID).Info("deleting broadcasts") + } + + // been deleting this org more than an hour? 
thats enough for today, exit out + if time.Since(start) > time.Hour { + break + } + + var broadcastID int64 + err := rows.Scan(&broadcastID) + if err != nil { + return errors.Wrap(err, "unable to get broadcast id") + } + + // make sure we have no active messages + var msgCount int64 + err = db.Get(&msgCount, `SELECT count(*) FROM msgs_msg WHERE broadcast_id = $1`, broadcastID) + if err != nil { + return errors.Wrapf(err, "unable to select number of msgs for broadcast: %d", broadcastID) + } + + if msgCount != 0 { + logrus.WithField("broadcast_id", broadcastID).WithField("org_id", org.ID).WithField("msg_count", msgCount).Warn("unable to delete broadcast, has messages still") + continue + } + + // we delete broadcasts in a transaction per broadcast + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return errors.Wrapf(err, "error starting transaction while deleting broadcast: %d", broadcastID) + } + + // delete contacts M2M + _, err = tx.Exec(`DELETE from msgs_broadcast_contacts WHERE broadcast_id = $1`, broadcastID) + if err != nil { + tx.Rollback() + return errors.Wrapf(err, "error deleting related contacts for broadcast: %d", broadcastID) + } + + // delete groups M2M + _, err = tx.Exec(`DELETE from msgs_broadcast_groups WHERE broadcast_id = $1`, broadcastID) + if err != nil { + tx.Rollback() + return errors.Wrapf(err, "error deleting related groups for broadcast: %d", broadcastID) + } + + // delete URNs M2M + _, err = tx.Exec(`DELETE from msgs_broadcast_urns WHERE broadcast_id = $1`, broadcastID) + if err != nil { + tx.Rollback() + return errors.Wrapf(err, "error deleting related urns for broadcast: %d", broadcastID) + } + + // delete counts associated with this broadcast + _, err = tx.Exec(`DELETE from msgs_broadcastmsgcount WHERE broadcast_id = $1`, broadcastID) + if err != nil { + tx.Rollback() + return errors.Wrapf(err, "error deleting counts for broadcast: %d", broadcastID) + } + + // finally, delete our broadcast + _, err = tx.Exec(`DELETE from msgs_broadcast WHERE id = $1`, broadcastID) + if err != nil { + tx.Rollback() + return errors.Wrapf(err, "error deleting broadcast: %d", broadcastID) + } + + err = tx.Commit() + if err != nil { + return errors.Wrapf(err, "error deleting broadcast: %d", broadcastID) + } + + count++ + } + + if count > 0 { + logrus.WithFields(logrus.Fields{ + "elapsed": time.Since(start), + "count": count, + "org_id": org.ID, + }).Info("completed deleting broadcasts") + } + + return nil +} diff --git a/archives/runs.go b/archives/runs.go new file mode 100644 index 0000000..c2e314e --- /dev/null +++ b/archives/runs.go @@ -0,0 +1,268 @@ +package archives + +import ( + "bufio" + "context" + "fmt" + "time" + + "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +const lookupFlowRuns = ` +SELECT rec.exited_on, row_to_json(rec) +FROM ( + SELECT + fr.id as id, + fr.uuid as uuid, + row_to_json(flow_struct) AS flow, + row_to_json(contact_struct) AS contact, + fr.responded, + (SELECT coalesce(jsonb_agg(path_data), '[]'::jsonb) from ( + SELECT path_row ->> 'node_uuid' AS node, (path_row ->> 'arrived_on')::timestamptz as time + FROM jsonb_array_elements(fr.path::jsonb) AS path_row LIMIT 500) as path_data + ) as path, + (SELECT coalesce(jsonb_object_agg(values_data.key, values_data.value), '{}'::jsonb) from ( + SELECT key, jsonb_build_object('name', value -> 'name', 'value', value -> 'value', 'input', value -> 'input', 'time', (value -> 
'created_on')::text::timestamptz, 'category', value -> 'category', 'node', value -> 'node_uuid') as value + FROM jsonb_each(fr.results::jsonb)) AS values_data + ) as values, + CASE + WHEN $1 + THEN '[]'::jsonb + ELSE + coalesce(fr.events, '[]'::jsonb) + END AS events, + fr.created_on, + fr.modified_on, + fr.exited_on, + CASE + WHEN exit_type = 'C' + THEN 'completed' + WHEN exit_type = 'I' + THEN 'interrupted' + WHEN exit_type = 'E' + THEN 'expired' + ELSE + null + END as exit_type, + a.username as submitted_by + + FROM flows_flowrun fr + LEFT JOIN auth_user a ON a.id = fr.submitted_by_id + JOIN LATERAL (SELECT uuid, name FROM flows_flow WHERE flows_flow.id = fr.flow_id) AS flow_struct ON True + JOIN LATERAL (SELECT uuid, name FROM contacts_contact cc WHERE cc.id = fr.contact_id) AS contact_struct ON True + + WHERE fr.org_id = $2 AND fr.modified_on >= $3 AND fr.modified_on < $4 + ORDER BY fr.modified_on ASC, id ASC +) as rec; +` + +// writeRunRecords writes the runs in the archive's date range to the passed in writer +func writeRunRecords(ctx context.Context, db *sqlx.DB, archive *Archive, writer *bufio.Writer) (int, error) { + var rows *sqlx.Rows + rows, err := db.QueryxContext(ctx, lookupFlowRuns, archive.Org.IsAnon, archive.Org.ID, archive.StartDate, archive.endDate()) + if err != nil { + return 0, errors.Wrapf(err, "error querying run records for org: %d", archive.Org.ID) + } + defer rows.Close() + + recordCount := 0 + var record string + var exitedOn *time.Time + for rows.Next() { + err = rows.Scan(&exitedOn, &record) + + // shouldn't be archiving an active run, that's an error + if exitedOn == nil { + return 0, fmt.Errorf("run still active, cannot archive: %s", record) + } + + if err != nil { + return 0, errors.Wrapf(err, "error scanning run record for org: %d", archive.Org.ID) + } + + writer.WriteString(record) + writer.WriteString("\n") + recordCount++ + } + + return recordCount, nil +} + +const selectOrgRunsInRange = ` +SELECT fr.id, fr.is_active +FROM flows_flowrun fr +LEFT JOIN contacts_contact cc ON cc.id = fr.contact_id +WHERE fr.org_id = $1 AND fr.modified_on >= $2 AND fr.modified_on < $3 +ORDER BY fr.modified_on ASC, fr.id ASC +` + +const setRunDeleteReason = ` +UPDATE flows_flowrun +SET delete_reason = 'A' +WHERE id IN(?) +` + +const deleteRecentRuns = ` +DELETE FROM flows_flowpathrecentrun +WHERE run_id IN(?) +` + +const unlinkParents = ` +UPDATE flows_flowrun +SET parent_id = NULL +WHERE parent_id IN(?) +` + +const deleteRuns = ` +DELETE FROM flows_flowrun +WHERE id IN(?) 
+` + +// DeleteArchivedRuns takes the passed in archive, verifies the S3 file is still present (and correct), then selects +// all the runs in the archive date range, and if equal or fewer than the number archived, deletes them 100 at a time +// +// Upon completion it updates the needs_deletion flag on the archive +func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Client s3iface.S3API, archive *Archive) error { + outer, cancel := context.WithTimeout(ctx, time.Hour*3) + defer cancel() + + start := time.Now() + log := logrus.WithFields(logrus.Fields{ + "id": archive.ID, + "org_id": archive.OrgID, + "start_date": archive.StartDate, + "end_date": archive.endDate(), + "archive_type": archive.ArchiveType, + "total_count": archive.RecordCount, + }) + log.Info("deleting runs") + + // first things first, make sure our file is present on S3 + md5, err := GetS3FileETAG(outer, s3Client, archive.URL) + if err != nil { + return err + } + + // if our etag and archive md5 don't match, that's an error, return + if md5 != archive.Hash { + return fmt.Errorf("archive md5: %s and s3 etag: %s do not match", archive.Hash, md5) + } + + // ok, archive file looks good, let's build up our list of run ids, this may be big but we are int64s so shouldn't be too big + rows, err := db.QueryxContext(outer, selectOrgRunsInRange, archive.OrgID, archive.StartDate, archive.endDate()) + if err != nil { + return err + } + defer rows.Close() + + var runID int64 + var isActive bool + runCount := 0 + runIDs := make([]int64, 0, archive.RecordCount) + for rows.Next() { + err = rows.Scan(&runID, &isActive) + if err != nil { + return err + } + + // if this run is still active, something has gone wrong, throw an error + if isActive { + return fmt.Errorf("run %d in archive is still active", runID) + } + + // increment our count + runCount++ + runIDs = append(runIDs, runID) + } + rows.Close() + + log.WithFields(logrus.Fields{ + "run_count": len(runIDs), + }).Debug("found runs") + + // verify we don't see more runs than there are in our archive (fewer is ok) + if runCount > archive.RecordCount { + return fmt.Errorf("more runs in the database: %d than in archive: %d", runCount, archive.RecordCount) + } + + // ok, delete our runs in batches, we do this in transactions as it spans a few different queries + for startIdx := 0; startIdx < len(runIDs); startIdx += deleteTransactionSize { + // no single batch should take more than a few minutes + ctx, cancel := context.WithTimeout(ctx, time.Minute*15) + defer cancel() + + start := time.Now() + + endIdx := startIdx + deleteTransactionSize + if endIdx > len(runIDs) { + endIdx = len(runIDs) + } + batchIDs := runIDs[startIdx:endIdx] + + // start our transaction + tx, err := db.BeginTxx(ctx, nil) + if err != nil { + return err + } + + // first update our delete_reason + err = executeInQuery(ctx, tx, setRunDeleteReason, batchIDs) + if err != nil { + return fmt.Errorf("error updating delete reason: %s", err.Error()) + } + + // any recent runs + err = executeInQuery(ctx, tx, deleteRecentRuns, batchIDs) + if err != nil { + return fmt.Errorf("error deleting recent runs: %s", err.Error()) + } + + // unlink any parents + err = executeInQuery(ctx, tx, unlinkParents, batchIDs) + if err != nil { + return fmt.Errorf("error unliking parent runs: %s", err.Error()) + } + + // finally, delete our runs + err = executeInQuery(ctx, tx, deleteRuns, batchIDs) + if err != nil { + return fmt.Errorf("error deleting runs: %s", err.Error()) + } + + // commit our transaction + err = tx.Commit() + if 
err != nil { + return fmt.Errorf("error committing run delete transaction: %s", err.Error()) + } + + log.WithFields(logrus.Fields{ + "elapsed": time.Since(start), + "count": len(batchIDs), + }).Debug("deleted batch of runs") + + cancel() + } + + outer, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() + + deletedOn := time.Now() + + // all went well! mark our archive as no longer needing deletion + _, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn) + if err != nil { + return fmt.Errorf("error setting archive as deleted: %s", err.Error()) + } + archive.NeedsDeletion = false + archive.DeletedOn = &deletedOn + + logrus.WithFields(logrus.Fields{ + "elapsed": time.Since(start), + }).Info("completed deleting runs") + + return nil +} diff --git a/s3.go b/archives/s3.go similarity index 99% rename from s3.go rename to archives/s3.go index 89f4392..2a755e6 100644 --- a/s3.go +++ b/archives/s3.go @@ -1,4 +1,4 @@ -package archiver +package archives import ( "context" diff --git a/testdata/messages1.jsonl b/archives/testdata/messages1.jsonl similarity index 100% rename from testdata/messages1.jsonl rename to archives/testdata/messages1.jsonl diff --git a/testdata/messages2.jsonl b/archives/testdata/messages2.jsonl similarity index 100% rename from testdata/messages2.jsonl rename to archives/testdata/messages2.jsonl diff --git a/testdata/runs1.jsonl b/archives/testdata/runs1.jsonl similarity index 100% rename from testdata/runs1.jsonl rename to archives/testdata/runs1.jsonl diff --git a/testdata/runs2.jsonl b/archives/testdata/runs2.jsonl similarity index 100% rename from testdata/runs2.jsonl rename to archives/testdata/runs2.jsonl diff --git a/cmd/rp-archiver/main.go b/cmd/rp-archiver/main.go index 230e614..c48f4d2 100644 --- a/cmd/rp-archiver/main.go +++ b/cmd/rp-archiver/main.go @@ -11,12 +11,12 @@ import ( "github.com/jmoiron/sqlx" _ "github.com/lib/pq" "github.com/nyaruka/ezconf" - archiver "github.com/nyaruka/rp-archiver" + "github.com/nyaruka/rp-archiver/archives" "github.com/sirupsen/logrus" ) func main() { - config := archiver.NewConfig() + config := archives.NewConfig() loader := ezconf.NewLoader(&config, "archiver", "Archives RapidPro runs and msgs to S3", []string{"archiver.toml"}) loader.MustLoad() @@ -67,14 +67,14 @@ func main() { var s3Client s3iface.S3API if config.UploadToS3 { - s3Client, err = archiver.NewS3Client(config) + s3Client, err = archives.NewS3Client(config) if err != nil { logrus.WithError(err).Fatal("unable to initialize s3 client") } } // ensure that we can actually write to the temp directory - err = archiver.EnsureTempArchiveDirectory(config.TempDir) + err = archives.EnsureTempArchiveDirectory(config.TempDir) if err != nil { logrus.WithError(err).Fatal("cannot write to temp directory") } @@ -91,7 +91,7 @@ func main() { // get our active orgs ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - orgs, err := archiver.GetActiveOrgs(ctx, db, config) + orgs, err := archives.GetActiveOrgs(ctx, db, config) cancel() if err != nil { @@ -107,15 +107,15 @@ func main() { log := logrus.WithField("org", org.Name).WithField("org_id", org.ID) if config.ArchiveMessages { - _, _, err = archiver.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archiver.MessageType) + _, _, err = archives.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archives.MessageType) if err != nil { - log.WithError(err).WithField("archive_type", archiver.MessageType).Error("error archiving org 
messages") + log.WithError(err).WithField("archive_type", archives.MessageType).Error("error archiving org messages") } } if config.ArchiveRuns { - _, _, err = archiver.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archiver.RunType) + _, _, err = archives.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archives.RunType) if err != nil { - log.WithError(err).WithField("archive_type", archiver.RunType).Error("error archiving org runs") + log.WithError(err).WithField("archive_type", archives.RunType).Error("error archiving org runs") } } diff --git a/docker/Dockerfile b/docker/Dockerfile index 3d5717b..94b7505 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.15.14-alpine3.14 +FROM golang:1.17.3-alpine3.14 WORKDIR /app diff --git a/go.mod b/go.mod index 4c79e66..686190b 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,6 @@ require ( github.com/kylelemons/godebug v1.1.0 // indirect github.com/lib/pq v1.0.0 github.com/nyaruka/ezconf v0.2.1 - github.com/onsi/ginkgo v1.10.3 // indirect - github.com/onsi/gomega v1.7.1 // indirect github.com/pkg/errors v0.8.1 github.com/pmezard/go-difflib v1.0.0 // indirect github.com/sirupsen/logrus v1.0.5 @@ -26,4 +24,12 @@ require ( gopkg.in/ini.v1 v1.51.0 // indirect ) -go 1.13 +require ( + github.com/fatih/structs v1.0.0 // indirect + github.com/naoina/go-stringutil v0.1.0 // indirect + github.com/naoina/toml v0.1.1 // indirect + golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 // indirect + golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a // indirect +) + +go 1.17 diff --git a/go.sum b/go.sum index c8a6694..4b700ac 100644 --- a/go.sum +++ b/go.sum @@ -8,21 +8,15 @@ github.com/evalphobia/logrus_sentry v0.4.5 h1:weRoBjojMYPp57TLDjPEkP58JVHHSiqNrx github.com/evalphobia/logrus_sentry v0.4.5/go.mod h1:pKcp+vriitUqu9KiWj/VRFbRfFNUwz95/UkgG8a6MNc= github.com/fatih/structs v1.0.0 h1:BrX964Rv5uQ3wwS+KRUAJCBBw5PQmgJfJ6v4yly5QwU= github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= -github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= -github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/getsentry/raven-go v0.0.0-20180430182053-263040ce1a36 h1:i93kN7TI/4T5Wx9hsBct/G1yCcZ9tLyWESh7QA7neaI= github.com/getsentry/raven-go v0.0.0-20180430182053-263040ce1a36/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ= github.com/go-ini/ini v1.36.0 h1:63En8accP8FKkFZ77ztSfvQf9kGRJN3qBIdItP46RRk= github.com/go-ini/ini v1.36.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-sql-driver/mysql v1.4.0 h1:7LxgVwFb2hIQtMm87NdgAVfXjnt4OePseqT1tKx+opk= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= -github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= -github.com/hpcloud/tail v1.0.0 
h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= -github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 h1:12VvqtR6Aowv3l/EQUlocDHW2Cp4G9WJVH7uyH8QFJE= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA= @@ -41,11 +35,6 @@ github.com/naoina/toml v0.1.1 h1:PT/lllxVVN0gzzSqSlHEmP8MJB4MY2U7STGxiouV4X8= github.com/naoina/toml v0.1.1/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E= github.com/nyaruka/ezconf v0.2.1 h1:TDXWoqjqYya1uhou1mAJZg7rgFYL98EB0Tb3+BWtUh0= github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDYYxgnQw= -github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.10.3 h1:OoxbjfXVZyod1fmWYhI7SEyaD8B00ynP3T+D5GiyHOY= -github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/gomega v1.7.1 h1:K0jcRCwNQM3vFGh1ppMtDh/+7ApJrjldlX8fA0jDTLQ= -github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -60,13 +49,10 @@ github.com/stretchr/testify v1.2.1 h1:52QO5WkIUcHGIR7EnGagH88x1bUzqGXTC5/1bDTUQ7 github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20191119073136-fc4aabc6c914 h1:MlY3mEfbnWGmUi4rtHOtNnnnN4UJRGSyLPx+DXA5Sq4= golang.org/x/net v0.0.0-20191119073136-fc4aabc6c914/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -78,15 +64,7 @@ google.golang.org/appengine v1.6.5 h1:tycE03LOZYQNhDpS27tcQdAzLCVMaj7QT2SXxebnpC google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= gopkg.in/airbrake/gobrake.v2 v2.0.9 h1:7z2uVWwn7oVeeugY1DtlPAy5H+KYgB1KeKTnqjNatLo= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= -gopkg.in/check.v1 
v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= -gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 h1:OAj3g0cR6Dx/R07QgQe8wkA9RNjB2u4i700xBkIT4e0= gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo= gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno= gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= -gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= -gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/testdb.sql b/testdb.sql index b95eb8f..c4bdb2e 100644 --- a/testdb.sql +++ b/testdb.sql @@ -159,7 +159,6 @@ CREATE TABLE auth_user ( username character varying(128) NOT NULL ); -DROP TABLE IF EXISTS api_webhookresult CASCADE; DROP TABLE IF EXISTS api_webhookevent CASCADE; DROP TABLE IF EXISTS flows_flowpathrecentrun CASCADE; DROP TABLE IF EXISTS flows_actionlog CASCADE; @@ -167,7 +166,7 @@ DROP TABLE IF EXISTS flows_flowrun CASCADE; CREATE TABLE flows_flowrun ( id serial primary key, is_active boolean NOT NULL DEFAULT FALSE, - uuid character varying(36) NOT NULL, + uuid character varying(36) NOT NULL UNIQUE, responded boolean NOT NULL, contact_id integer NOT NULL references contacts_contact(id), flow_id integer NOT NULL references flows_flow(id), @@ -180,6 +179,7 @@ CREATE TABLE flows_flowrun ( modified_on timestamp with time zone NOT NULL, exited_on timestamp with time zone NULL, submitted_by_id integer NULL references auth_user(id), + status varchar(1) NOT NULL, exit_type varchar(1) NULL, delete_reason char(1) NULL ); @@ -307,25 +307,30 @@ INSERT INTO flows_flow(id, uuid, name) VALUES INSERT INTO auth_user(id, username) VALUES (1, 'greg@gmail.com'); -INSERT INTO flows_flowrun(id, uuid, responded, contact_id, flow_id, org_id, results, path, events, created_on, modified_on, exited_on, exit_type, parent_id, submitted_by_id) VALUES -(1, '4ced1260-9cfe-4b7f-81dd-b637108f15b9', TRUE, 6, 1, 2, '{}', '[]', '[]', '2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00', 'C', NULL, NULL), +INSERT INTO flows_flowrun(id, uuid, responded, contact_id, flow_id, org_id, results, path, events, created_on, modified_on, exited_on, status, exit_type, parent_id, submitted_by_id) VALUES +(1, '4ced1260-9cfe-4b7f-81dd-b637108f15b9', TRUE, 6, 1, 2, '{}', '[]', '[]', '2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00', 'C', 'C', NULL, NULL), (2, '7d68469c-0494-498a-bdf3-bac68321fd6d', TRUE, 6, 1, 2, '{"agree": {"category": "Strongly agree", "node_uuid": "a0434c54-3e26-4eb0-bafc-46cdeaf435ac", "name": "Do you agree?", "value": "A", "created_on": "2017-05-03T12:25:21.714339+00:00", "input": "A"}}', '[{"uuid": "c3d0b417-db75-417c-8050-33776ec8f620", "node_uuid": "10896d63-8df7-4022-88dd-a9d93edf355b", "arrived_on": "2017-08-12T15:07:24.049815+02:00", "exit_uuid": "2f890507-2ad2-4bd1-92fc-0ca031155fca"}]', '[{"msg": {"urn": "tel:+12076661212", "text": "hola", "uuid": "cf05c58f-31fb-4ce8-9e65-4ecc9fd47cbe", "channel": {"name": "1223", "uuid": 
"bbfe2e9c-cf69-4d0a-b42e-00ac3dc0b0b8"}}, "type": "msg_created", "step_uuid": "659cdae5-1f29-4a58-9437-10421f724268", "created_on": "2018-01-22T15:06:47.357682+00:00"}]', -'2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00', 'C', NULL, NULL), +'2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00', 'C', 'C', NULL, NULL), (3, 'de782b35-a398-46ed-8550-34c66053841b', TRUE, 7, 2, 3, '{"agree": {"category": "Strongly agree", "node_uuid": "084c8cf1-715d-4d0a-b38d-a616ed74e638", "name": "Agree", "value": "A", "created_on": "2017-05-03T12:25:21.714339+00:00", "input": "A"}, "confirm_agree": {"category": "Confirmed Strongly agree", "node_uuid": "a0434c54-3e26-4eb0-bafc-46cdeaf435ab", "name": "Do you agree?", "value": "A", "created_on": "2017-05-03T12:25:21.714339+00:00", "input": "A"}}', '[{"uuid": "600ac5b4-4895-4161-ad97-6e2f1bb48bcb", "node_uuid": "accbc6e2-b0df-46cd-9a76-bff0fdf4d753", "arrived_on": "2017-08-12T15:07:24.049815+02:00", "exit_uuid": "8249e2dc-c893-4200-b6d2-398d07a459bc"}]', '[{"msg": {"urn": "tel:+12076661212", "text": "hola", "uuid": "9ea50923-0888-4596-9a9d-4890994934a9", "channel": {"name": "1223", "uuid": "d6597e08-8285-428c-8e7e-97c68adfa073"}}, "type": "msg_created", "step_uuid": "ae067248-df92-41c8-bb29-92506e984259", "created_on": "2018-01-22T15:06:47.357682+00:00"}]', -'2017-08-10 21:11:59.890662+02:00','2017-08-10 21:11:59.890662+02:00','2017-08-10 21:11:59.890662+02:00', 'C', NULL, 1), -(4, 'de782b35-a398-46ed-8550-34c66053841b', TRUE, 7, 2, 3, +'2017-08-10 21:11:59.890662+02:00','2017-08-10 21:11:59.890662+02:00','2017-08-10 21:11:59.890662+02:00', 'C', 'C', NULL, 1), +(4, '329a5d24-64fc-479c-8d24-9674c9b46530', TRUE, 7, 2, 3, '{"agree": {"category": "Disagree", "node_uuid": "084c8cf1-715d-4d0a-b38d-a616ed74e638", "name": "Agree", "value": "B", "created_on": "2017-10-10T12:25:21.714339+00:00", "input": "B"}}', '[{"uuid": "babf4fc8-e12c-4bb9-a9dd-61178a118b5a", "node_uuid": "accbc6e2-b0df-46cd-9a76-bff0fdf4d753", "arrived_on": "2017-10-12T15:07:24.049815+02:00", "exit_uuid": "8249e2dc-c893-4200-b6d2-398d07a459bc"}]', '[{"msg": {"urn": "tel:+12076661212", "text": "hi hi", "uuid": "543d2c4b-ff0b-4b87-a9a4-b2d6745cf470", "channel": {"name": "1223", "uuid": "d6597e08-8285-428c-8e7e-97c68adfa073"}}, "type": "msg_created", "step_uuid": "3a5014dd-7b14-4b7a-be52-0419c09340a6", "created_on": "2018-10-12T15:06:47.357682+00:00"}]', -'2017-10-10 21:11:59.890662+02:00','2017-10-10 21:11:59.890662+02:00','2017-10-10 21:11:59.890662+02:00', 'C', NULL, NULL), -(6, '6262eefe-a6e9-4201-9b76-a7f25e3b7f29', TRUE, 7, 2, 3, '{}', '[]', '[]', -'2017-12-12 21:11:59.890662+02:00','2017-12-12 21:11:59.890662+02:00','2017-12-12 21:11:59.890662+02:00', 'C', 4, NULL); +'2017-10-10 21:11:59.890662+02:00','2017-10-10 21:11:59.890662+02:00','2017-10-10 21:11:59.890662+02:00', 'C', 'C', NULL, NULL), +(5, 'abed67d2-06b8-4749-8bb9-ecda037b673b', TRUE, 7, 2, 3, '{}', '[]', '[]', '2017-10-10 21:11:59.890663+02:00','2017-10-10 21:11:59.890662+02:00','2017-10-10 21:11:59.890662+02:00', 'C', 'C', NULL, NULL), +(6, '6262eefe-a6e9-4201-9b76-a7f25e3b7f29', TRUE, 7, 2, 3, '{}', '[]', '[]', '2017-12-12 21:11:59.890662+02:00','2017-12-12 21:11:59.890662+02:00','2017-12-12 21:11:59.890662+02:00', 'C', 'C', 4, NULL); INSERT INTO flows_flowpathrecentrun(id, run_id) VALUES -(1, 3); \ No newline at end of file +(1, 3); + +-- update run #5 to have a path longer than 500 steps +UPDATE flows_flowrun SET path = s.path 
FROM ( + SELECT json_agg(CONCAT('{"uuid": "babf4fc8-e12c-4bb9-a9dd-61178a118b5a", "node_uuid": "accbc6e2-b0df-46cd-9a76-bff0fdf4d753", "arrived_on": "2017-10-12T15:07:24.', LPAD(gs.val::text, 6, '0'), '+02:00", "exit_uuid": "8249e2dc-c893-4200-b6d2-398d07a459bc"}')::jsonb) as path FROM generate_series(1, 1000) as gs(val) +) AS s WHERE id = 5;
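The fixture above gives run #5 a 1,000-step path, which is what exercises the new LIMIT 500 inside the jsonb_array_elements subselect of lookupFlowRuns (the v6.5.0 changelog entry "Limit paths in archived runs to first 500 steps"). jsonb_array_elements yields elements in array order, so the limit keeps the first 500 steps. Run #5 is also a new fixture row, which is why created[11].RecordCount in TestArchiveOrgRuns goes from 1 to 2, with the truncated 500-step path accounting for the new size and hash (run #4 additionally gets a fresh UUID since flows_flowrun.uuid is now UNIQUE). For readers sanity-checking an archived record, the equivalent truncation in Go looks like the sketch below; the Step type and helper are illustrative, not part of the patch:

package archives

// Step mirrors the {"node": ..., "time": ...} objects that lookupFlowRuns
// emits for each path element.
type Step struct {
	Node string `json:"node"`
	Time string `json:"time"`
}

// truncatePath mirrors the LIMIT 500 in lookupFlowRuns: keep only the first
// maxSteps elements, in array order. (Illustrative helper, not in the patch.)
func truncatePath(path []Step, maxSteps int) []Step {
	if len(path) > maxSteps {
		return path[:maxSteps]
	}
	return path
}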
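A note on the batched statements (setMessageDeleteReason, deleteMessageLogs, unlinkResponses, deleteRuns, and friends): each is written with a single IN(?) placeholder and executed through executeInQuery, whose signature appears in a hunk header above but whose body is outside this diff. A minimal sketch of the conventional sqlx pattern such a helper follows, offered as an assumption about the implementation rather than code from the patch:

package archives

import (
	"context"

	"github.com/jmoiron/sqlx"
)

// executeInQuery (assumed implementation; the real body is not part of this
// diff). sqlx.In expands the slice behind the single IN(?) placeholder into
// one bindvar per ID, and Rebind rewrites the ?s as Postgres $1, $2, ...
func executeInQuery(ctx context.Context, tx *sqlx.Tx, query string, ids []int64) error {
	q, args, err := sqlx.In(query, ids)
	if err != nil {
		return err
	}
	q = tx.Rebind(q)
	_, err = tx.ExecContext(ctx, q, args...)
	return err
}

Expanding bindvars this way keeps each statement to a single round trip per batch of IDs.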
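DeleteArchivedMessages and DeleteArchivedRuns share the same deletion shape: verify the S3 object, collect candidate IDs, compare counts against archive.RecordCount, then delete in transactions of deleteTransactionSize (100) IDs, each capped by its own 15-minute context so a stuck batch cannot consume the outer 3-hour budget. Condensed into one hypothetical helper for orientation (the name deleteInBatches and the explicit rollback are mine; the real functions inline this loop and interleave type-specific statements and logging):

package archives

import (
	"context"
	"fmt"
	"time"

	"github.com/jmoiron/sqlx"
)

// deleteInBatches is a condensed sketch of the loop both delete functions
// share; deleteTransactionSize (= 100) and executeInQuery are the package
// members shown in the diff and sketched above.
func deleteInBatches(ctx context.Context, db *sqlx.DB, ids []int64, queries []string) error {
	for startIdx := 0; startIdx < len(ids); startIdx += deleteTransactionSize {
		endIdx := startIdx + deleteTransactionSize
		if endIdx > len(ids) {
			endIdx = len(ids)
		}
		batchIDs := ids[startIdx:endIdx]

		// each batch gets its own deadline so one slow transaction can't eat
		// the whole budget of the outer context
		batchCtx, cancel := context.WithTimeout(ctx, time.Minute*15)

		tx, err := db.BeginTxx(batchCtx, nil)
		if err != nil {
			cancel()
			return err
		}

		for _, q := range queries {
			// expand IN(?) for this batch and execute inside the transaction
			if err := executeInQuery(batchCtx, tx, q, batchIDs); err != nil {
				tx.Rollback()
				cancel()
				return fmt.Errorf("error executing delete batch: %w", err)
			}
		}

		if err := tx.Commit(); err != nil {
			cancel()
			return err
		}
		cancel()
	}
	return nil
}

Invoked with []string{setRunDeleteReason, deleteRecentRuns, unlinkParents, deleteRuns} this matches the run path; statement order matters because referencing rows must be removed or unlinked before the runs themselves.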
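Both delete paths also refuse to touch the database unless GetS3FileETAG (in archives/s3.go, which this diff only renames) returns an ETag matching the archive's recorded MD5. One caveat worth keeping in mind: S3 reports a plain MD5 as the ETag only for single-part, non-KMS uploads, so the check presupposes archives are uploaded in one part. Below is a sketch of what such a lookup can look like against the aws-sdk-go API already imported here; the path-style URL parsing is an assumption, since the real parsing lives in s3.go:

package archives

import (
	"context"
	"fmt"
	"net/url"
	"strings"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3iface"
)

// GetS3FileETAG (assumed implementation; the real one is in archives/s3.go,
// which this diff only renames, not modifies).
func GetS3FileETAG(ctx context.Context, s3Client s3iface.S3API, fileURL string) (string, error) {
	u, err := url.Parse(fileURL)
	if err != nil {
		return "", err
	}

	// assumption: path-style URL, i.e. https://endpoint/bucket/key...
	parts := strings.SplitN(strings.TrimPrefix(u.Path, "/"), "/", 2)
	if len(parts) != 2 {
		return "", fmt.Errorf("invalid S3 URL: %s", fileURL)
	}

	head, err := s3Client.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(parts[0]),
		Key:    aws.String(parts[1]),
	})
	if err != nil {
		return "", err
	}

	// S3 wraps the ETag in literal double quotes; strip them so the caller
	// can compare directly against the hex MD5 stored on the archive row
	return strings.Trim(aws.StringValue(head.ETag), `"`), nil
}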