Rework dependencies so that contacts package import indexer
rowanseymour committed Mar 23, 2022
1 parent 93c72b0 commit 133fbde
Showing 10 changed files with 520 additions and 507 deletions.
17 changes: 13 additions & 4 deletions base.go
@@ -9,26 +9,35 @@ import (

// Indexer is base interface for indexers
type Indexer interface {
Name() string
Index(db *sql.DB) error
}

type BaseIndexer struct {
name string // e.g. contacts, used as based index name
ElasticURL string
IndexName string // e.g. contacts
Rebuild bool // whether indexer should rebuild entire index in one pass
Cleanup bool // whether after rebuilding, indexer should cleanup old indexes
Rebuild bool // whether indexer should rebuild entire index in one pass
Cleanup bool // whether after rebuilding, indexer should cleanup old indexes

// statistics
indexedTotal int64
deletedTotal int64
elapsedTotal time.Duration
}

func NewBaseIndexer(name, elasticURL string, rebuild, cleanup bool) BaseIndexer {
return BaseIndexer{name: name, ElasticURL: elasticURL, Rebuild: rebuild, Cleanup: cleanup}
}

func (i *BaseIndexer) Name() string {
return i.name
}

// UpdateStats updates statistics for this indexer
func (i *BaseIndexer) UpdateStats(indexed, deleted int, elapsed time.Duration) {
i.indexedTotal += int64(indexed)
i.deletedTotal += int64(deleted)
i.elapsedTotal += elapsed

logrus.WithField("index", i.IndexName).WithField("indexed", indexed).WithField("deleted", deleted).WithField("elapsed", elapsed).Info("completed indexing")
logrus.WithField("indexer", i.name).WithField("indexed", indexed).WithField("deleted", deleted).WithField("elapsed", elapsed).Info("completed indexing")
}
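
This change keeps the Indexer interface and the embeddable BaseIndexer in the root indexer package, so concrete indexers (such as the contacts package) import indexer rather than the reverse. Below is a minimal sketch of what a concrete indexer might look like under this layout, modeled on the ContactIndexer that this commit removes from cmd/rp-indexer/main.go further down; the actual contacts package implementation is among the files not expanded on this page, so the names here are illustrative only.

package contacts

import (
	"database/sql"

	indexer "github.com/nyaruka/rp-indexer"
)

// ContactIndexer embeds BaseIndexer to get its name, Elasticsearch URL,
// rebuild/cleanup flags and stats tracking, and adds contact-specific indexing
type ContactIndexer struct {
	indexer.BaseIndexer
}

// NewIndexer creates a new contact indexer (signature inferred from the
// contacts.NewIndexer call in main.go below)
func NewIndexer(elasticURL, name string, rebuild, cleanup bool) indexer.Indexer {
	return &ContactIndexer{
		BaseIndexer: indexer.NewBaseIndexer(name, elasticURL, rebuild, cleanup),
	}
}

// Index finds or creates the physical index, indexes contacts modified since
// the last run, remaps the alias and optionally cleans up old indexes -- the
// logic that previously lived in cmd/rp-indexer/main.go
func (i *ContactIndexer) Index(db *sql.DB) error {
	return nil // body elided in this sketch
}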
92 changes: 92 additions & 0 deletions base_test.go
@@ -0,0 +1,92 @@
package indexer_test

import (
"net/http"
"net/http/httptest"
"testing"

indexer "github.com/nyaruka/rp-indexer"
"github.com/stretchr/testify/require"
)

func TestRetryServer(t *testing.T) {
responseCounter := 0
responses := []func(w http.ResponseWriter, r *http.Request){
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Length", "5")
},
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Length", "1")
},
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Length", "1")
},
func(w http.ResponseWriter, r *http.Request) {
resp := `{
"took": 1,
"timed_out": false,
"_shards": {
"total": 2,
"successful": 2,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1,
"max_score": null,
"hits": [
{
"_index": "rp_elastic_test_2020_08_14_1",
"_type": "_doc",
"_id": "1",
"_score": null,
"_routing": "1",
"_source": {
"id": 1,
"org_id": 1,
"uuid": "c7a2dd87-a80e-420b-8431-ca48d422e924",
"name": null,
"language": "eng",
"is_active": true,
"created_on": "2017-11-10T16:11:59.890662-05:00",
"modified_on": "2017-11-10T16:11:59.890662-05:00",
"last_seen_on": "2020-08-04T21:11:00-04:00",
"modified_on_mu": 1.510348319890662e15,
"urns": [
{
"scheme": "tel",
"path": "+12067791111"
},
{
"scheme": "tel",
"path": "+12067792222"
}
],
"fields": [
{
"text": "the rock",
"field": "17103bb1-1b48-4b70-92f7-1f6b73bd3488"
}
],
"groups": [
"4ea0f313-2f62-4e57-bdf0-232b5191dd57",
"529bac39-550a-4d6f-817c-1833f3449007"
]
},
"sort": [1]
}
]
}
}`

w.Write([]byte(resp))
},
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
responses[responseCounter](w, r)
responseCounter++
}))
defer ts.Close()
indexer.FindPhysicalIndexes(ts.URL, "rp_elastic_test")
require.Equal(t, responseCounter, 4)
}
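
The test above drives indexer.FindPhysicalIndexes through three malformed responses (bogus Content-Length headers) before a valid search result; the final assertion that all four canned responses were consumed is what verifies the retry behavior. For reference, a usage fragment mirroring the code this commit removes from main.go further down (the Elasticsearch URL and alias are examples only):

// resolve the alias to its physical indexes; as in the old main.go code,
// the first entry, if any, is treated as the current physical index
physicalIndexes := indexer.FindPhysicalIndexes("http://localhost:9200", "contacts")

physicalIndex := ""
if len(physicalIndexes) > 0 {
	physicalIndex = physicalIndexes[0]
}
// physicalIndex is then used as the target for indexing and alias remapping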
75 changes: 1 addition & 74 deletions cmd/rp-indexer/main.go
Expand Up @@ -8,9 +8,7 @@ import (
"github.com/evalphobia/logrus_sentry"
_ "github.com/lib/pq"
"github.com/nyaruka/ezconf"
indexer "github.com/nyaruka/rp-indexer"
"github.com/nyaruka/rp-indexer/contacts"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

@@ -66,7 +64,7 @@ func main()
log.Fatal(err)
}

ci := NewContactIndexer(config.ElasticURL, config.Index, config.Rebuild, config.Cleanup)
ci := contacts.NewIndexer(config.ElasticURL, config.Index, config.Rebuild, config.Cleanup)

for {
err := ci.Index(db)
@@ -88,74 +86,3 @@ func main() {
time.Sleep(time.Second * 5)
}
}

type ContactIndexer struct {
indexer.BaseIndexer
}

func NewContactIndexer(elasticURL, indexName string, rebuild, cleanup bool) indexer.Indexer {
return &ContactIndexer{
BaseIndexer: indexer.BaseIndexer{ElasticURL: elasticURL, IndexName: indexName, Rebuild: rebuild, Cleanup: cleanup},
}
}

func (i *ContactIndexer) Index(db *sql.DB) error {
var err error

// find our physical index
physicalIndexes := indexer.FindPhysicalIndexes(i.ElasticURL, i.IndexName)
log.WithField("physicalIndexes", physicalIndexes).WithField("index", i.IndexName).Debug("found physical indexes")

physicalIndex := ""
if len(physicalIndexes) > 0 {
physicalIndex = physicalIndexes[0]
}

// whether we need to remap our alias after building
remapAlias := false

// doesn't exist or we are rebuilding, create it
if physicalIndex == "" || i.Rebuild {
physicalIndex, err = indexer.CreateNewIndex(i.ElasticURL, i.IndexName, contacts.IndexSettings)
if err != nil {
return errors.Wrap(err, "error creating new index")
}
log.WithField("index", i.IndexName).WithField("physicalIndex", physicalIndex).Info("created new physical index")
remapAlias = true
}

lastModified, err := indexer.GetLastModified(i.ElasticURL, physicalIndex)
if err != nil {
return errors.Wrap(err, "error finding last modified")
}

log.WithField("last_modified", lastModified).WithField("index", physicalIndex).Info("indexing newer than last modified")

// now index our docs
start := time.Now()
indexed, deleted, err := indexer.IndexContacts(db, i.ElasticURL, physicalIndex, lastModified.Add(-5*time.Second))
if err != nil {
return errors.Wrap(err, "error indexing documents")
}

i.UpdateStats(indexed, deleted, time.Since(start))

// if the index didn't previously exist or we are rebuilding, remap to our alias
if remapAlias {
err := indexer.MapIndexAlias(i.ElasticURL, i.IndexName, physicalIndex)
if err != nil {
return errors.Wrap(err, "error remapping alias")
}
remapAlias = false
}

// cleanup our aliases if appropriate
if i.Cleanup {
err := indexer.CleanupIndexes(i.ElasticURL, i.IndexName)
if err != nil {
return errors.Wrap(err, "error cleaning up old indexes")
}
}

return nil
}
(remaining 7 changed files not shown)
