Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change index setting to contacts_index and rename some stuff for clarity #55

Merged
merged 2 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/rp-indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ func main() {
}

idxrs := []indexers.Indexer{
indexers.NewContactIndexer(cfg.ElasticURL, cfg.Index, 500),
indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, cfg.ContactsShards, cfg.ContactsReplicas, 500),
}

if cfg.Rebuild {
// if rebuilding, just do a complete index and quit. In future when we support multiple indexers,
// the rebuild argument can be become the name of the index to rebuild, e.g. --rebuild=contacts
idxr := idxrs[0]
if _, err := idxr.Index(db, true, cfg.Cleanup, cfg.ContactsShards, cfg.ContactsReplicas); err != nil {
if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil {
log.WithField("indexer", idxr.Name()).WithError(err).Fatal("error during rebuilding")
}
} else {
Expand Down
39 changes: 20 additions & 19 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,35 @@ package indexer
import "os"

type Config struct {
ElasticURL string `help:"the url for our elastic search instance"`
DB string `help:"the connection string for our database"`
Index string `help:"the alias for our contact index"`
Poll int `help:"the number of seconds to wait between checking for updated contacts"`
Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"`
Cleanup bool `help:"whether to remove old indexes after a rebuild"`
LogLevel string `help:"the log level, one of error, warn, info, debug"`
SentryDSN string `help:"the sentry configuration to log errors to, if any"`
ContactsShards int `help:"The number of shards to use for the contacts index"`
ContactsReplicas int `help:"The number of replicas to use for the contacts index"`

ElasticURL string `help:"the url for our elastic search instance"`
DB string `help:"the connection string for our database"`
Poll int `help:"the number of seconds to wait between checking for database updates"`
Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"`
Cleanup bool `help:"whether to remove old indexes after a rebuild"`
LogLevel string `help:"the log level, one of error, warn, info, debug"`
SentryDSN string `help:"the sentry configuration to log errors to, if any"`
LibratoUsername string `help:"the username that will be used to authenticate to Librato"`
LibratoToken string `help:"the token that will be used to authenticate to Librato"`
InstanceName string `help:"the unique name of this instance used for analytics"`

ContactsIndex string `help:"the alias to use for the contact index"`
ContactsShards int `help:"the number of shards to use for the contacts index"`
ContactsReplicas int `help:"the number of replicas to use for the contacts index"`
}

func NewDefaultConfig() *Config {
hostname, _ := os.Hostname()

return &Config{
ElasticURL: "http://localhost:9200",
DB: "postgres://localhost/temba?sslmode=disable",
Index: "contacts",
Poll: 5,
Rebuild: false,
Cleanup: false,
LogLevel: "info",
InstanceName: hostname,
ElasticURL: "http://localhost:9200",
DB: "postgres://localhost/temba?sslmode=disable",
Poll: 5,
Rebuild: false,
Cleanup: false,
LogLevel: "info",
InstanceName: hostname,

ContactsIndex: "contacts",
ContactsShards: 2,
ContactsReplicas: 1,
}
Expand Down
2 changes: 1 addition & 1 deletion daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer) {
case <-d.quit:
return
case <-time.After(d.poll):
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup, d.cfg.ContactsShards, d.cfg.ContactsReplicas)
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup)
if err != nil {
log.WithError(err).Error("error during indexing")
}
Expand Down
30 changes: 20 additions & 10 deletions indexers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ type Stats struct {
// Indexer is base interface for indexers
type Indexer interface {
Name() string
Index(db *sql.DB, rebuild, cleanup bool, shards int, replicas int) (string, error)
Index(db *sql.DB, rebuild, cleanup bool) (string, error)
Stats() Stats
}

type ElasticSettings struct {
// IndexDefinition is what we pass to elastic to create an index,
// see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html
type IndexDefinition struct {
Settings struct {
Index struct {
NumberOfShards int `json:"number_of_shards"`
Expand All @@ -45,15 +47,25 @@ type ElasticSettings struct {
Mappings json.RawMessage `json:"mappings"`
}

func newIndexDefinition(base []byte, shards, replicas int) *IndexDefinition {
d := &IndexDefinition{}
jsonx.MustUnmarshal(contactsIndexDef, d)

d.Settings.Index.NumberOfShards = shards
d.Settings.Index.NumberOfReplicas = replicas
return d
}

type baseIndexer struct {
elasticURL string
name string // e.g. contacts, used as the alias
definition *IndexDefinition

stats Stats
}

func newBaseIndexer(elasticURL, name string) baseIndexer {
return baseIndexer{elasticURL: elasticURL, name: name}
func newBaseIndexer(elasticURL, name string, def *IndexDefinition) baseIndexer {
return baseIndexer{elasticURL: elasticURL, name: name, definition: def}
}

func (i *baseIndexer) Name() string {
Expand Down Expand Up @@ -111,7 +123,7 @@ func (i *baseIndexer) FindIndexes() []string {
// that index to `contacts`.
//
// If the day-specific name already exists, we append a .1 or .2 to the name.
func (i *baseIndexer) createNewIndex(indexSettings ElasticSettings) (string, error) {
func (i *baseIndexer) createNewIndex(def *IndexDefinition) (string, error) {
// create our day-specific name
index := fmt.Sprintf("%s_%s", i.name, time.Now().Format("2006_01_02"))
idx := 0
Expand All @@ -133,11 +145,9 @@ func (i *baseIndexer) createNewIndex(indexSettings ElasticSettings) (string, err
}

// create the new index
settings, err := json.Marshal(indexSettings)
if err != nil {
return "", err
}
_, err = utils.MakeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.elasticURL, index), settings, nil)
settings := jsonx.MustMarshal(def)

_, err := utils.MakeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.elasticURL, index), settings, nil)
if err != nil {
return "", err
}
Expand Down
3 changes: 1 addition & 2 deletions indexers/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package indexers_test
import (
"context"
"database/sql"
"io/ioutil"
"log"
"os"
"sort"
Expand All @@ -22,7 +21,7 @@ const elasticURL = "http://localhost:9200"
const aliasName = "indexer_test"

func setup(t *testing.T) (*sql.DB, *elastic.Client) {
testDB, err := ioutil.ReadFile("../testdb.sql")
testDB, err := os.ReadFile("../testdb.sql")
require.NoError(t, err)

db, err := sql.Open("postgres", "postgres://nyaruka:nyaruka@localhost:5432/elastic_test?sslmode=disable")
Expand Down
26 changes: 10 additions & 16 deletions indexers/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,15 @@ import (
"bytes"
"database/sql"
_ "embed"
"encoding/json"
"fmt"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"time"
)

//go:embed contacts.settings.json
var contactsSettingsFile []byte

var contactsSettings ElasticSettings
//go:embed contacts.index.json
var contactsIndexDef []byte

// ContactIndexer is an indexer for contacts
type ContactIndexer struct {
Expand All @@ -24,15 +22,17 @@ type ContactIndexer struct {
}

// NewContactIndexer creates a new contact indexer
func NewContactIndexer(elasticURL, name string, batchSize int) *ContactIndexer {
func NewContactIndexer(elasticURL, name string, shards, replicas, batchSize int) *ContactIndexer {
def := newIndexDefinition(contactsIndexDef, shards, replicas)

return &ContactIndexer{
baseIndexer: newBaseIndexer(elasticURL, name),
baseIndexer: newBaseIndexer(elasticURL, name, def),
batchSize: batchSize,
}
}

// Index indexes modified contacts and returns the name of the concrete index
func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool, shards int, replicas int) (string, error) {
func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error) {
var err error

// find our physical index
Expand All @@ -48,13 +48,7 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool, shards int, re

// doesn't exist or we are rebuilding, create it
if physicalIndex == "" || rebuild {
err = json.Unmarshal(contactsSettingsFile, &contactsSettings)
if err != nil {
return "", errors.Wrap(err, "error unmarshalling embeded contacts.settings.json file")
}
contactsSettings.Settings.Index.NumberOfShards = shards
contactsSettings.Settings.Index.NumberOfReplicas = replicas
physicalIndex, err = i.createNewIndex(contactsSettings)
physicalIndex, err = i.createNewIndex(i.definition)
if err != nil {
return "", errors.Wrap(err, "error creating new index")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"settings": {
"index": {
"number_of_shards": 2,
"number_of_replicas": 1,
"number_of_shards": -1,
"number_of_replicas": -1,
"routing_partition_size": 1
},
"analysis": {
Expand Down
16 changes: 8 additions & 8 deletions indexers/contacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,12 @@ var contactQueryTests = []struct {
func TestContacts(t *testing.T) {
db, es := setup(t)

ix1 := indexers.NewContactIndexer(elasticURL, aliasName, 4)
ix1 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4)
assert.Equal(t, "indexer_test", ix1.Name())

expectedIndexName := fmt.Sprintf("indexer_test_%s", time.Now().Format("2006_01_02"))

indexName, err := ix1.Index(db, false, false, 2, 1)
indexName, err := ix1.Index(db, false, false)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName, indexName)

Expand All @@ -217,7 +217,7 @@ func TestContacts(t *testing.T) {
require.NoError(t, err)

// and index again...
indexName, err = ix1.Index(db, false, false, 2, 1)
indexName, err = ix1.Index(db, false, false)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName, indexName) // same index used
assertIndexerStats(t, ix1, 10, 1)
Expand All @@ -238,9 +238,9 @@ func TestContacts(t *testing.T) {
require.NoError(t, err)

// and simulate another indexer doing a parallel rebuild
ix2 := indexers.NewContactIndexer(elasticURL, aliasName, 4)
ix2 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4)

indexName2, err := ix2.Index(db, true, false, 2, 1)
indexName2, err := ix2.Index(db, true, false)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName+"_1", indexName2) // new index used
assertIndexerStats(t, ix2, 8, 0)
Expand All @@ -254,8 +254,8 @@ func TestContacts(t *testing.T) {
assertQuery(t, es, elastic.NewMatchQuery("name", "eric"), []int64{2})

// simulate another indexer doing a parallel rebuild with cleanup
ix3 := indexers.NewContactIndexer(elasticURL, aliasName, 4)
indexName3, err := ix3.Index(db, true, true, 2, 1)
ix3 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4)
indexName3, err := ix3.Index(db, true, true)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName+"_2", indexName3) // new index used
assertIndexerStats(t, ix3, 8, 0)
Expand All @@ -264,7 +264,7 @@ func TestContacts(t *testing.T) {
assertIndexesWithPrefix(t, es, aliasName, []string{expectedIndexName + "_2"})

// check that the original indexer now indexes against the new index
indexName, err = ix1.Index(db, false, false, 2, 1)
indexName, err = ix1.Index(db, false, false)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName+"_2", indexName)
}