Skip to content

Commit

Permalink
Move a little more functionality into contacts package
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Feb 14, 2022
1 parent 90bc8c1 commit be02daf
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 113 deletions.
84 changes: 42 additions & 42 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,65 +1,65 @@
name: CI
on: [push, pull_request]
env:
go-version: '1.16.x'
es-version: '7.10.1'
go-version: "1.16.x"
es-version: "7.10.1"
jobs:
test:
name: Test
strategy:
matrix:
pg-version: ['12', '13']
pg-version: ["12", "13"]
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v1
- name: Checkout code
uses: actions/checkout@v1

- name: Install ElasticSearch
uses: nyaruka/elasticsearch-action@master
with:
elastic version: ${{ env.es-version }}
- name: Install ElasticSearch
uses: nyaruka/elasticsearch-action@master
with:
elastic version: ${{ env.es-version }}

- name: Install PostgreSQL
uses: harmon758/postgresql-action@v1
with:
postgresql version: ${{ matrix.pg-version }}
postgresql db: elastic_test
postgresql user: temba
postgresql password: temba
- name: Install PostgreSQL
uses: harmon758/postgresql-action@v1
with:
postgresql version: ${{ matrix.pg-version }}
postgresql db: elastic_test
postgresql user: nyaruka
postgresql password: nyaruka

- name: Install Go
uses: actions/setup-go@v1
with:
go-version: ${{ env.go-version }}
- name: Install Go
uses: actions/setup-go@v1
with:
go-version: ${{ env.go-version }}

- name: Run tests
run: go test -p=1 -coverprofile=coverage.text -covermode=atomic ./...
- name: Run tests
run: go test -p=1 -coverprofile=coverage.text -covermode=atomic ./...

- name: Upload coverage
if: success()
uses: codecov/codecov-action@v1
with:
token: ${{ secrets.CODECOV_TOKEN }}

- name: Upload coverage
if: success()
uses: codecov/codecov-action@v1
with:
token: ${{ secrets.CODECOV_TOKEN }}

release:
name: Release
needs: [test]
if: startsWith(github.ref, 'refs/tags/')
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v1
- name: Checkout code
uses: actions/checkout@v1

- name: Install Go
uses: actions/setup-go@v1
with:
go-version: ${{ env.go-version }}
- name: Install Go
uses: actions/setup-go@v1
with:
go-version: ${{ env.go-version }}

- name: Publish release
uses: goreleaser/goreleaser-action@v1
with:
version: v0.147.2
args: release --rm-dist
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
fail_ci_if_error: true
- name: Publish release
uses: goreleaser/goreleaser-action@v1
with:
version: v0.147.2
args: release --rm-dist
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
fail_ci_if_error: true
4 changes: 1 addition & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
*.dylib
fabfile.py
fabfile.pyc
fabconfig.py
fabconfig.pyc
fabric
deploy/
rp-indexer

# Test binary, build with `go test -c`
Expand Down
72 changes: 72 additions & 0 deletions contacts/query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package contacts

import (
"database/sql"
"time"
)

const sqlSelectModified = `
SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM (
SELECT
id, org_id, uuid, name, language, status, ticket_count AS tickets, is_active, created_on, modified_on, last_seen_on,
EXTRACT(EPOCH FROM modified_on) * 1000000 as modified_on_mu,
(
SELECT array_to_json(array_agg(row_to_json(u)))
FROM (
SELECT scheme, path
FROM contacts_contacturn
WHERE contact_id = contacts_contact.id
) u
) as urns,
(
SELECT jsonb_agg(f.value)
FROM (
select case
when value ? 'ward'
then jsonb_build_object(
'ward_keyword', trim(substring(value ->> 'ward' from '(?!.* > )([^>]+)'))
)
else '{}' :: jsonb
end || district_value.value as value
FROM (
select case
when value ? 'district'
then jsonb_build_object(
'district_keyword', trim(substring(value ->> 'district' from '(?!.* > )([^>]+)'))
)
else '{}' :: jsonb
end || state_value.value as value
FROM (
select case
when value ? 'state'
then jsonb_build_object(
'state_keyword', trim(substring(value ->> 'state' from '(?!.* > )([^>]+)'))
)
else '{}' :: jsonb
end ||
jsonb_build_object('field', key) || value as value
from jsonb_each(contacts_contact.fields)
) state_value
) as district_value
) as f
) as fields,
(
SELECT array_to_json(array_agg(g.uuid))
FROM (
SELECT contacts_contactgroup.uuid
FROM contacts_contactgroup_contacts, contacts_contactgroup
WHERE contact_id = contacts_contact.id AND
contacts_contactgroup_contacts.contactgroup_id = contacts_contactgroup.id
) g
) as groups
FROM contacts_contact
WHERE modified_on >= $1
ORDER BY modified_on ASC
LIMIT 500000
) t;
`

