Skip to content

Commit

Permalink
Better testing
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Mar 23, 2022
1 parent 05b073f commit a20bdf4
Show file tree
Hide file tree
Showing 7 changed files with 357 additions and 336 deletions.
79 changes: 58 additions & 21 deletions base.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,32 @@ import (
// Indexer is base interface for indexers
type Indexer interface {
Name() string
Index(db *sql.DB, rebuild, cleanup bool) error
Index(db *sql.DB, rebuild, cleanup bool) (string, error)
Stats() (int64, int64, time.Duration)
}

type BaseIndexer struct {
name string // e.g. contacts, used as based index name
ElasticURL string
elasticURL string
name string // e.g. contacts, used as the alias

// statistics
indexedTotal int64
deletedTotal int64
elapsedTotal time.Duration
}

func NewBaseIndexer(name, elasticURL string) BaseIndexer {
return BaseIndexer{name: name, ElasticURL: elasticURL}
func NewBaseIndexer(elasticURL, name string) BaseIndexer {
return BaseIndexer{elasticURL: elasticURL, name: name}
}

func (i *BaseIndexer) Name() string {
return i.name
}

func (i *BaseIndexer) Log() *logrus.Entry {
return logrus.WithField("indexer", i.name)
}

func (i *BaseIndexer) Stats() (int64, int64, time.Duration) {
return i.indexedTotal, i.deletedTotal, i.elapsedTotal
}
Expand All @@ -47,16 +52,16 @@ func (i *BaseIndexer) RecordComplete(indexed, deleted int, elapsed time.Duration
i.deletedTotal += int64(deleted)
i.elapsedTotal += elapsed

logrus.WithField("indexer", i.name).WithField("indexed", indexed).WithField("deleted", deleted).WithField("elapsed", elapsed).Info("completed indexing")
i.Log().WithField("indexed", indexed).WithField("deleted", deleted).WithField("elapsed", elapsed).Info("completed indexing")
}

// our response for figuring out the physical index for an alias
type infoResponse map[string]interface{}

// FindPhysicalIndexes finds all our physical indexes
func (i *BaseIndexer) FindPhysicalIndexes() []string {
// FindIndexes finds all our physical indexes
func (i *BaseIndexer) FindIndexes() []string {
response := infoResponse{}
_, err := MakeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", i.ElasticURL, i.name), nil, &response)
_, err := makeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", i.elasticURL, i.name), nil, &response)
indexes := make([]string, 0)

// error could mean a variety of things, but we'll figure that out later
Expand All @@ -72,7 +77,7 @@ func (i *BaseIndexer) FindPhysicalIndexes() []string {
// reverse sort order should put our newest index first
sort.Sort(sort.Reverse(sort.StringSlice(indexes)))

logrus.WithField("indexer", i.name).WithField("indexes", indexes).Debug("found physical indexes")
i.Log().WithField("indexes", indexes).Debug("found physical indexes")

return indexes
}
Expand All @@ -91,7 +96,7 @@ func (i *BaseIndexer) CreateNewIndex(settings json.RawMessage) (string, error) {

// check if it exists
for {
resp, err := http.Get(fmt.Sprintf("%s/%s", i.ElasticURL, index))
resp, err := http.Get(fmt.Sprintf("%s/%s", i.elasticURL, index))
if err != nil {
return "", err
}
Expand All @@ -106,13 +111,13 @@ func (i *BaseIndexer) CreateNewIndex(settings json.RawMessage) (string, error) {
}

// create the new index
_, err := MakeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.ElasticURL, index), settings, nil)
_, err := makeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.elasticURL, index), settings, nil)
if err != nil {
return "", err
}

// all went well, return our physical index name
logrus.WithField("indexer", i.name).WithField("index", index).Info("created new index")
i.Log().WithField("index", index).Info("created new index")

return index, nil
}
Expand Down Expand Up @@ -144,7 +149,7 @@ func (i *BaseIndexer) UpdateAlias(newIndex string) error {
commands := make([]interface{}, 0)

// find existing physical indexes
existing := i.FindPhysicalIndexes()
existing := i.FindIndexes()
for _, idx := range existing {
remove := removeAliasCommand{}
remove.Remove.Alias = i.name
Expand All @@ -162,9 +167,9 @@ func (i *BaseIndexer) UpdateAlias(newIndex string) error {

aliasJSON := jsonx.MustMarshal(aliasCommand{Actions: commands})

_, err := MakeJSONRequest(http.MethodPost, fmt.Sprintf("%s/_aliases", i.ElasticURL), aliasJSON, nil)
_, err := makeJSONRequest(http.MethodPost, fmt.Sprintf("%s/_aliases", i.elasticURL), aliasJSON, nil)

logrus.WithField("indexer", i.name).WithField("index", newIndex).Debug("adding new alias")
i.Log().WithField("index", newIndex).Info("updated alias")

return err
}
Expand All @@ -179,7 +184,7 @@ type healthResponse struct {
// CleanupIndexes removes all indexes that are older than the currently active index
func (i *BaseIndexer) CleanupIndexes() error {
// find our current indexes
currents := i.FindPhysicalIndexes()
currents := i.FindIndexes()

// no current indexes? this a noop
if len(currents) == 0 {
Expand All @@ -188,7 +193,7 @@ func (i *BaseIndexer) CleanupIndexes() error {

// find all the current indexes
healthResponse := healthResponse{}
_, err := MakeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", i.ElasticURL, "_cluster/health?level=indices"), nil, &healthResponse)
_, err := makeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", i.elasticURL, "_cluster/health?level=indices"), nil, &healthResponse)
if err != nil {
return err
}
Expand All @@ -197,7 +202,7 @@ func (i *BaseIndexer) CleanupIndexes() error {
for key := range healthResponse.Indices {
if strings.HasPrefix(key, i.name) && strings.Compare(key, currents[0]) < 0 {
logrus.WithField("index", key).Info("removing old index")
_, err = MakeJSONRequest(http.MethodDelete, fmt.Sprintf("%s/%s", i.ElasticURL, key), nil, nil)
_, err = makeJSONRequest(http.MethodDelete, fmt.Sprintf("%s/%s", i.elasticURL, key), nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -225,9 +230,9 @@ type indexResponse struct {
// indexes the batch of contacts
func (i *BaseIndexer) IndexBatch(index string, batch []byte) (int, int, error) {
response := indexResponse{}
indexURL := fmt.Sprintf("%s/%s/_bulk", i.ElasticURL, index)
indexURL := fmt.Sprintf("%s/%s/_bulk", i.elasticURL, index)

_, err := MakeJSONRequest(http.MethodPut, indexURL, batch, &response)
_, err := makeJSONRequest(http.MethodPut, indexURL, batch, &response)
if err != nil {
return 0, 0, err
}
Expand Down Expand Up @@ -258,3 +263,35 @@ func (i *BaseIndexer) IndexBatch(index string, batch []byte) (int, int, error) {

return createdCount, deletedCount, nil
}

// our response for finding the last modified document
type queryResponse struct {
Hits struct {
Total struct {
Value int `json:"value"`
} `json:"total"`
Hits []struct {
Source struct {
ID int64 `json:"id"`
ModifiedOn time.Time `json:"modified_on"`
} `json:"_source"`
} `json:"hits"`
} `json:"hits"`
}

// GetLastModified queries a concrete index and finds the last modified document, returning its modified time
func (i *BaseIndexer) GetLastModified(index string) (time.Time, error) {
lastModified := time.Time{}

// get the newest document on our index
queryResponse := queryResponse{}
_, err := makeJSONRequest(http.MethodPost, fmt.Sprintf("%s/%s/_search", i.elasticURL, index), []byte(`{ "sort": [{ "modified_on_mu": "desc" }]}`), &queryResponse)
if err != nil {
return lastModified, err
}

if len(queryResponse.Hits.Hits) > 0 {
lastModified = queryResponse.Hits.Hits[0].Source.ModifiedOn
}
return lastModified, nil
}
4 changes: 2 additions & 2 deletions base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ func TestRetryServer(t *testing.T) {
}))
defer ts.Close()

ci := contacts.NewIndexer("rp_elastic_test", ts.URL)
ci.FindPhysicalIndexes()
ci := contacts.NewIndexer(ts.URL, "rp_elastic_test", 500)
ci.FindIndexes()

require.Equal(t, responseCounter, 4)
}
4 changes: 2 additions & 2 deletions cmd/rp-indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ func main() {
log.Fatal(err)
}

ci := contacts.NewIndexer(config.ElasticURL, config.Index)
ci := contacts.NewIndexer(config.ElasticURL, config.Index, 500)

for {
err := ci.Index(db, config.Rebuild, config.Cleanup)
_, err := ci.Index(db, config.Rebuild, config.Cleanup)

if err != nil {
if config.Rebuild {
Expand Down
58 changes: 27 additions & 31 deletions contacts/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ import (
"github.com/sirupsen/logrus"
)

var BatchSize = 500

//go:embed index_settings.json
var IndexSettings json.RawMessage
var indexSettings json.RawMessage

// indexes a contact
const indexCommand = `{ "index": { "_id": %d, "_type": "_doc", "version": %d, "version_type": "external", "routing": %d} }`
Expand All @@ -27,20 +25,24 @@ const deleteCommand = `{ "delete" : { "_id": %d, "_type": "_doc", "version": %d,
// ContactIndexer is an indexer for contacts
type Indexer struct {
indexer.BaseIndexer

batchSize int
}

// NewIndexer creates a new contact indexer
func NewIndexer(name, elasticURL string) *Indexer {
func NewIndexer(elasticURL, name string, batchSize int) *Indexer {
return &Indexer{
BaseIndexer: indexer.NewBaseIndexer(name, elasticURL),
BaseIndexer: indexer.NewBaseIndexer(elasticURL, name),
batchSize: batchSize,
}
}

func (i *Indexer) Index(db *sql.DB, rebuild, cleanup bool) error {
// Index indexes modified contacts and returns the name of the concrete index
func (i *Indexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error) {
var err error

// find our physical index
physicalIndexes := i.FindPhysicalIndexes()
physicalIndexes := i.FindIndexes()

physicalIndex := ""
if len(physicalIndexes) > 0 {
Expand All @@ -52,26 +54,26 @@ func (i *Indexer) Index(db *sql.DB, rebuild, cleanup bool) error {

// doesn't exist or we are rebuilding, create it
if physicalIndex == "" || rebuild {
physicalIndex, err = i.CreateNewIndex(IndexSettings)
physicalIndex, err = i.CreateNewIndex(indexSettings)
if err != nil {
return errors.Wrap(err, "error creating new index")
return "", errors.Wrap(err, "error creating new index")
}
logrus.WithField("indexer", i.Name()).WithField("index", physicalIndex).Info("created new physical index")
i.Log().WithField("index", physicalIndex).Info("created new physical index")
remapAlias = true
}

lastModified, err := indexer.GetLastModified(i.ElasticURL, physicalIndex)
lastModified, err := i.GetLastModified(physicalIndex)
if err != nil {
return errors.Wrap(err, "error finding last modified")
return "", errors.Wrap(err, "error finding last modified")
}

logrus.WithField("indexer", i.Name()).WithField("index", physicalIndex).WithField("last_modified", lastModified).Info("indexing newer than last modified")
i.Log().WithField("index", physicalIndex).WithField("last_modified", lastModified).Info("indexing newer than last modified")

// now index our docs
start := time.Now()
indexed, deleted, err := i.IndexModified(db, physicalIndex, lastModified.Add(-5*time.Second))
indexed, deleted, err := i.indexModified(db, physicalIndex, lastModified.Add(-5*time.Second))
if err != nil {
return errors.Wrap(err, "error indexing documents")
return "", errors.Wrap(err, "error indexing documents")
}

i.RecordComplete(indexed, deleted, time.Since(start))
Expand All @@ -80,7 +82,7 @@ func (i *Indexer) Index(db *sql.DB, rebuild, cleanup bool) error {
if remapAlias {
err := i.UpdateAlias(physicalIndex)
if err != nil {
return errors.Wrap(err, "error remapping alias")
return "", errors.Wrap(err, "error updating alias")
}
remapAlias = false
}
Expand All @@ -89,22 +91,18 @@ func (i *Indexer) Index(db *sql.DB, rebuild, cleanup bool) error {
if cleanup {
err := i.CleanupIndexes()
if err != nil {
return errors.Wrap(err, "error cleaning up old indexes")
return "", errors.Wrap(err, "error cleaning up old indexes")
}
}

return nil
return physicalIndex, nil
}

// IndexModified queries and indexes all contacts with a lastModified greater than or equal to the passed in time
func (i *Indexer) IndexModified(db *sql.DB, index string, lastModified time.Time) (int, int, error) {
func (i *Indexer) indexModified(db *sql.DB, index string, lastModified time.Time) (int, int, error) {
batch := &bytes.Buffer{}
createdCount, deletedCount, processedCount := 0, 0, 0

if index == "" {
return 0, 0, errors.New("empty index")
}

var modifiedOn time.Time
var contactJSON string
var id, orgID int64
Expand Down Expand Up @@ -140,18 +138,20 @@ func (i *Indexer) IndexModified(db *sql.DB, index string, lastModified time.Time

if isActive {
logrus.WithField("id", id).WithField("modifiedOn", modifiedOn).WithField("contact", contactJSON).Debug("modified contact")

batch.WriteString(fmt.Sprintf(indexCommand, id, modifiedOn.UnixNano(), orgID))
batch.WriteString("\n")
batch.WriteString(contactJSON)
batch.WriteString("\n")
} else {
logrus.WithField("id", id).WithField("modifiedOn", modifiedOn).Debug("deleted contact")

batch.WriteString(fmt.Sprintf(deleteCommand, id, modifiedOn.UnixNano(), orgID))
batch.WriteString("\n")
}

// write to elastic search in batches
if queryCount%BatchSize == 0 {
if queryCount%i.batchSize == 0 {
created, deleted, err := i.IndexBatch(index, batch.Bytes())
if err != nil {
return 0, 0, err
Expand Down Expand Up @@ -181,16 +181,12 @@ func (i *Indexer) IndexModified(db *sql.DB, index string, lastModified time.Time
break
}

rows.Close()

elapsed := time.Since(start)
rate := float32(processedCount) / (float32(elapsed) / float32(time.Second))
logrus.WithFields(map[string]interface{}{
"rate": int(rate),
"added": createdCount,
"deleted": deletedCount,
"elapsed": elapsed,
"index": index}).Info("updated contact index")

rows.Close()
i.Log().WithField("index", index).WithFields(logrus.Fields{"rate": int(rate), "added": createdCount, "deleted": deletedCount, "elapsed": elapsed}).Info("indexed contact batch")
}

return createdCount, deletedCount, nil
Expand Down
Loading

0 comments on commit a20bdf4

Please sign in to comment.