diff --git a/base.go b/base.go index 397baf9..eeb3b73 100644 --- a/base.go +++ b/base.go @@ -16,12 +16,13 @@ import ( // Indexer is base interface for indexers type Indexer interface { Name() string - Index(db *sql.DB, rebuild, cleanup bool) error + Index(db *sql.DB, rebuild, cleanup bool) (string, error) + Stats() (int64, int64, time.Duration) } type BaseIndexer struct { - name string // e.g. contacts, used as based index name - ElasticURL string + elasticURL string + name string // e.g. contacts, used as the alias // statistics indexedTotal int64 @@ -29,14 +30,18 @@ type BaseIndexer struct { elapsedTotal time.Duration } -func NewBaseIndexer(name, elasticURL string) BaseIndexer { - return BaseIndexer{name: name, ElasticURL: elasticURL} +func NewBaseIndexer(elasticURL, name string) BaseIndexer { + return BaseIndexer{elasticURL: elasticURL, name: name} } func (i *BaseIndexer) Name() string { return i.name } +func (i *BaseIndexer) Log() *logrus.Entry { + return logrus.WithField("indexer", i.name) +} + func (i *BaseIndexer) Stats() (int64, int64, time.Duration) { return i.indexedTotal, i.deletedTotal, i.elapsedTotal } @@ -47,16 +52,16 @@ func (i *BaseIndexer) RecordComplete(indexed, deleted int, elapsed time.Duration i.deletedTotal += int64(deleted) i.elapsedTotal += elapsed - logrus.WithField("indexer", i.name).WithField("indexed", indexed).WithField("deleted", deleted).WithField("elapsed", elapsed).Info("completed indexing") + i.Log().WithField("indexed", indexed).WithField("deleted", deleted).WithField("elapsed", elapsed).Info("completed indexing") } // our response for figuring out the physical index for an alias type infoResponse map[string]interface{} -// FindPhysicalIndexes finds all our physical indexes -func (i *BaseIndexer) FindPhysicalIndexes() []string { +// FindIndexes finds all our physical indexes +func (i *BaseIndexer) FindIndexes() []string { response := infoResponse{} - _, err := MakeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", i.ElasticURL, i.name), nil, &response) + _, err := makeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", i.elasticURL, i.name), nil, &response) indexes := make([]string, 0) // error could mean a variety of things, but we'll figure that out later @@ -72,7 +77,7 @@ func (i *BaseIndexer) FindPhysicalIndexes() []string { // reverse sort order should put our newest index first sort.Sort(sort.Reverse(sort.StringSlice(indexes))) - logrus.WithField("indexer", i.name).WithField("indexes", indexes).Debug("found physical indexes") + i.Log().WithField("indexes", indexes).Debug("found physical indexes") return indexes } @@ -91,7 +96,7 @@ func (i *BaseIndexer) CreateNewIndex(settings json.RawMessage) (string, error) { // check if it exists for { - resp, err := http.Get(fmt.Sprintf("%s/%s", i.ElasticURL, index)) + resp, err := http.Get(fmt.Sprintf("%s/%s", i.elasticURL, index)) if err != nil { return "", err } @@ -106,13 +111,13 @@ func (i *BaseIndexer) CreateNewIndex(settings json.RawMessage) (string, error) { } // create the new index - _, err := MakeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.ElasticURL, index), settings, nil) + _, err := makeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.elasticURL, index), settings, nil) if err != nil { return "", err } // all went well, return our physical index name - logrus.WithField("indexer", i.name).WithField("index", index).Info("created new index") + i.Log().WithField("index", index).Info("created new index") return index, nil } @@ -144,7 +149,7 @@ func (i *BaseIndexer) UpdateAlias(newIndex string) error { commands := make([]interface{}, 0) // find existing physical indexes - existing := i.FindPhysicalIndexes() + existing := i.FindIndexes() for _, idx := range existing { remove := removeAliasCommand{} remove.Remove.Alias = i.name @@ -162,9 +167,9 @@ func (i *BaseIndexer) UpdateAlias(newIndex string) error { aliasJSON := jsonx.MustMarshal(aliasCommand{Actions: commands}) - _, err := MakeJSONRequest(http.MethodPost, fmt.Sprintf("%s/_aliases", i.ElasticURL), aliasJSON, nil) + _, err := makeJSONRequest(http.MethodPost, fmt.Sprintf("%s/_aliases", i.elasticURL), aliasJSON, nil) - logrus.WithField("indexer", i.name).WithField("index", newIndex).Debug("adding new alias") + i.Log().WithField("index", newIndex).Info("updated alias") return err } @@ -179,7 +184,7 @@ type healthResponse struct { // CleanupIndexes removes all indexes that are older than the currently active index func (i *BaseIndexer) CleanupIndexes() error { // find our current indexes - currents := i.FindPhysicalIndexes() + currents := i.FindIndexes() // no current indexes? this a noop if len(currents) == 0 { @@ -188,7 +193,7 @@ func (i *BaseIndexer) CleanupIndexes() error { // find all the current indexes healthResponse := healthResponse{} - _, err := MakeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", i.ElasticURL, "_cluster/health?level=indices"), nil, &healthResponse) + _, err := makeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", i.elasticURL, "_cluster/health?level=indices"), nil, &healthResponse) if err != nil { return err } @@ -197,7 +202,7 @@ func (i *BaseIndexer) CleanupIndexes() error { for key := range healthResponse.Indices { if strings.HasPrefix(key, i.name) && strings.Compare(key, currents[0]) < 0 { logrus.WithField("index", key).Info("removing old index") - _, err = MakeJSONRequest(http.MethodDelete, fmt.Sprintf("%s/%s", i.ElasticURL, key), nil, nil) + _, err = makeJSONRequest(http.MethodDelete, fmt.Sprintf("%s/%s", i.elasticURL, key), nil, nil) if err != nil { return err } @@ -225,9 +230,9 @@ type indexResponse struct { // indexes the batch of contacts func (i *BaseIndexer) IndexBatch(index string, batch []byte) (int, int, error) { response := indexResponse{} - indexURL := fmt.Sprintf("%s/%s/_bulk", i.ElasticURL, index) + indexURL := fmt.Sprintf("%s/%s/_bulk", i.elasticURL, index) - _, err := MakeJSONRequest(http.MethodPut, indexURL, batch, &response) + _, err := makeJSONRequest(http.MethodPut, indexURL, batch, &response) if err != nil { return 0, 0, err } @@ -258,3 +263,35 @@ func (i *BaseIndexer) IndexBatch(index string, batch []byte) (int, int, error) { return createdCount, deletedCount, nil } + +// our response for finding the last modified document +type queryResponse struct { + Hits struct { + Total struct { + Value int `json:"value"` + } `json:"total"` + Hits []struct { + Source struct { + ID int64 `json:"id"` + ModifiedOn time.Time `json:"modified_on"` + } `json:"_source"` + } `json:"hits"` + } `json:"hits"` +} + +// GetLastModified queries a concrete index and finds the last modified document, returning its modified time +func (i *BaseIndexer) GetLastModified(index string) (time.Time, error) { + lastModified := time.Time{} + + // get the newest document on our index + queryResponse := queryResponse{} + _, err := makeJSONRequest(http.MethodPost, fmt.Sprintf("%s/%s/_search", i.elasticURL, index), []byte(`{ "sort": [{ "modified_on_mu": "desc" }]}`), &queryResponse) + if err != nil { + return lastModified, err + } + + if len(queryResponse.Hits.Hits) > 0 { + lastModified = queryResponse.Hits.Hits[0].Source.ModifiedOn + } + return lastModified, nil +} diff --git a/base_test.go b/base_test.go index e49be40..9327622 100644 --- a/base_test.go +++ b/base_test.go @@ -88,8 +88,8 @@ func TestRetryServer(t *testing.T) { })) defer ts.Close() - ci := contacts.NewIndexer("rp_elastic_test", ts.URL) - ci.FindPhysicalIndexes() + ci := contacts.NewIndexer(ts.URL, "rp_elastic_test", 500) + ci.FindIndexes() require.Equal(t, responseCounter, 4) } diff --git a/cmd/rp-indexer/main.go b/cmd/rp-indexer/main.go index bc4688e..b50f89e 100644 --- a/cmd/rp-indexer/main.go +++ b/cmd/rp-indexer/main.go @@ -64,10 +64,10 @@ func main() { log.Fatal(err) } - ci := contacts.NewIndexer(config.ElasticURL, config.Index) + ci := contacts.NewIndexer(config.ElasticURL, config.Index, 500) for { - err := ci.Index(db, config.Rebuild, config.Cleanup) + _, err := ci.Index(db, config.Rebuild, config.Cleanup) if err != nil { if config.Rebuild { diff --git a/contacts/indexer.go b/contacts/indexer.go index 6905cf5..df3f581 100644 --- a/contacts/indexer.go +++ b/contacts/indexer.go @@ -13,10 +13,8 @@ import ( "github.com/sirupsen/logrus" ) -var BatchSize = 500 - //go:embed index_settings.json -var IndexSettings json.RawMessage +var indexSettings json.RawMessage // indexes a contact const indexCommand = `{ "index": { "_id": %d, "_type": "_doc", "version": %d, "version_type": "external", "routing": %d} }` @@ -27,20 +25,24 @@ const deleteCommand = `{ "delete" : { "_id": %d, "_type": "_doc", "version": %d, // ContactIndexer is an indexer for contacts type Indexer struct { indexer.BaseIndexer + + batchSize int } // NewIndexer creates a new contact indexer -func NewIndexer(name, elasticURL string) *Indexer { +func NewIndexer(elasticURL, name string, batchSize int) *Indexer { return &Indexer{ - BaseIndexer: indexer.NewBaseIndexer(name, elasticURL), + BaseIndexer: indexer.NewBaseIndexer(elasticURL, name), + batchSize: batchSize, } } -func (i *Indexer) Index(db *sql.DB, rebuild, cleanup bool) error { +// Index indexes modified contacts and returns the name of the concrete index +func (i *Indexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error) { var err error // find our physical index - physicalIndexes := i.FindPhysicalIndexes() + physicalIndexes := i.FindIndexes() physicalIndex := "" if len(physicalIndexes) > 0 { @@ -52,26 +54,26 @@ func (i *Indexer) Index(db *sql.DB, rebuild, cleanup bool) error { // doesn't exist or we are rebuilding, create it if physicalIndex == "" || rebuild { - physicalIndex, err = i.CreateNewIndex(IndexSettings) + physicalIndex, err = i.CreateNewIndex(indexSettings) if err != nil { - return errors.Wrap(err, "error creating new index") + return "", errors.Wrap(err, "error creating new index") } - logrus.WithField("indexer", i.Name()).WithField("index", physicalIndex).Info("created new physical index") + i.Log().WithField("index", physicalIndex).Info("created new physical index") remapAlias = true } - lastModified, err := indexer.GetLastModified(i.ElasticURL, physicalIndex) + lastModified, err := i.GetLastModified(physicalIndex) if err != nil { - return errors.Wrap(err, "error finding last modified") + return "", errors.Wrap(err, "error finding last modified") } - logrus.WithField("indexer", i.Name()).WithField("index", physicalIndex).WithField("last_modified", lastModified).Info("indexing newer than last modified") + i.Log().WithField("index", physicalIndex).WithField("last_modified", lastModified).Info("indexing newer than last modified") // now index our docs start := time.Now() - indexed, deleted, err := i.IndexModified(db, physicalIndex, lastModified.Add(-5*time.Second)) + indexed, deleted, err := i.indexModified(db, physicalIndex, lastModified.Add(-5*time.Second)) if err != nil { - return errors.Wrap(err, "error indexing documents") + return "", errors.Wrap(err, "error indexing documents") } i.RecordComplete(indexed, deleted, time.Since(start)) @@ -80,7 +82,7 @@ func (i *Indexer) Index(db *sql.DB, rebuild, cleanup bool) error { if remapAlias { err := i.UpdateAlias(physicalIndex) if err != nil { - return errors.Wrap(err, "error remapping alias") + return "", errors.Wrap(err, "error updating alias") } remapAlias = false } @@ -89,22 +91,18 @@ func (i *Indexer) Index(db *sql.DB, rebuild, cleanup bool) error { if cleanup { err := i.CleanupIndexes() if err != nil { - return errors.Wrap(err, "error cleaning up old indexes") + return "", errors.Wrap(err, "error cleaning up old indexes") } } - return nil + return physicalIndex, nil } // IndexModified queries and indexes all contacts with a lastModified greater than or equal to the passed in time -func (i *Indexer) IndexModified(db *sql.DB, index string, lastModified time.Time) (int, int, error) { +func (i *Indexer) indexModified(db *sql.DB, index string, lastModified time.Time) (int, int, error) { batch := &bytes.Buffer{} createdCount, deletedCount, processedCount := 0, 0, 0 - if index == "" { - return 0, 0, errors.New("empty index") - } - var modifiedOn time.Time var contactJSON string var id, orgID int64 @@ -140,18 +138,20 @@ func (i *Indexer) IndexModified(db *sql.DB, index string, lastModified time.Time if isActive { logrus.WithField("id", id).WithField("modifiedOn", modifiedOn).WithField("contact", contactJSON).Debug("modified contact") + batch.WriteString(fmt.Sprintf(indexCommand, id, modifiedOn.UnixNano(), orgID)) batch.WriteString("\n") batch.WriteString(contactJSON) batch.WriteString("\n") } else { logrus.WithField("id", id).WithField("modifiedOn", modifiedOn).Debug("deleted contact") + batch.WriteString(fmt.Sprintf(deleteCommand, id, modifiedOn.UnixNano(), orgID)) batch.WriteString("\n") } // write to elastic search in batches - if queryCount%BatchSize == 0 { + if queryCount%i.batchSize == 0 { created, deleted, err := i.IndexBatch(index, batch.Bytes()) if err != nil { return 0, 0, err @@ -181,16 +181,12 @@ func (i *Indexer) IndexModified(db *sql.DB, index string, lastModified time.Time break } + rows.Close() + elapsed := time.Since(start) rate := float32(processedCount) / (float32(elapsed) / float32(time.Second)) - logrus.WithFields(map[string]interface{}{ - "rate": int(rate), - "added": createdCount, - "deleted": deletedCount, - "elapsed": elapsed, - "index": index}).Info("updated contact index") - rows.Close() + i.Log().WithField("index", index).WithFields(logrus.Fields{"rate": int(rate), "added": createdCount, "deleted": deletedCount, "elapsed": elapsed}).Info("indexed contact batch") } return createdCount, deletedCount, nil diff --git a/contacts/indexer_test.go b/contacts/indexer_test.go index a355617..57d6754 100644 --- a/contacts/indexer_test.go +++ b/contacts/indexer_test.go @@ -6,12 +6,15 @@ import ( "fmt" "io/ioutil" "log" - "net/http" "os" + "sort" + "strconv" + "strings" "testing" "time" _ "github.com/lib/pq" + "github.com/nyaruka/gocommon/jsonx" indexer "github.com/nyaruka/rp-indexer" "github.com/nyaruka/rp-indexer/contacts" "github.com/olivere/elastic/v7" @@ -21,7 +24,7 @@ import ( ) const elasticURL = "http://localhost:9200" -const indexName = "rp_elastic_test" +const aliasName = "indexer_test" func setup(t *testing.T) (*sql.DB, *elastic.Client) { testDB, err := ioutil.ReadFile("../testdb.sql") @@ -33,278 +36,308 @@ func setup(t *testing.T) (*sql.DB, *elastic.Client) { _, err = db.Exec(string(testDB)) require.NoError(t, err) - client, err := elastic.NewClient(elastic.SetURL(elasticURL), elastic.SetTraceLog(log.New(os.Stdout, "", log.LstdFlags)), elastic.SetSniff(false)) + es, err := elastic.NewClient(elastic.SetURL(elasticURL), elastic.SetTraceLog(log.New(os.Stdout, "", log.LstdFlags)), elastic.SetSniff(false)) require.NoError(t, err) - ci := contacts.NewIndexer(indexName, elasticURL) + // delete all indexes with our alias prefix + existing, err := es.IndexNames() + require.NoError(t, err) - existing := ci.FindPhysicalIndexes() - for _, idx := range existing { - _, err = client.DeleteIndex(idx).Do(context.Background()) - require.NoError(t, err) + for _, name := range existing { + if strings.HasPrefix(name, aliasName) { + _, err = es.DeleteIndex(name).Do(context.Background()) + require.NoError(t, err) + } } logrus.SetLevel(logrus.DebugLevel) - return db, client + return db, es } -func assertQuery(t *testing.T, client *elastic.Client, index string, query elastic.Query, hits []int64) { - results, err := client.Search().Index(index).Query(query).Sort("id", true).Pretty(true).Do(context.Background()) +func assertQuery(t *testing.T, client *elastic.Client, query elastic.Query, expected []int64, msgAndArgs ...interface{}) { + results, err := client.Search().Index(aliasName).Query(query).Sort("id", true).Pretty(true).Do(context.Background()) assert.NoError(t, err) - assert.Equal(t, int64(len(hits)), results.Hits.TotalHits.Value) - if int64(len(hits)) == results.Hits.TotalHits.Value { - for i, hit := range results.Hits.Hits { - assert.Equal(t, fmt.Sprintf("%d", hits[i]), hit.Id) - } + actual := make([]int64, len(results.Hits.Hits)) + for h, hit := range results.Hits.Hits { + asInt, _ := strconv.Atoi(hit.Id) + actual[h] = int64(asInt) } -} - -func TestIndexer(t *testing.T) { - contacts.BatchSize = 4 - db, client := setup(t) - - ci := contacts.NewIndexer(indexName, elasticURL) - - physicalName, err := ci.CreateNewIndex(contacts.IndexSettings) - assert.NoError(t, err) - - added, deleted, err := ci.IndexModified(db, physicalName, time.Time{}) - assert.NoError(t, err) - assert.Equal(t, 9, added) - assert.Equal(t, 0, deleted) - - time.Sleep(2 * time.Second) - - assertQuery(t, client, physicalName, elastic.NewMatchQuery("name", "JOHn"), []int64{4}) - - // prefix on name matches both john and joanne, but no ajodi - assertQuery(t, client, physicalName, elastic.NewMatchQuery("name", "JO"), []int64{4, 6}) - assertQuery(t, client, physicalName, elastic.NewTermQuery("name.keyword", "JOHN DOE"), []int64{4}) - - // can search on both first and last name - boolQuery := elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("name", "john"), - elastic.NewMatchQuery("name", "doe")) - assertQuery(t, client, physicalName, boolQuery, []int64{4}) - - // can search on a long name - assertQuery(t, client, physicalName, elastic.NewMatchQuery("name", "Ajodinabiff"), []int64{5}) - - assertQuery(t, client, physicalName, elastic.NewMatchQuery("language", "eng"), []int64{1}) - - // test contact, not indexed - assertQuery(t, client, physicalName, elastic.NewMatchQuery("language", "fra"), []int64{}) - assertQuery(t, client, physicalName, elastic.NewMatchQuery("status", "B"), []int64{3}) - assertQuery(t, client, physicalName, elastic.NewMatchQuery("status", "S"), []int64{2}) - - assertQuery(t, client, physicalName, elastic.NewMatchQuery("org_id", "1"), []int64{1, 2, 3, 4}) + assert.Equal(t, expected, actual, msgAndArgs...) +} - assertQuery(t, client, physicalName, elastic.NewMatchQuery("tickets", 2), []int64{1}) - assertQuery(t, client, physicalName, elastic.NewMatchQuery("tickets", 1), []int64{2, 3}) - assertQuery(t, client, physicalName, elastic.NewRangeQuery("tickets").Gt(0), []int64{1, 2, 3}) +func assertIndexesWithPrefix(t *testing.T, es *elastic.Client, prefix string, expected []string) { + all, err := es.IndexNames() + require.NoError(t, err) - assertQuery(t, client, physicalName, elastic.NewMatchQuery("flow", "6d3cf1eb-546e-4fb8-a5ca-69187648fbf6"), []int64{2, 3}) - assertQuery(t, client, physicalName, elastic.NewMatchQuery("flow", "4eea8ff1-4fe2-4ce5-92a4-0870a499973a"), []int64{4}) + actual := []string{} + for _, name := range all { + if strings.HasPrefix(name, prefix) { + actual = append(actual, name) + } + } + sort.Strings(actual) + assert.Equal(t, expected, actual) +} - // created_on range query - assertQuery(t, client, physicalName, elastic.NewRangeQuery("created_on").Gt("2017-01-01"), []int64{1, 6, 8}) +func assertIndexerStats(t *testing.T, ix indexer.Indexer, expectedIndexed, expectedDeleted int64) { + actualIndexed, actualDeleted, _ := ix.Stats() + assert.Equal(t, expectedIndexed, actualIndexed, "indexed mismatch") + assert.Equal(t, expectedDeleted, actualDeleted, "deleted mismatch") +} - // last_seen_on range query - assertQuery(t, client, physicalName, elastic.NewRangeQuery("last_seen_on").Lt("2019-01-01"), []int64{3, 4}) +var queryTests = []struct { + query elastic.Query + expected []int64 +}{ + {elastic.NewMatchQuery("org_id", "1"), []int64{1, 2, 3, 4}}, + {elastic.NewMatchQuery("name", "JOHn"), []int64{4}}, + {elastic.NewTermQuery("name.keyword", "JOHN DOE"), []int64{4}}, + {elastic.NewBoolQuery().Must(elastic.NewMatchQuery("name", "john"), elastic.NewMatchQuery("name", "doe")), []int64{4}}, // can search on both first and last name + {elastic.NewMatchQuery("name", "Ajodinabiff"), []int64{5}}, // long name + {elastic.NewMatchQuery("language", "eng"), []int64{1}}, + {elastic.NewMatchQuery("status", "B"), []int64{3}}, + {elastic.NewMatchQuery("status", "S"), []int64{2}}, + {elastic.NewMatchQuery("tickets", 2), []int64{1}}, + {elastic.NewMatchQuery("tickets", 1), []int64{2, 3}}, + {elastic.NewRangeQuery("tickets").Gt(0), []int64{1, 2, 3}}, + {elastic.NewMatchQuery("flow", "6d3cf1eb-546e-4fb8-a5ca-69187648fbf6"), []int64{2, 3}}, + {elastic.NewMatchQuery("flow", "4eea8ff1-4fe2-4ce5-92a4-0870a499973a"), []int64{4}}, + {elastic.NewRangeQuery("created_on").Gt("2017-01-01"), []int64{1, 6, 8}}, // created_on range + {elastic.NewRangeQuery("last_seen_on").Lt("2019-01-01"), []int64{3, 4}}, // last_seen_on range + {elastic.NewExistsQuery("last_seen_on"), []int64{1, 2, 3, 4, 5, 6}}, // last_seen_on is set + {elastic.NewBoolQuery().MustNot(elastic.NewExistsQuery("last_seen_on")), []int64{7, 8, 9}}, // last_seen_on is not set + { + elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("urns.scheme", "facebook"), + elastic.NewMatchQuery("urns.path.keyword", "1000001"), + )), + []int64{8}, + }, + { // urn substring + elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("urns.scheme", "tel"), + elastic.NewMatchPhraseQuery("urns.path", "779"), + )), + []int64{1, 2, 3, 6}, + }, + { // urn substring with more characters (77911) + elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("urns.scheme", "tel"), + elastic.NewMatchPhraseQuery("urns.path", "77911"), + )), + []int64{1}, + }, + { // urn substring with more characters (600055) + elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("urns.scheme", "tel"), + elastic.NewMatchPhraseQuery("urns.path", "600055"), + )), + []int64{5}, + }, + { // match a contact with multiple tel urns + elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("urns.scheme", "tel"), + elastic.NewMatchPhraseQuery("urns.path", "222"), + )), + []int64{1}, + }, + { // text field + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "17103bb1-1b48-4b70-92f7-1f6b73bd3488"), + elastic.NewMatchQuery("fields.text", "the rock")), + ), + []int64{1}, + }, + { // people with no nickname + elastic.NewBoolQuery().MustNot( + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "17103bb1-1b48-4b70-92f7-1f6b73bd3488"), + elastic.NewExistsQuery("fields.text")), + ), + ), + []int64{2, 3, 4, 5, 6, 7, 8, 9}, + }, + { // no tokenizing of field text + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "17103bb1-1b48-4b70-92f7-1f6b73bd3488"), + elastic.NewMatchQuery("fields.text", "rock"), + )), + []int64{}, + }, + { // number field range + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "05bca1cd-e322-4837-9595-86d0d85e5adb"), + elastic.NewRangeQuery("fields.number").Gt(10), + )), + []int64{2}, + }, + { // datetime field range + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "e0eac267-463a-4c00-9732-cab62df07b16"), + elastic.NewRangeQuery("fields.datetime").Lt(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)), + )), + []int64{3}, + }, + { // state field + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "22d11697-edba-4186-b084-793e3b876379"), + elastic.NewMatchPhraseQuery("fields.state", "washington"), + )), + []int64{5}, + }, + { + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "22d11697-edba-4186-b084-793e3b876379"), + elastic.NewMatchQuery("fields.state_keyword", " washington"), + )), + []int64{5}, + }, + { // doesn't include country + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "22d11697-edba-4186-b084-793e3b876379"), + elastic.NewMatchQuery("fields.state_keyword", "usa"), + )), + []int64{}, + }, + { + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "22d11697-edba-4186-b084-793e3b876379"), + elastic.NewMatchPhraseQuery("fields.state", "usa"), + )), + []int64{}, + }, + { // district field + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "fcab2439-861c-4832-aa54-0c97f38f24ab"), + elastic.NewMatchPhraseQuery("fields.district", "king"), + )), + []int64{7, 9}, + }, + { // phrase matches all + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "fcab2439-861c-4832-aa54-0c97f38f24ab"), + elastic.NewMatchPhraseQuery("fields.district", "King-Côunty"), + )), + []int64{7}, + }, + { + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "fcab2439-861c-4832-aa54-0c97f38f24ab"), + elastic.NewMatchQuery("fields.district_keyword", "King-Côunty"), + )), + []int64{7}, + }, + { // ward field + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "a551ade4-e5a0-4d83-b185-53b515ad2f2a"), + elastic.NewMatchPhraseQuery("fields.ward", "district"), + )), + []int64{8}, + }, + { + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "a551ade4-e5a0-4d83-b185-53b515ad2f2a"), + elastic.NewMatchQuery("fields.ward_keyword", "central district"), + )), + []int64{8}, + }, + { // no substring though on keyword + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "a551ade4-e5a0-4d83-b185-53b515ad2f2a"), + elastic.NewMatchQuery("fields.ward_keyword", "district"), + )), + []int64{}, + }, + {elastic.NewMatchQuery("groups", "4ea0f313-2f62-4e57-bdf0-232b5191dd57"), []int64{1}}, + {elastic.NewMatchQuery("groups", "529bac39-550a-4d6f-817c-1833f3449007"), []int64{1, 2}}, + {elastic.NewMatchQuery("groups", "4c016340-468d-4675-a974-15cb7a45a5ab"), []int64{}}, +} - // last_seen_on is set / not set queries - assertQuery(t, client, physicalName, elastic.NewExistsQuery("last_seen_on"), []int64{1, 2, 3, 4, 5, 6}) - assertQuery(t, client, physicalName, elastic.NewBoolQuery().MustNot(elastic.NewExistsQuery("last_seen_on")), []int64{7, 8, 9}) +func TestIndexer(t *testing.T) { + db, es := setup(t) - // urn query - query := elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("urns.scheme", "facebook"), - elastic.NewMatchQuery("urns.path.keyword", "1000001"))) - assertQuery(t, client, physicalName, query, []int64{8}) + ix1 := contacts.NewIndexer(elasticURL, aliasName, 4) + assert.Equal(t, "indexer_test", ix1.Name()) - // urn substring query - query = elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("urns.scheme", "tel"), - elastic.NewMatchPhraseQuery("urns.path", "779"))) - assertQuery(t, client, physicalName, query, []int64{1, 2, 3, 6}) + expectedIndexName := fmt.Sprintf("indexer_test_%s", time.Now().Format("2006_01_02")) - // urn substring query with more characters (77911) - query = elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("urns.scheme", "tel"), - elastic.NewMatchPhraseQuery("urns.path", "77911"))) - assertQuery(t, client, physicalName, query, []int64{1}) + indexName, err := ix1.Index(db, false, false) + assert.NoError(t, err) + assert.Equal(t, expectedIndexName, indexName) - // urn substring query with more characters (600055) - query = elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("urns.scheme", "tel"), - elastic.NewMatchPhraseQuery("urns.path", "600055"))) - assertQuery(t, client, physicalName, query, []int64{5}) + time.Sleep(1 * time.Second) - // match a contact with multiple tel urns - query = elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("urns.scheme", "tel"), - elastic.NewMatchPhraseQuery("urns.path", "222"))) - assertQuery(t, client, physicalName, query, []int64{1}) + assertIndexerStats(t, ix1, 9, 0) + assertIndexesWithPrefix(t, es, aliasName, []string{expectedIndexName}) - // text query - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "17103bb1-1b48-4b70-92f7-1f6b73bd3488"), - elastic.NewMatchQuery("fields.text", "the rock"))) - assertQuery(t, client, physicalName, query, []int64{1}) + for _, tc := range queryTests { + src, _ := tc.query.Source() + assertQuery(t, es, tc.query, tc.expected, "query mismatch for %s", string(jsonx.MustMarshal(src))) + } - // people with no nickname - notQuery := elastic.NewBoolQuery().MustNot( - elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "17103bb1-1b48-4b70-92f7-1f6b73bd3488"), - elastic.NewExistsQuery("fields.text")))) - assertQuery(t, client, physicalName, notQuery, []int64{2, 3, 4, 5, 6, 7, 8, 9}) - - // no tokenizing of field text - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "17103bb1-1b48-4b70-92f7-1f6b73bd3488"), - elastic.NewMatchQuery("fields.text", "rock"))) - assertQuery(t, client, physicalName, query, []int64{}) - - // number field range query - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "05bca1cd-e322-4837-9595-86d0d85e5adb"), - elastic.NewRangeQuery("fields.number").Gt(10))) - assertQuery(t, client, physicalName, query, []int64{2}) - - // datetime field range query - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "e0eac267-463a-4c00-9732-cab62df07b16"), - elastic.NewRangeQuery("fields.datetime").Lt(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)))) - assertQuery(t, client, physicalName, query, []int64{3}) - - // state query - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "22d11697-edba-4186-b084-793e3b876379"), - elastic.NewMatchPhraseQuery("fields.state", "washington"))) - assertQuery(t, client, physicalName, query, []int64{5}) - - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "22d11697-edba-4186-b084-793e3b876379"), - elastic.NewMatchQuery("fields.state_keyword", " washington"))) - assertQuery(t, client, physicalName, query, []int64{5}) - - // doesn't include country - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "22d11697-edba-4186-b084-793e3b876379"), - elastic.NewMatchQuery("fields.state_keyword", "usa"))) - assertQuery(t, client, physicalName, query, []int64{}) - - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "22d11697-edba-4186-b084-793e3b876379"), - elastic.NewMatchPhraseQuery("fields.state", "usa"))) - assertQuery(t, client, physicalName, query, []int64{}) - - // district query - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "fcab2439-861c-4832-aa54-0c97f38f24ab"), - elastic.NewMatchPhraseQuery("fields.district", "king"))) - assertQuery(t, client, physicalName, query, []int64{7, 9}) - - // phrase matches all - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "fcab2439-861c-4832-aa54-0c97f38f24ab"), - elastic.NewMatchPhraseQuery("fields.district", "King-Côunty"))) - assertQuery(t, client, physicalName, query, []int64{7}) - - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "fcab2439-861c-4832-aa54-0c97f38f24ab"), - elastic.NewMatchQuery("fields.district_keyword", "King-Côunty"))) - assertQuery(t, client, physicalName, query, []int64{7}) - - // ward query - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "a551ade4-e5a0-4d83-b185-53b515ad2f2a"), - elastic.NewMatchPhraseQuery("fields.ward", "district"))) - assertQuery(t, client, physicalName, query, []int64{8}) - - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "a551ade4-e5a0-4d83-b185-53b515ad2f2a"), - elastic.NewMatchQuery("fields.ward_keyword", "central district"))) - assertQuery(t, client, physicalName, query, []int64{8}) - - // no substring though on keyword - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "a551ade4-e5a0-4d83-b185-53b515ad2f2a"), - elastic.NewMatchQuery("fields.ward_keyword", "district"))) - assertQuery(t, client, physicalName, query, []int64{}) - - // group query - assertQuery(t, client, physicalName, elastic.NewMatchQuery("groups", "4ea0f313-2f62-4e57-bdf0-232b5191dd57"), []int64{1}) - assertQuery(t, client, physicalName, elastic.NewMatchQuery("groups", "529bac39-550a-4d6f-817c-1833f3449007"), []int64{1, 2}) - assertQuery(t, client, physicalName, elastic.NewMatchQuery("groups", "4c016340-468d-4675-a974-15cb7a45a5ab"), []int64{}) - - lastModified, err := indexer.GetLastModified(elasticURL, physicalName) + lastModified, err := ix1.GetLastModified(indexName) assert.NoError(t, err) assert.Equal(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), lastModified.In(time.UTC)) - // map our index over - err = ci.UpdateAlias(physicalName) + // now make some contact changes, removing one contact, updating another + _, err = db.Exec(` + DELETE FROM contacts_contactgroup_contacts WHERE id = 3; + UPDATE contacts_contact SET name = 'John Deer', modified_on = '2020-08-20 14:00:00+00' where id = 2; + UPDATE contacts_contact SET is_active = FALSE, modified_on = '2020-08-22 15:00:00+00' where id = 4;`) + require.NoError(t, err) + + // and index again... + indexName, err = ix1.Index(db, false, false) assert.NoError(t, err) - time.Sleep(5 * time.Second) + assert.Equal(t, expectedIndexName, indexName) // same index used + assertIndexerStats(t, ix1, 10, 1) - // try a test query to check it worked - assertQuery(t, client, indexName, elastic.NewMatchQuery("name", "john"), []int64{4}) + time.Sleep(1 * time.Second) - // look up our mapping - physical := ci.FindPhysicalIndexes() - assert.Equal(t, physicalName, physical[0]) + assertIndexesWithPrefix(t, es, aliasName, []string{expectedIndexName}) - // rebuild again - newIndex, err := ci.CreateNewIndex(contacts.IndexSettings) - assert.NoError(t, err) + // should only match new john, old john is gone + assertQuery(t, es, elastic.NewMatchQuery("name", "john"), []int64{2}) - added, deleted, err = ci.IndexModified(db, newIndex, time.Time{}) - assert.NoError(t, err) - assert.Equal(t, 9, added) - assert.Equal(t, 0, deleted) + // 3 is no longer in our group + assertQuery(t, es, elastic.NewMatchQuery("groups", "529bac39-550a-4d6f-817c-1833f3449007"), []int64{1}) - // remap again - err = ci.UpdateAlias(newIndex) - assert.NoError(t, err) - time.Sleep(5 * time.Second) + // change John's name to Eric.. + _, err = db.Exec(` + UPDATE contacts_contact SET name = 'Eric', modified_on = '2020-08-20 14:00:00+00' where id = 2;`) + require.NoError(t, err) - // old index still around - resp, err := http.Get(fmt.Sprintf("%s/%s", elasticURL, physicalName)) - assert.NoError(t, err) - assert.Equal(t, resp.StatusCode, http.StatusOK) + // and simulate another indexer doing a parallel rebuild + ix2 := contacts.NewIndexer(elasticURL, aliasName, 4) - // cleanup our indexes, will remove our original index - err = ci.CleanupIndexes() + indexName2, err := ix2.Index(db, true, false) assert.NoError(t, err) + assert.Equal(t, expectedIndexName+"_1", indexName2) // new index used + assertIndexerStats(t, ix2, 8, 0) - // old physical index should be gone - resp, err = http.Get(fmt.Sprintf("%s/%s", elasticURL, physicalName)) - assert.NoError(t, err) - assert.Equal(t, resp.StatusCode, http.StatusNotFound) + time.Sleep(1 * time.Second) - // new index still works - assertQuery(t, client, newIndex, elastic.NewMatchQuery("name", "john"), []int64{4}) + // check we have a new index but the old index is still around + assertIndexesWithPrefix(t, es, aliasName, []string{expectedIndexName, expectedIndexName + "_1"}) - // update our database, removing one contact, updating another - _, err = db.Exec(` - DELETE FROM contacts_contactgroup_contacts WHERE id = 3; - UPDATE contacts_contact SET name = 'John Deer', modified_on = '2020-08-20 14:00:00+00' where id = 2; - UPDATE contacts_contact SET is_active = FALSE, modified_on = '2020-08-22 15:00:00+00' where id = 4;`) - assert.NoError(t, err) + // and the alias points to the new index + assertQuery(t, es, elastic.NewMatchQuery("name", "eric"), []int64{2}) - added, deleted, err = ci.IndexModified(db, indexName, lastModified) + // simulate another indexer doing a parallel rebuild with cleanup + ix3 := contacts.NewIndexer(elasticURL, aliasName, 4) + indexName3, err := ix3.Index(db, true, true) assert.NoError(t, err) - assert.Equal(t, 1, added) - assert.Equal(t, 1, deleted) + assert.Equal(t, expectedIndexName+"_2", indexName3) // new index used + assertIndexerStats(t, ix3, 8, 0) - time.Sleep(5 * time.Second) + // check we cleaned up indexes besides the new one + assertIndexesWithPrefix(t, es, aliasName, []string{expectedIndexName + "_2"}) - // should only match new john, old john is gone - assertQuery(t, client, indexName, elastic.NewMatchQuery("name", "john"), []int64{2}) - - // 3 is no longer in our group - assertQuery(t, client, indexName, elastic.NewMatchQuery("groups", "529bac39-550a-4d6f-817c-1833f3449007"), []int64{1}) + // check that the original indexer now indexes against the new index + indexName, err = ix1.Index(db, false, false) + assert.NoError(t, err) + assert.Equal(t, expectedIndexName+"_2", indexName) } diff --git a/http.go b/http.go index d526984..15277ee 100644 --- a/http.go +++ b/http.go @@ -44,11 +44,9 @@ func shouldRetry(request *http.Request, response *http.Response, withDelay time. return false } -// MakeJSONRequest is a utility function to make a JSON request, optionally decoding the response into the passed in struct -func MakeJSONRequest(method string, url string, body []byte, jsonStruct interface{}) (*http.Response, error) { - req, _ := http.NewRequest(method, url, bytes.NewReader(body)) - req.Header.Add("Content-Type", "application/json") - +// utility function to make a JSON request, optionally decoding the response into the passed in struct +func makeJSONRequest(method string, url string, body []byte, jsonStruct interface{}) (*http.Response, error) { + req, _ := httpx.NewRequest(method, url, bytes.NewReader(body), map[string]string{"Content-Type": "application/json"}) resp, err := httpx.Do(http.DefaultClient, req, retryConfig, nil) l := log.WithField("url", url).WithField("method", method).WithField("request", body) diff --git a/indexes.go b/indexes.go deleted file mode 100644 index f235a14..0000000 --- a/indexes.go +++ /dev/null @@ -1,43 +0,0 @@ -package indexer - -import ( - _ "embed" - "fmt" - "net/http" - "time" -) - -// our response for finding the last modified document -type queryResponse struct { - Hits struct { - Total struct { - Value int `json:"value"` - } `json:"total"` - Hits []struct { - Source struct { - ID int64 `json:"id"` - ModifiedOn time.Time `json:"modified_on"` - } `json:"_source"` - } `json:"hits"` - } `json:"hits"` -} - -// GetLastModified queries an index and finds the last modified document, returning its modified time -func GetLastModified(url string, index string) (time.Time, error) { - lastModified := time.Time{} - if index == "" { - return lastModified, fmt.Errorf("empty index passed to GetLastModified") - } - - // get the newest document on our index - queryResponse := queryResponse{} - _, err := MakeJSONRequest(http.MethodPost, fmt.Sprintf("%s/%s/_search", url, index), []byte(`{ "sort": [{ "modified_on_mu": "desc" }]}`), &queryResponse) - if err != nil { - return lastModified, err - } - - if len(queryResponse.Hits.Hits) > 0 { - lastModified = queryResponse.Hits.Hits[0].Source.ModifiedOn - } - return lastModified, nil -}