From de432df155c39887078c3c100c6a5631192b2173 Mon Sep 17 00:00:00 2001 From: Tyler Britten <1933680+tybritten@users.noreply.github.com> Date: Mon, 8 Aug 2022 09:26:50 -0400 Subject: [PATCH 1/5] allow configurable shards/replicas --- cmd/rp-indexer/main.go | 2 +- config.go | 36 ++++++++++++++++++++---------------- daemon.go | 2 +- indexers/base.go | 22 +++++++++++++++++++--- indexers/contacts.go | 13 ++++++++----- 5 files changed, 49 insertions(+), 26 deletions(-) diff --git a/cmd/rp-indexer/main.go b/cmd/rp-indexer/main.go index 49379c4..a959a69 100644 --- a/cmd/rp-indexer/main.go +++ b/cmd/rp-indexer/main.go @@ -62,7 +62,7 @@ func main() { // if rebuilding, just do a complete index and quit. In future when we support multiple indexers, // the rebuild argument can be become the name of the index to rebuild, e.g. --rebuild=contacts idxr := idxrs[0] - if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil { + if _, err := idxr.Index(db, true, cfg.Cleanup, cfg.ContactsShards, cfg.ContactsReplicas); err != nil { log.WithField("indexer", idxr.Name()).WithError(err).Fatal("error during rebuilding") } } else { diff --git a/config.go b/config.go index c83e755..1b10e7a 100644 --- a/config.go +++ b/config.go @@ -3,14 +3,16 @@ package indexer import "os" type Config struct { - ElasticURL string `help:"the url for our elastic search instance"` - DB string `help:"the connection string for our database"` - Index string `help:"the alias for our contact index"` - Poll int `help:"the number of seconds to wait between checking for updated contacts"` - Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"` - Cleanup bool `help:"whether to remove old indexes after a rebuild"` - LogLevel string `help:"the log level, one of error, warn, info, debug"` - SentryDSN string `help:"the sentry configuration to log errors to, if any"` + ElasticURL string `help:"the url for our elastic search instance"` + DB string `help:"the connection string for our database"` + Index string `help:"the alias for our contact index"` + Poll int `help:"the number of seconds to wait between checking for updated contacts"` + Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"` + Cleanup bool `help:"whether to remove old indexes after a rebuild"` + LogLevel string `help:"the log level, one of error, warn, info, debug"` + SentryDSN string `help:"the sentry configuration to log errors to, if any"` + ContactsShards int `help:"The number of shards to use for the contacts index"` + ContactsReplicas int `help:"The number of replicas to use for the contacts index"` LibratoUsername string `help:"the username that will be used to authenticate to Librato"` LibratoToken string `help:"the token that will be used to authenticate to Librato"` @@ -21,13 +23,15 @@ func NewDefaultConfig() *Config { hostname, _ := os.Hostname() return &Config{ - ElasticURL: "http://localhost:9200", - DB: "postgres://localhost/temba?sslmode=disable", - Index: "contacts", - Poll: 5, - Rebuild: false, - Cleanup: false, - LogLevel: "info", - InstanceName: hostname, + ElasticURL: "http://localhost:9200", + DB: "postgres://localhost/temba?sslmode=disable", + Index: "contacts", + Poll: 5, + Rebuild: false, + Cleanup: false, + LogLevel: "info", + InstanceName: hostname, + ContactsShards: 2, + ContactsReplicas: 1, } } diff --git a/daemon.go b/daemon.go index 8831075..3bdbc07 100644 --- a/daemon.go +++ b/daemon.go @@ -66,7 +66,7 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer) { case <-d.quit: return case <-time.After(d.poll): - _, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup) + _, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup, d.cfg.ContactsShards, d.cfg.ContactsReplicas) if err != nil { log.WithError(err).Error("error during indexing") } diff --git a/indexers/base.go b/indexers/base.go index 7e5e639..3e4b2e5 100644 --- a/indexers/base.go +++ b/indexers/base.go @@ -29,10 +29,22 @@ type Stats struct { // Indexer is base interface for indexers type Indexer interface { Name() string - Index(db *sql.DB, rebuild, cleanup bool) (string, error) + Index(db *sql.DB, rebuild, cleanup bool, shards int, replicas int) (string, error) Stats() Stats } +type ElasticSettings struct { + Settings struct { + Index struct { + NumberOfShards int `json:"number_of_shards"` + NumberOfReplicas int `json:"number_of_replicas"` + RoutingPartitionSize int `json:"routing_partition_size"` + } `json:"index"` + Analysis json.RawMessage `json:"analysis"` + } `json:"settings"` + Mappings json.RawMessage `json:"mappings"` +} + type baseIndexer struct { elasticURL string name string // e.g. contacts, used as the alias @@ -99,7 +111,7 @@ func (i *baseIndexer) FindIndexes() []string { // that index to `contacts`. // // If the day-specific name already exists, we append a .1 or .2 to the name. -func (i *baseIndexer) createNewIndex(settings json.RawMessage) (string, error) { +func (i *baseIndexer) createNewIndex(indexSettings ElasticSettings) (string, error) { // create our day-specific name index := fmt.Sprintf("%s_%s", i.name, time.Now().Format("2006_01_02")) idx := 0 @@ -121,7 +133,11 @@ func (i *baseIndexer) createNewIndex(settings json.RawMessage) (string, error) { } // create the new index - _, err := utils.MakeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.elasticURL, index), settings, nil) + settings, err := json.Marshal(indexSettings) + if err != nil { + return "", err + } + _, err = utils.MakeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.elasticURL, index), settings, nil) if err != nil { return "", err } diff --git a/indexers/contacts.go b/indexers/contacts.go index 2ba5e28..749aa04 100644 --- a/indexers/contacts.go +++ b/indexers/contacts.go @@ -6,14 +6,13 @@ import ( _ "embed" "encoding/json" "fmt" - "time" - "github.com/pkg/errors" "github.com/sirupsen/logrus" + "io/ioutil" + "time" ) -//go:embed contacts.settings.json -var contactsSettings json.RawMessage +var contactsSettings ElasticSettings // ContactIndexer is an indexer for contacts type ContactIndexer struct { @@ -31,7 +30,7 @@ func NewContactIndexer(elasticURL, name string, batchSize int) *ContactIndexer { } // Index indexes modified contacts and returns the name of the concrete index -func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error) { +func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool, shards int, replicas int) (string, error) { var err error // find our physical index @@ -47,6 +46,10 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error // doesn't exist or we are rebuilding, create it if physicalIndex == "" || rebuild { + file, _ := ioutil.ReadFile("contacts.settings.json") + _ = json.Unmarshal([]byte(file), &contactsSettings) + contactsSettings.Settings.Index.NumberOfShards = shards + contactsSettings.Settings.Index.NumberOfReplicas = replicas physicalIndex, err = i.createNewIndex(contactsSettings) if err != nil { return "", errors.Wrap(err, "error creating new index") From 9dc40a26114b2b6b74633d72bffdf8cff5a1258b Mon Sep 17 00:00:00 2001 From: Tyler Britten <1933680+tybritten@users.noreply.github.com> Date: Mon, 8 Aug 2022 09:46:11 -0400 Subject: [PATCH 2/5] update tests --- indexers/contacts_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/indexers/contacts_test.go b/indexers/contacts_test.go index a6b34db..63bc18f 100644 --- a/indexers/contacts_test.go +++ b/indexers/contacts_test.go @@ -191,7 +191,7 @@ func TestContacts(t *testing.T) { expectedIndexName := fmt.Sprintf("indexer_test_%s", time.Now().Format("2006_01_02")) - indexName, err := ix1.Index(db, false, false) + indexName, err := ix1.Index(db, false, false, 2, 1) assert.NoError(t, err) assert.Equal(t, expectedIndexName, indexName) @@ -217,7 +217,7 @@ func TestContacts(t *testing.T) { require.NoError(t, err) // and index again... - indexName, err = ix1.Index(db, false, false) + indexName, err = ix1.Index(db, false, false, 2, 1) assert.NoError(t, err) assert.Equal(t, expectedIndexName, indexName) // same index used assertIndexerStats(t, ix1, 10, 1) @@ -264,7 +264,7 @@ func TestContacts(t *testing.T) { assertIndexesWithPrefix(t, es, aliasName, []string{expectedIndexName + "_2"}) // check that the original indexer now indexes against the new index - indexName, err = ix1.Index(db, false, false) + indexName, err = ix1.Index(db, false, false, 2, 1) assert.NoError(t, err) assert.Equal(t, expectedIndexName+"_2", indexName) } From 15876ac2cae3b69bbd88090c3dd37f4e006e1aa1 Mon Sep 17 00:00:00 2001 From: Tyler Britten <1933680+tybritten@users.noreply.github.com> Date: Mon, 8 Aug 2022 10:02:44 -0400 Subject: [PATCH 3/5] update rest of tests --- indexers/contacts_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/indexers/contacts_test.go b/indexers/contacts_test.go index 63bc18f..463aac0 100644 --- a/indexers/contacts_test.go +++ b/indexers/contacts_test.go @@ -240,7 +240,7 @@ func TestContacts(t *testing.T) { // and simulate another indexer doing a parallel rebuild ix2 := indexers.NewContactIndexer(elasticURL, aliasName, 4) - indexName2, err := ix2.Index(db, true, false) + indexName2, err := ix2.Index(db, true, false, 2, 1) assert.NoError(t, err) assert.Equal(t, expectedIndexName+"_1", indexName2) // new index used assertIndexerStats(t, ix2, 8, 0) @@ -255,7 +255,7 @@ func TestContacts(t *testing.T) { // simulate another indexer doing a parallel rebuild with cleanup ix3 := indexers.NewContactIndexer(elasticURL, aliasName, 4) - indexName3, err := ix3.Index(db, true, true) + indexName3, err := ix3.Index(db, true, true, 2, 1) assert.NoError(t, err) assert.Equal(t, expectedIndexName+"_2", indexName3) // new index used assertIndexerStats(t, ix3, 8, 0) From 7305e1048da679c90841c5c150276bfb4deff61b Mon Sep 17 00:00:00 2001 From: Tyler Britten <1933680+tybritten@users.noreply.github.com> Date: Mon, 8 Aug 2022 11:45:47 -0400 Subject: [PATCH 4/5] re-embed settings file --- indexers/contacts.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/indexers/contacts.go b/indexers/contacts.go index 749aa04..3ee1eda 100644 --- a/indexers/contacts.go +++ b/indexers/contacts.go @@ -8,10 +8,12 @@ import ( "fmt" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "io/ioutil" "time" ) +//go:embed contacts.settings.json +var contactsSettingsFile []byte + var contactsSettings ElasticSettings // ContactIndexer is an indexer for contacts @@ -46,8 +48,7 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool, shards int, re // doesn't exist or we are rebuilding, create it if physicalIndex == "" || rebuild { - file, _ := ioutil.ReadFile("contacts.settings.json") - _ = json.Unmarshal([]byte(file), &contactsSettings) + _ = json.Unmarshal(contactsSettingsFile, &contactsSettings) contactsSettings.Settings.Index.NumberOfShards = shards contactsSettings.Settings.Index.NumberOfReplicas = replicas physicalIndex, err = i.createNewIndex(contactsSettings) From 021e17d028038826175d8459c2ccd43c02748ed8 Mon Sep 17 00:00:00 2001 From: Tyler Britten <1933680+tybritten@users.noreply.github.com> Date: Mon, 10 Oct 2022 17:13:24 -0400 Subject: [PATCH 5/5] handle unmarshall error --- indexers/contacts.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/indexers/contacts.go b/indexers/contacts.go index 3ee1eda..b476af6 100644 --- a/indexers/contacts.go +++ b/indexers/contacts.go @@ -48,7 +48,10 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool, shards int, re // doesn't exist or we are rebuilding, create it if physicalIndex == "" || rebuild { - _ = json.Unmarshal(contactsSettingsFile, &contactsSettings) + err = json.Unmarshal(contactsSettingsFile, &contactsSettings) + if err != nil { + return "", errors.Wrap(err, "error unmarshalling embeded contacts.settings.json file") + } contactsSettings.Settings.Index.NumberOfShards = shards contactsSettings.Settings.Index.NumberOfReplicas = replicas physicalIndex, err = i.createNewIndex(contactsSettings)