diff --git a/cmd/rp-indexer/main.go b/cmd/rp-indexer/main.go index 021bea1..b959c05 100644 --- a/cmd/rp-indexer/main.go +++ b/cmd/rp-indexer/main.go @@ -3,86 +3,81 @@ package main import ( "database/sql" "os" - "time" + "os/signal" + "syscall" "github.com/evalphobia/logrus_sentry" _ "github.com/lib/pq" "github.com/nyaruka/ezconf" + indexer "github.com/nyaruka/rp-indexer" "github.com/nyaruka/rp-indexer/indexers" log "github.com/sirupsen/logrus" ) -type config struct { - ElasticURL string `help:"the url for our elastic search instance"` - DB string `help:"the connection string for our database"` - Index string `help:"the alias for our contact index"` - Poll int `help:"the number of seconds to wait between checking for updated contacts"` - Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"` - Cleanup bool `help:"whether to remove old indexes after a rebuild"` - LogLevel string `help:"the log level, one of error, warn, info, debug"` - SentryDSN string `help:"the sentry configuration to log errors to, if any"` -} - func main() { - config := config{ - ElasticURL: "http://localhost:9200", - DB: "postgres://localhost/temba?sslmode=disable", - Index: "contacts", - Poll: 5, - Rebuild: false, - Cleanup: false, - LogLevel: "info", - } - loader := ezconf.NewLoader(&config, "indexer", "Indexes RapidPro contacts to ElasticSearch", []string{"indexer.toml"}) + cfg := indexer.NewDefaultConfig() + loader := ezconf.NewLoader(cfg, "indexer", "Indexes RapidPro contacts to ElasticSearch", []string{"indexer.toml"}) loader.MustLoad() // configure our logger log.SetOutput(os.Stdout) log.SetFormatter(&log.TextFormatter{}) - level, err := log.ParseLevel(config.LogLevel) + level, err := log.ParseLevel(cfg.LogLevel) if err != nil { log.Fatalf("Invalid log level '%s'", level) } log.SetLevel(level) // if we have a DSN entry, try to initialize it - if config.SentryDSN != "" { - hook, err := logrus_sentry.NewSentryHook(config.SentryDSN, []log.Level{log.PanicLevel, log.FatalLevel, log.ErrorLevel}) + if cfg.SentryDSN != "" { + hook, err := logrus_sentry.NewSentryHook(cfg.SentryDSN, []log.Level{log.PanicLevel, log.FatalLevel, log.ErrorLevel}) hook.Timeout = 0 hook.StacktraceConfiguration.Enable = true hook.StacktraceConfiguration.Skip = 4 hook.StacktraceConfiguration.Context = 5 if err != nil { - log.Fatalf("invalid sentry DSN: '%s': %s", config.SentryDSN, err) + log.Fatalf("invalid sentry DSN: '%s': %s", cfg.SentryDSN, err) } log.StandardLogger().Hooks.Add(hook) } - db, err := sql.Open("postgres", config.DB) + db, err := sql.Open("postgres", cfg.DB) if err != nil { - log.Fatal(err) + log.Fatalf("unable to connect to database") } - ci := indexers.NewContactIndexer(config.ElasticURL, config.Index, 500) - - for { - _, err := ci.Index(db, config.Rebuild, config.Cleanup) + idxrs := []indexers.Indexer{ + indexers.NewContactIndexer(cfg.ElasticURL, cfg.Index, 500), + } - if err != nil { - if config.Rebuild { - log.WithField("index", config.Index).WithError(err).Fatal("error during rebuilding") - } else { - log.WithField("index", config.Index).WithError(err).Error("error during indexing") - } + if cfg.Rebuild { + // if rebuilding, just do a complete index and quit. In future when we support multiple indexers, + // the rebuild argument can be become the name of the index to rebuild, e.g. --rebuild=contacts + idxr := idxrs[0] + if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil { + log.WithField("indexer", idxr.Name()).WithError(err).Fatal("error during rebuilding") } + } else { + d := indexer.NewDaemon(cfg, db, idxrs) + d.Start() - // if we were rebuilding then we're done - if config.Rebuild { - os.Exit(0) - } + handleSignals(d) + } +} - // sleep a bit before starting again - time.Sleep(time.Second * 5) +// handleSignals takes care of trapping quit, interrupt or terminate signals and doing the right thing +func handleSignals(d *indexer.Daemon) { + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + + for { + sig := <-sigs + switch sig { + case syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT: + log.WithField("signal", sig).Info("received exit signal, exiting") + d.Stop() + return + } } } diff --git a/config.go b/config.go new file mode 100644 index 0000000..c83e755 --- /dev/null +++ b/config.go @@ -0,0 +1,33 @@ +package indexer + +import "os" + +type Config struct { + ElasticURL string `help:"the url for our elastic search instance"` + DB string `help:"the connection string for our database"` + Index string `help:"the alias for our contact index"` + Poll int `help:"the number of seconds to wait between checking for updated contacts"` + Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"` + Cleanup bool `help:"whether to remove old indexes after a rebuild"` + LogLevel string `help:"the log level, one of error, warn, info, debug"` + SentryDSN string `help:"the sentry configuration to log errors to, if any"` + + LibratoUsername string `help:"the username that will be used to authenticate to Librato"` + LibratoToken string `help:"the token that will be used to authenticate to Librato"` + InstanceName string `help:"the unique name of this instance used for analytics"` +} + +func NewDefaultConfig() *Config { + hostname, _ := os.Hostname() + + return &Config{ + ElasticURL: "http://localhost:9200", + DB: "postgres://localhost/temba?sslmode=disable", + Index: "contacts", + Poll: 5, + Rebuild: false, + Cleanup: false, + LogLevel: "info", + InstanceName: hostname, + } +} diff --git a/daemon.go b/daemon.go new file mode 100644 index 0000000..19b78d5 --- /dev/null +++ b/daemon.go @@ -0,0 +1,134 @@ +package indexer + +import ( + "database/sql" + "sync" + "time" + + "github.com/nyaruka/librato" + "github.com/nyaruka/rp-indexer/indexers" + "github.com/sirupsen/logrus" +) + +type Daemon struct { + cfg *Config + db *sql.DB + wg *sync.WaitGroup + quit chan bool + indexers []indexers.Indexer + + prevStats map[indexers.Indexer]indexers.Stats +} + +// NewDaemon creates a new daemon to run the given indexers +func NewDaemon(cfg *Config, db *sql.DB, ixs []indexers.Indexer) *Daemon { + return &Daemon{ + cfg: cfg, + db: db, + wg: &sync.WaitGroup{}, + quit: make(chan bool), + indexers: ixs, + prevStats: make(map[indexers.Indexer]indexers.Stats, len(ixs)), + } +} + +// Start starts this daemon +func (d *Daemon) Start() { + // if we have a librato token, configure it + if d.cfg.LibratoToken != "" { + librato.Configure(d.cfg.LibratoUsername, d.cfg.LibratoToken, d.cfg.InstanceName, time.Second, d.wg) + librato.Start() + } + + for _, i := range d.indexers { + d.startIndexer(i, time.Second*5) + } + + d.startStatsReporter(time.Minute) +} + +func (d *Daemon) startIndexer(indexer indexers.Indexer, interval time.Duration) { + d.wg.Add(1) // add ourselves to the wait group + + log := logrus.WithField("indexer", indexer.Name()) + + go func() { + defer func() { + log.Info("indexer exiting") + d.wg.Done() + }() + + for { + select { + case <-d.quit: + return + case <-time.After(interval): + _, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup) + if err != nil { + log.WithError(err).Error("error during indexing") + } + } + } + }() +} + +func (d *Daemon) startStatsReporter(interval time.Duration) { + d.wg.Add(1) // add ourselves to the wait group + + go func() { + defer func() { + logrus.Info("analytics exiting") + d.wg.Done() + }() + + for { + select { + case <-d.quit: + return + case <-time.After(interval): + d.reportStats() + } + } + }() +} + +func (d *Daemon) reportStats() { + metrics := make(map[string]float64, len(d.indexers)*2) + + for _, ix := range d.indexers { + stats := ix.Stats() + prev := d.prevStats[ix] + + indexedInPeriod := stats.Indexed - prev.Indexed + deletedInPeriod := stats.Deleted - prev.Deleted + elapsedInPeriod := stats.Elapsed - prev.Elapsed + rateInPeriod := float64(0) + if indexedInPeriod > 0 && elapsedInPeriod > 0 { + rateInPeriod = float64(indexedInPeriod) / (float64(elapsedInPeriod) / float64(time.Second)) + } + + metrics[ix.Name()+"_indexed"] = float64(indexedInPeriod) + metrics[ix.Name()+"_deleted"] = float64(deletedInPeriod) + metrics[ix.Name()+"_rate"] = rateInPeriod + + d.prevStats[ix] = stats + } + + log := logrus.NewEntry(logrus.StandardLogger()) + + for k, v := range metrics { + librato.Gauge("indexer."+k, v) + log = log.WithField(k, v) + } + + log.Info("stats reported") +} + +// Stop stops this daemon +func (d *Daemon) Stop() { + logrus.Info("daemon stopping") + librato.Stop() + + close(d.quit) + d.wg.Wait() +} diff --git a/go.mod b/go.mod index fcfb589..fbfe75c 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ require ( github.com/lib/pq v1.10.4 github.com/nyaruka/ezconf v0.2.1 github.com/nyaruka/gocommon v1.17.1 + github.com/nyaruka/librato v1.0.0 github.com/olivere/elastic/v7 v7.0.22 github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.8.1 diff --git a/go.sum b/go.sum index f78ba21..2dfdccb 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,7 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfC github.com/jmoiron/sqlx v1.3.4/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= @@ -53,6 +54,8 @@ github.com/nyaruka/ezconf v0.2.1 h1:TDXWoqjqYya1uhou1mAJZg7rgFYL98EB0Tb3+BWtUh0= github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDYYxgnQw= github.com/nyaruka/gocommon v1.17.1 h1:4bbNp+0/BIbne4VDiKOxh3kcbdvEu/WsrsZiG/VyRZ8= github.com/nyaruka/gocommon v1.17.1/go.mod h1:nmYyb7MZDM0iW4DYJKiBzfKuE9nbnx+xSHZasuIBOT0= +github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0= +github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg= github.com/nyaruka/phonenumbers v1.0.71/go.mod h1:sDaTZ/KPX5f8qyV9qN+hIm+4ZBARJrupC6LuhshJq1U= github.com/olivere/elastic/v7 v7.0.22 h1:esBA6JJwvYgfms0EVlH7Z+9J4oQ/WUADF2y/nCNDw7s= github.com/olivere/elastic/v7 v7.0.22/go.mod h1:VDexNy9NjmtAkrjNoI7tImv7FR4tf5zUA3ickqu5Pc8= @@ -63,12 +66,14 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM= github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+zMHZXV9/bvak= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -96,6 +101,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/indexers/base.go b/indexers/base.go index 808ddcb..b661784 100644 --- a/indexers/base.go +++ b/indexers/base.go @@ -20,21 +20,24 @@ const indexCommand = `{ "index": { "_id": %d, "_type": "_doc", "version": %d, "v // deletes a document const deleteCommand = `{ "delete" : { "_id": %d, "_type": "_doc", "version": %d, "version_type": "external", "routing": %d} }` +type Stats struct { + Indexed int64 // total number of documents indexed + Deleted int64 // total number of documents deleted + Elapsed time.Duration // total time spent actually indexing +} + // Indexer is base interface for indexers type Indexer interface { Name() string Index(db *sql.DB, rebuild, cleanup bool) (string, error) - Stats() (int64, int64, time.Duration) + Stats() Stats } type baseIndexer struct { elasticURL string name string // e.g. contacts, used as the alias - // statistics - indexedTotal int64 - deletedTotal int64 - elapsedTotal time.Duration + stats Stats } func newBaseIndexer(elasticURL, name string) baseIndexer { @@ -45,8 +48,8 @@ func (i *baseIndexer) Name() string { return i.name } -func (i *baseIndexer) Stats() (int64, int64, time.Duration) { - return i.indexedTotal, i.deletedTotal, i.elapsedTotal +func (i *baseIndexer) Stats() Stats { + return i.stats } func (i *baseIndexer) log() *logrus.Entry { @@ -55,9 +58,9 @@ func (i *baseIndexer) log() *logrus.Entry { // records a complete index and updates statistics func (i *baseIndexer) recordComplete(indexed, deleted int, elapsed time.Duration) { - i.indexedTotal += int64(indexed) - i.deletedTotal += int64(deleted) - i.elapsedTotal += elapsed + i.stats.Indexed += int64(indexed) + i.stats.Deleted += int64(deleted) + i.stats.Elapsed += elapsed i.log().WithField("indexed", indexed).WithField("deleted", deleted).WithField("elapsed", elapsed).Info("completed indexing") } diff --git a/indexers/base_test.go b/indexers/base_test.go index c4a4d54..b473f22 100644 --- a/indexers/base_test.go +++ b/indexers/base_test.go @@ -78,7 +78,7 @@ func assertIndexesWithPrefix(t *testing.T, es *elastic.Client, prefix string, ex } func assertIndexerStats(t *testing.T, ix indexers.Indexer, expectedIndexed, expectedDeleted int64) { - actualIndexed, actualDeleted, _ := ix.Stats() - assert.Equal(t, expectedIndexed, actualIndexed, "indexed mismatch") - assert.Equal(t, expectedDeleted, actualDeleted, "deleted mismatch") + actual := ix.Stats() + assert.Equal(t, expectedIndexed, actual.Indexed, "indexed mismatch") + assert.Equal(t, expectedDeleted, actual.Deleted, "deleted mismatch") } diff --git a/indexers/contacts.go b/indexers/contacts.go index e25d198..c77230e 100644 --- a/indexers/contacts.go +++ b/indexers/contacts.go @@ -60,7 +60,7 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error return "", errors.Wrap(err, "error finding last modified") } - i.log().WithField("index", physicalIndex).WithField("last_modified", lastModified).Info("indexing newer than last modified") + i.log().WithField("index", physicalIndex).WithField("last_modified", lastModified).Debug("indexing newer than last modified") // now index our docs start := time.Now() @@ -243,7 +243,7 @@ func (i *ContactIndexer) indexModified(db *sql.DB, index string, lastModified ti elapsed := time.Since(start) rate := float32(processedCount) / (float32(elapsed) / float32(time.Second)) - i.log().WithField("index", index).WithFields(logrus.Fields{"rate": int(rate), "added": createdCount, "deleted": deletedCount, "elapsed": elapsed}).Info("indexed contact batch") + i.log().WithField("index", index).WithFields(logrus.Fields{"rate": int(rate), "added": createdCount, "deleted": deletedCount, "elapsed": elapsed}).Debug("indexed contact batch") } return createdCount, deletedCount, nil