Make the shard and replica counts of the contacts index configurable.

Review revision: the index settings stay embedded in the binary via go:embed
and are decoded into a local ElasticSettings value each time an index is
created, with the decode error returned to the caller, instead of being
re-read from the working directory with the read and parse errors discarded.

diff --git a/cmd/rp-indexer/main.go b/cmd/rp-indexer/main.go
index 49379c4..a959a69 100644
--- a/cmd/rp-indexer/main.go
+++ b/cmd/rp-indexer/main.go
@@ -62,7 +62,7 @@ func main() {
 		// if rebuilding, just do a complete index and quit. In future when we support multiple indexers,
 		// the rebuild argument can be become the name of the index to rebuild, e.g. --rebuild=contacts
 		idxr := idxrs[0]
-		if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil {
+		if _, err := idxr.Index(db, true, cfg.Cleanup, cfg.ContactsShards, cfg.ContactsReplicas); err != nil {
 			log.WithField("indexer", idxr.Name()).WithError(err).Fatal("error during rebuilding")
 		}
 	} else {
diff --git a/config.go b/config.go
--- a/config.go
+++ b/config.go
@@ -3,14 +3,16 @@ package indexer
 import "os"
 
 type Config struct {
-	ElasticURL string `help:"the url for our elastic search instance"`
-	DB         string `help:"the connection string for our database"`
-	Index      string `help:"the alias for our contact index"`
-	Poll       int    `help:"the number of seconds to wait between checking for updated contacts"`
-	Rebuild    bool   `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"`
-	Cleanup    bool   `help:"whether to remove old indexes after a rebuild"`
-	LogLevel   string `help:"the log level, one of error, warn, info, debug"`
-	SentryDSN  string `help:"the sentry configuration to log errors to, if any"`
+	ElasticURL       string `help:"the url for our elastic search instance"`
+	DB               string `help:"the connection string for our database"`
+	Index            string `help:"the alias for our contact index"`
+	Poll             int    `help:"the number of seconds to wait between checking for updated contacts"`
+	Rebuild          bool   `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"`
+	Cleanup          bool   `help:"whether to remove old indexes after a rebuild"`
+	LogLevel         string `help:"the log level, one of error, warn, info, debug"`
+	SentryDSN        string `help:"the sentry configuration to log errors to, if any"`
+	ContactsShards   int    `help:"the number of shards to use for the contacts index"`
+	ContactsReplicas int    `help:"the number of replicas to use for the contacts index"`
 
 	LibratoUsername string `help:"the username that will be used to authenticate to Librato"`
 	LibratoToken    string `help:"the token that will be used to authenticate to Librato"`
@@ -21,13 +23,15 @@ func NewDefaultConfig() *Config {
 	hostname, _ := os.Hostname()
 
 	return &Config{
-		ElasticURL:   "http://localhost:9200",
-		DB:           "postgres://localhost/temba?sslmode=disable",
-		Index:        "contacts",
-		Poll:         5,
-		Rebuild:      false,
-		Cleanup:      false,
-		LogLevel:     "info",
-		InstanceName: hostname,
+		ElasticURL:       "http://localhost:9200",
+		DB:               "postgres://localhost/temba?sslmode=disable",
+		Index:            "contacts",
+		Poll:             5,
+		Rebuild:          false,
+		Cleanup:          false,
+		LogLevel:         "info",
+		InstanceName:     hostname,
+		ContactsShards:   2,
+		ContactsReplicas: 1,
 	}
 }
diff --git a/daemon.go b/daemon.go
index 8831075..3bdbc07 100644
--- a/daemon.go
+++ b/daemon.go
@@ -66,7 +66,7 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer) {
 			case <-d.quit:
 				return
 			case <-time.After(d.poll):
-				_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup)
+				_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup, d.cfg.ContactsShards, d.cfg.ContactsReplicas)
 				if err != nil {
 					log.WithError(err).Error("error during indexing")
 				}
diff --git a/indexers/base.go b/indexers/base.go
--- a/indexers/base.go
+++ b/indexers/base.go
@@ -29,10 +29,26 @@ type Stats struct {
 // Indexer is base interface for indexers
 type Indexer interface {
 	Name() string
-	Index(db *sql.DB, rebuild, cleanup bool) (string, error)
+	Index(db *sql.DB, rebuild, cleanup bool, shards int, replicas int) (string, error)
 	Stats() Stats
 }
 
+// ElasticSettings is the body sent to Elasticsearch when creating an index. The shard and replica
+// counts are typed so they can be overridden from config, the analysis and mappings sections are
+// passed through verbatim. Keys of the settings file that are not declared here are dropped when
+// the file is decoded into this struct and re-encoded.
+type ElasticSettings struct {
+	Settings struct {
+		Index struct {
+			NumberOfShards       int `json:"number_of_shards"`
+			NumberOfReplicas     int `json:"number_of_replicas"`
+			RoutingPartitionSize int `json:"routing_partition_size"`
+		} `json:"index"`
+		Analysis json.RawMessage `json:"analysis"`
+	} `json:"settings"`
+	Mappings json.RawMessage `json:"mappings"`
+}
+
 type baseIndexer struct {
 	elasticURL string
 	name       string // e.g. contacts, used as the alias
@@ -99,7 +115,7 @@ func (i *baseIndexer) FindIndexes() []string {
 // that index to `contacts`.
 //
 // If the day-specific name already exists, we append a .1 or .2 to the name.
-func (i *baseIndexer) createNewIndex(settings json.RawMessage) (string, error) {
+func (i *baseIndexer) createNewIndex(indexSettings ElasticSettings) (string, error) {
 	// create our day-specific name
 	index := fmt.Sprintf("%s_%s", i.name, time.Now().Format("2006_01_02"))
 	idx := 0
@@ -121,7 +137,11 @@ func (i *baseIndexer) createNewIndex(settings json.RawMessage) (string, error) {
 	}
 
 	// create the new index
-	_, err := utils.MakeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.elasticURL, index), settings, nil)
+	settings, err := json.Marshal(indexSettings)
+	if err != nil {
+		return "", err
+	}
+	_, err = utils.MakeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.elasticURL, index), settings, nil)
 	if err != nil {
 		return "", err
 	}
diff --git a/indexers/contacts.go b/indexers/contacts.go
--- a/indexers/contacts.go
+++ b/indexers/contacts.go
@@ -31,7 +31,7 @@ func NewContactIndexer(elasticURL, name string, batchSize int) *ContactIndexer {
 }
 
 // Index indexes modified contacts and returns the name of the concrete index
-func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error) {
+func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool, shards int, replicas int) (string, error) {
 	var err error
 
 	// find our physical index
@@ -47,6 +47,15 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error
 
 	// doesn't exist or we are rebuilding, create it
 	if physicalIndex == "" || rebuild {
-		physicalIndex, err = i.createNewIndex(contactsSettings)
+		// decode the embedded settings into a local value on every call, so the configured shard and
+		// replica counts never leak into shared package state
+		var settings ElasticSettings
+		if err = json.Unmarshal(contactsSettings, &settings); err != nil {
+			return "", errors.Wrap(err, "error parsing contacts index settings")
+		}
+		settings.Settings.Index.NumberOfShards = shards
+		settings.Settings.Index.NumberOfReplicas = replicas
+
+		physicalIndex, err = i.createNewIndex(settings)
 		if err != nil {
 			return "", errors.Wrap(err, "error creating new index")