func FetchModified(db *sql.DB, lastModified time.Time) (*sql.Rows, error) {
return db.Query(sqlSelectModified, lastModified)
}
7 changes: 7 additions & 0 deletions contacts/settings.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package contacts

import _ "embed"

// settings and mappings for our index
//go:embed index_settings.json
var IndexSettings string
1 change: 1 addition & 0 deletions deploy
71 changes: 3 additions & 68 deletions indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/rp-indexer/contacts"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -86,7 +87,7 @@ func CreateNewIndex(url string, alias string) (string, error) {

// initialize our index
createURL := fmt.Sprintf("%s/%s?include_type_name=true", url, physicalIndex)
_, err := MakeJSONRequest(http.MethodPut, createURL, indexSettings, nil)
_, err := MakeJSONRequest(http.MethodPut, createURL, contacts.IndexSettings, nil)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -265,7 +266,7 @@ func IndexContacts(db *sql.DB, elasticURL string, index string, lastModified tim
start := time.Now()

for {
rows, err := db.Query(contactQuery, lastModified)
rows, err := contacts.FetchModified(db, lastModified)

queryCreated := 0
queryCount := 0
Expand Down Expand Up @@ -381,72 +382,6 @@ func MapIndexAlias(elasticURL string, alias string, newIndex string) error {
return err
}

const contactQuery = `
SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM (
SELECT
id, org_id, uuid, name, language, status, ticket_count AS tickets, is_active, created_on, modified_on, last_seen_on,
EXTRACT(EPOCH FROM modified_on) * 1000000 as modified_on_mu,
(
SELECT array_to_json(array_agg(row_to_json(u)))
FROM (
SELECT scheme, path
FROM contacts_contacturn
WHERE contact_id = contacts_contact.id
) u
) as urns,
(
SELECT jsonb_agg(f.value)
FROM (
select case
when value ? 'ward'
then jsonb_build_object(
'ward_keyword', trim(substring(value ->> 'ward' from '(?!.* > )([^>]+)'))
)
else '{}' :: jsonb
end || district_value.value as value
FROM (
select case
when value ? 'district'
then jsonb_build_object(
'district_keyword', trim(substring(value ->> 'district' from '(?!.* > )([^>]+)'))
)
else '{}' :: jsonb
end || state_value.value as value
FROM (
select case
when value ? 'state'
then jsonb_build_object(
'state_keyword', trim(substring(value ->> 'state' from '(?!.* > )([^>]+)'))
)
else '{}' :: jsonb
end ||
jsonb_build_object('field', key) || value as value
from jsonb_each(contacts_contact.fields)
) state_value
) as district_value
) as f
) as fields,
(
SELECT array_to_json(array_agg(g.uuid))
FROM (
SELECT contacts_contactgroup.uuid
FROM contacts_contactgroup_contacts, contacts_contactgroup
WHERE contact_id = contacts_contact.id AND
contacts_contactgroup_contacts.contactgroup_id = contacts_contactgroup.id
) g
) as groups
FROM contacts_contact
WHERE modified_on >= $1
ORDER BY modified_on ASC
LIMIT 500000
) t;
`

// settings and mappings for our index
//go:embed contacts/index_settings.json
var indexSettings string

// gets our last modified contact
const lastModifiedQuery = `{ "sort": [{ "modified_on_mu": "desc" }]}`

Expand Down

0 comments on commit be02daf

Please sign in to comment.