From be02daf67790cfd149f510c77d229c9928b2cdb4 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Mon, 14 Feb 2022 13:51:04 -0500 Subject: [PATCH] Move a little more functionality into contacts package --- .github/workflows/ci.yml | 84 ++++++++++++++++++++-------------------- .gitignore | 4 +- contacts/query.go | 72 ++++++++++++++++++++++++++++++++++ contacts/settings.go | 7 ++++ deploy | 1 + indexer.go | 71 ++------------------------------- 6 files changed, 126 insertions(+), 113 deletions(-) create mode 100644 contacts/query.go create mode 100644 contacts/settings.go create mode 120000 deploy diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f591b51..5a4a1fd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,65 +1,65 @@ name: CI on: [push, pull_request] env: - go-version: '1.16.x' - es-version: '7.10.1' + go-version: "1.16.x" + es-version: "7.10.1" jobs: test: name: Test strategy: matrix: - pg-version: ['12', '13'] + pg-version: ["12", "13"] runs-on: ubuntu-latest steps: - - name: Checkout code - uses: actions/checkout@v1 + - name: Checkout code + uses: actions/checkout@v1 - - name: Install ElasticSearch - uses: nyaruka/elasticsearch-action@master - with: - elastic version: ${{ env.es-version }} + - name: Install ElasticSearch + uses: nyaruka/elasticsearch-action@master + with: + elastic version: ${{ env.es-version }} - - name: Install PostgreSQL - uses: harmon758/postgresql-action@v1 - with: - postgresql version: ${{ matrix.pg-version }} - postgresql db: elastic_test - postgresql user: temba - postgresql password: temba + - name: Install PostgreSQL + uses: harmon758/postgresql-action@v1 + with: + postgresql version: ${{ matrix.pg-version }} + postgresql db: elastic_test + postgresql user: nyaruka + postgresql password: nyaruka - - name: Install Go - uses: actions/setup-go@v1 - with: - go-version: ${{ env.go-version }} + - name: Install Go + uses: actions/setup-go@v1 + with: + go-version: ${{ env.go-version }} - - name: Run tests - run: go test -p=1 -coverprofile=coverage.text -covermode=atomic ./... + - name: Run tests + run: go test -p=1 -coverprofile=coverage.text -covermode=atomic ./... + + - name: Upload coverage + if: success() + uses: codecov/codecov-action@v1 + with: + token: ${{ secrets.CODECOV_TOKEN }} - - name: Upload coverage - if: success() - uses: codecov/codecov-action@v1 - with: - token: ${{ secrets.CODECOV_TOKEN }} - release: name: Release needs: [test] if: startsWith(github.ref, 'refs/tags/') runs-on: ubuntu-latest steps: - - name: Checkout code - uses: actions/checkout@v1 + - name: Checkout code + uses: actions/checkout@v1 - - name: Install Go - uses: actions/setup-go@v1 - with: - go-version: ${{ env.go-version }} + - name: Install Go + uses: actions/setup-go@v1 + with: + go-version: ${{ env.go-version }} - - name: Publish release - uses: goreleaser/goreleaser-action@v1 - with: - version: v0.147.2 - args: release --rm-dist - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - fail_ci_if_error: true + - name: Publish release + uses: goreleaser/goreleaser-action@v1 + with: + version: v0.147.2 + args: release --rm-dist + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + fail_ci_if_error: true diff --git a/.gitignore b/.gitignore index 11d130f..1446c34 100644 --- a/.gitignore +++ b/.gitignore @@ -5,9 +5,7 @@ *.dylib fabfile.py fabfile.pyc -fabconfig.py -fabconfig.pyc -fabric +deploy/ rp-indexer # Test binary, build with `go test -c` diff --git a/contacts/query.go b/contacts/query.go new file mode 100644 index 0000000..19eac21 --- /dev/null +++ b/contacts/query.go @@ -0,0 +1,72 @@ +package contacts + +import ( + "database/sql" + "time" +) + +const sqlSelectModified = ` +SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM ( + SELECT + id, org_id, uuid, name, language, status, ticket_count AS tickets, is_active, created_on, modified_on, last_seen_on, + EXTRACT(EPOCH FROM modified_on) * 1000000 as modified_on_mu, + ( + SELECT array_to_json(array_agg(row_to_json(u))) + FROM ( + SELECT scheme, path + FROM contacts_contacturn + WHERE contact_id = contacts_contact.id + ) u + ) as urns, + ( + SELECT jsonb_agg(f.value) + FROM ( + select case + when value ? 'ward' + then jsonb_build_object( + 'ward_keyword', trim(substring(value ->> 'ward' from '(?!.* > )([^>]+)')) + ) + else '{}' :: jsonb + end || district_value.value as value + FROM ( + select case + when value ? 'district' + then jsonb_build_object( + 'district_keyword', trim(substring(value ->> 'district' from '(?!.* > )([^>]+)')) + ) + else '{}' :: jsonb + end || state_value.value as value + FROM ( + + select case + when value ? 'state' + then jsonb_build_object( + 'state_keyword', trim(substring(value ->> 'state' from '(?!.* > )([^>]+)')) + ) + else '{}' :: jsonb + end || + jsonb_build_object('field', key) || value as value + from jsonb_each(contacts_contact.fields) + ) state_value + ) as district_value + ) as f + ) as fields, + ( + SELECT array_to_json(array_agg(g.uuid)) + FROM ( + SELECT contacts_contactgroup.uuid + FROM contacts_contactgroup_contacts, contacts_contactgroup + WHERE contact_id = contacts_contact.id AND + contacts_contactgroup_contacts.contactgroup_id = contacts_contactgroup.id + ) g + ) as groups + FROM contacts_contact + WHERE modified_on >= $1 + ORDER BY modified_on ASC + LIMIT 500000 +) t; +` + +func FetchModified(db *sql.DB, lastModified time.Time) (*sql.Rows, error) { + return db.Query(sqlSelectModified, lastModified) +} diff --git a/contacts/settings.go b/contacts/settings.go new file mode 100644 index 0000000..89dd5a9 --- /dev/null +++ b/contacts/settings.go @@ -0,0 +1,7 @@ +package contacts + +import _ "embed" + +// settings and mappings for our index +//go:embed index_settings.json +var IndexSettings string diff --git a/deploy b/deploy new file mode 120000 index 0000000..3ebc6a1 --- /dev/null +++ b/deploy @@ -0,0 +1 @@ +../utils/deploy \ No newline at end of file diff --git a/indexer.go b/indexer.go index be30942..d80f04d 100644 --- a/indexer.go +++ b/indexer.go @@ -13,6 +13,7 @@ import ( "time" "github.com/nyaruka/gocommon/httpx" + "github.com/nyaruka/rp-indexer/contacts" log "github.com/sirupsen/logrus" ) @@ -86,7 +87,7 @@ func CreateNewIndex(url string, alias string) (string, error) { // initialize our index createURL := fmt.Sprintf("%s/%s?include_type_name=true", url, physicalIndex) - _, err := MakeJSONRequest(http.MethodPut, createURL, indexSettings, nil) + _, err := MakeJSONRequest(http.MethodPut, createURL, contacts.IndexSettings, nil) if err != nil { return "", err } @@ -265,7 +266,7 @@ func IndexContacts(db *sql.DB, elasticURL string, index string, lastModified tim start := time.Now() for { - rows, err := db.Query(contactQuery, lastModified) + rows, err := contacts.FetchModified(db, lastModified) queryCreated := 0 queryCount := 0 @@ -381,72 +382,6 @@ func MapIndexAlias(elasticURL string, alias string, newIndex string) error { return err } -const contactQuery = ` -SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM ( - SELECT - id, org_id, uuid, name, language, status, ticket_count AS tickets, is_active, created_on, modified_on, last_seen_on, - EXTRACT(EPOCH FROM modified_on) * 1000000 as modified_on_mu, - ( - SELECT array_to_json(array_agg(row_to_json(u))) - FROM ( - SELECT scheme, path - FROM contacts_contacturn - WHERE contact_id = contacts_contact.id - ) u - ) as urns, - ( - SELECT jsonb_agg(f.value) - FROM ( - select case - when value ? 'ward' - then jsonb_build_object( - 'ward_keyword', trim(substring(value ->> 'ward' from '(?!.* > )([^>]+)')) - ) - else '{}' :: jsonb - end || district_value.value as value - FROM ( - select case - when value ? 'district' - then jsonb_build_object( - 'district_keyword', trim(substring(value ->> 'district' from '(?!.* > )([^>]+)')) - ) - else '{}' :: jsonb - end || state_value.value as value - FROM ( - - select case - when value ? 'state' - then jsonb_build_object( - 'state_keyword', trim(substring(value ->> 'state' from '(?!.* > )([^>]+)')) - ) - else '{}' :: jsonb - end || - jsonb_build_object('field', key) || value as value - from jsonb_each(contacts_contact.fields) - ) state_value - ) as district_value - ) as f - ) as fields, - ( - SELECT array_to_json(array_agg(g.uuid)) - FROM ( - SELECT contacts_contactgroup.uuid - FROM contacts_contactgroup_contacts, contacts_contactgroup - WHERE contact_id = contacts_contact.id AND - contacts_contactgroup_contacts.contactgroup_id = contacts_contactgroup.id - ) g - ) as groups - FROM contacts_contact - WHERE modified_on >= $1 - ORDER BY modified_on ASC - LIMIT 500000 -) t; -` - -// settings and mappings for our index -//go:embed contacts/index_settings.json -var indexSettings string - // gets our last modified contact const lastModifiedQuery = `{ "sort": [{ "modified_on_mu": "desc" }]}`