Skip to content

Commit

Permalink
allow configurable shards/replicas
Browse files Browse the repository at this point in the history
  • Loading branch information
tybritten committed Aug 8, 2022
1 parent a0add1e commit de432df
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 26 deletions.
2 changes: 1 addition & 1 deletion cmd/rp-indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func main() {
// if rebuilding, just do a complete index and quit. In future when we support multiple indexers,
// the rebuild argument can be become the name of the index to rebuild, e.g. --rebuild=contacts
idxr := idxrs[0]
if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil {
if _, err := idxr.Index(db, true, cfg.Cleanup, cfg.ContactsShards, cfg.ContactsReplicas); err != nil {
log.WithField("indexer", idxr.Name()).WithError(err).Fatal("error during rebuilding")
}
} else {
Expand Down
36 changes: 20 additions & 16 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ package indexer
import "os"

type Config struct {
ElasticURL string `help:"the url for our elastic search instance"`
DB string `help:"the connection string for our database"`
Index string `help:"the alias for our contact index"`
Poll int `help:"the number of seconds to wait between checking for updated contacts"`
Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"`
Cleanup bool `help:"whether to remove old indexes after a rebuild"`
LogLevel string `help:"the log level, one of error, warn, info, debug"`
SentryDSN string `help:"the sentry configuration to log errors to, if any"`
ElasticURL string `help:"the url for our elastic search instance"`
DB string `help:"the connection string for our database"`
Index string `help:"the alias for our contact index"`
Poll int `help:"the number of seconds to wait between checking for updated contacts"`
Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"`
Cleanup bool `help:"whether to remove old indexes after a rebuild"`
LogLevel string `help:"the log level, one of error, warn, info, debug"`
SentryDSN string `help:"the sentry configuration to log errors to, if any"`
ContactsShards int `help:"The number of shards to use for the contacts index"`
ContactsReplicas int `help:"The number of replicas to use for the contacts index"`

LibratoUsername string `help:"the username that will be used to authenticate to Librato"`
LibratoToken string `help:"the token that will be used to authenticate to Librato"`
Expand All @@ -21,13 +23,15 @@ func NewDefaultConfig() *Config {
hostname, _ := os.Hostname()

return &Config{
ElasticURL: "http://localhost:9200",
DB: "postgres://localhost/temba?sslmode=disable",
Index: "contacts",
Poll: 5,
Rebuild: false,
Cleanup: false,
LogLevel: "info",
InstanceName: hostname,
ElasticURL: "http://localhost:9200",
DB: "postgres://localhost/temba?sslmode=disable",
Index: "contacts",
Poll: 5,
Rebuild: false,
Cleanup: false,
LogLevel: "info",
InstanceName: hostname,
ContactsShards: 2,
ContactsReplicas: 1,
}
}
2 changes: 1 addition & 1 deletion daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer) {
case <-d.quit:
return
case <-time.After(d.poll):
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup)
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup, d.cfg.ContactsShards, d.cfg.ContactsReplicas)
if err != nil {
log.WithError(err).Error("error during indexing")
}
Expand Down
22 changes: 19 additions & 3 deletions indexers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,22 @@ type Stats struct {
// Indexer is base interface for indexers
type Indexer interface {
Name() string
Index(db *sql.DB, rebuild, cleanup bool) (string, error)
Index(db *sql.DB, rebuild, cleanup bool, shards int, replicas int) (string, error)
Stats() Stats
}

type ElasticSettings struct {
Settings struct {
Index struct {
NumberOfShards int `json:"number_of_shards"`
NumberOfReplicas int `json:"number_of_replicas"`
RoutingPartitionSize int `json:"routing_partition_size"`
} `json:"index"`
Analysis json.RawMessage `json:"analysis"`
} `json:"settings"`
Mappings json.RawMessage `json:"mappings"`
}

type baseIndexer struct {
elasticURL string
name string // e.g. contacts, used as the alias
Expand Down Expand Up @@ -99,7 +111,7 @@ func (i *baseIndexer) FindIndexes() []string {
// that index to `contacts`.
//
// If the day-specific name already exists, we append a .1 or .2 to the name.
func (i *baseIndexer) createNewIndex(settings json.RawMessage) (string, error) {
func (i *baseIndexer) createNewIndex(indexSettings ElasticSettings) (string, error) {
// create our day-specific name
index := fmt.Sprintf("%s_%s", i.name, time.Now().Format("2006_01_02"))
idx := 0
Expand All @@ -121,7 +133,11 @@ func (i *baseIndexer) createNewIndex(settings json.RawMessage) (string, error) {
}

// create the new index
_, err := utils.MakeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.elasticURL, index), settings, nil)
settings, err := json.Marshal(indexSettings)
if err != nil {
return "", err
}
_, err = utils.MakeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.elasticURL, index), settings, nil)
if err != nil {
return "", err
}
Expand Down
13 changes: 8 additions & 5 deletions indexers/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ import (
_ "embed"
"encoding/json"
"fmt"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"io/ioutil"
"time"
)

//go:embed contacts.settings.json
var contactsSettings json.RawMessage
var contactsSettings ElasticSettings

// ContactIndexer is an indexer for contacts
type ContactIndexer struct {
Expand All @@ -31,7 +30,7 @@ func NewContactIndexer(elasticURL, name string, batchSize int) *ContactIndexer {
}

// Index indexes modified contacts and returns the name of the concrete index
func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error) {
func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool, shards int, replicas int) (string, error) {
var err error

// find our physical index
Expand All @@ -47,6 +46,10 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error

// doesn't exist or we are rebuilding, create it
if physicalIndex == "" || rebuild {
file, _ := ioutil.ReadFile("contacts.settings.json")
_ = json.Unmarshal([]byte(file), &contactsSettings)
contactsSettings.Settings.Index.NumberOfShards = shards
contactsSettings.Settings.Index.NumberOfReplicas = replicas
physicalIndex, err = i.createNewIndex(contactsSettings)
if err != nil {
return "", errors.Wrap(err, "error creating new index")
Expand Down

0 comments on commit de432df

Please sign in to comment.