Skip to content

Commit

Permalink
Rework cmd to use a daemon process
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Mar 24, 2022
1 parent 36db967 commit 3a051b3
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 44 deletions.
83 changes: 39 additions & 44 deletions cmd/rp-indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,86 +3,81 @@ package main
import (
"database/sql"
"os"
"time"
"os/signal"
"syscall"

"github.com/evalphobia/logrus_sentry"
_ "github.com/lib/pq"
"github.com/nyaruka/ezconf"
indexer "github.com/nyaruka/rp-indexer"
"github.com/nyaruka/rp-indexer/indexers"
log "github.com/sirupsen/logrus"
)

type config struct {
ElasticURL string `help:"the url for our elastic search instance"`
DB string `help:"the connection string for our database"`
Index string `help:"the alias for our contact index"`
Poll int `help:"the number of seconds to wait between checking for updated contacts"`
Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"`
Cleanup bool `help:"whether to remove old indexes after a rebuild"`
LogLevel string `help:"the log level, one of error, warn, info, debug"`
SentryDSN string `help:"the sentry configuration to log errors to, if any"`
}

func main() {
config := config{
ElasticURL: "http://localhost:9200",
DB: "postgres://localhost/temba?sslmode=disable",
Index: "contacts",
Poll: 5,
Rebuild: false,
Cleanup: false,
LogLevel: "info",
}
loader := ezconf.NewLoader(&config, "indexer", "Indexes RapidPro contacts to ElasticSearch", []string{"indexer.toml"})
cfg := indexer.NewDefaultConfig()
loader := ezconf.NewLoader(cfg, "indexer", "Indexes RapidPro contacts to ElasticSearch", []string{"indexer.toml"})
loader.MustLoad()

// configure our logger
log.SetOutput(os.Stdout)
log.SetFormatter(&log.TextFormatter{})

level, err := log.ParseLevel(config.LogLevel)
level, err := log.ParseLevel(cfg.LogLevel)
if err != nil {
log.Fatalf("Invalid log level '%s'", level)
}
log.SetLevel(level)

// if we have a DSN entry, try to initialize it
if config.SentryDSN != "" {
hook, err := logrus_sentry.NewSentryHook(config.SentryDSN, []log.Level{log.PanicLevel, log.FatalLevel, log.ErrorLevel})
if cfg.SentryDSN != "" {
hook, err := logrus_sentry.NewSentryHook(cfg.SentryDSN, []log.Level{log.PanicLevel, log.FatalLevel, log.ErrorLevel})
hook.Timeout = 0
hook.StacktraceConfiguration.Enable = true
hook.StacktraceConfiguration.Skip = 4
hook.StacktraceConfiguration.Context = 5
if err != nil {
log.Fatalf("invalid sentry DSN: '%s': %s", config.SentryDSN, err)
log.Fatalf("invalid sentry DSN: '%s': %s", cfg.SentryDSN, err)
}
log.StandardLogger().Hooks.Add(hook)
}

db, err := sql.Open("postgres", config.DB)
db, err := sql.Open("postgres", cfg.DB)
if err != nil {
log.Fatal(err)
log.Fatalf("unable to connect to database")
}

ci := indexers.NewContactIndexer(config.ElasticURL, config.Index, 500)

for {
_, err := ci.Index(db, config.Rebuild, config.Cleanup)
idxrs := []indexers.Indexer{
indexers.NewContactIndexer(cfg.ElasticURL, cfg.Index, 500),
}

if err != nil {
if config.Rebuild {
log.WithField("index", config.Index).WithError(err).Fatal("error during rebuilding")
} else {
log.WithField("index", config.Index).WithError(err).Error("error during indexing")
}
if cfg.Rebuild {
// if rebuilding, just do a complete index and quit. In future when we support multiple indexers,
// the rebuild argument can be become the name of the index to rebuild, e.g. --rebuild=contacts
idxr := idxrs[0]
if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil {
log.WithField("indexer", idxr.Name()).WithError(err).Fatal("error during rebuilding")
}
} else {
d := indexer.NewDaemon(cfg, db, idxrs)
d.Start()

// if we were rebuilding then we're done
if config.Rebuild {
os.Exit(0)
}
handleSignals(d)
}
}

// sleep a bit before starting again
time.Sleep(time.Second * 5)
// handleSignals takes care of trapping quit, interrupt or terminate signals and doing the right thing
func handleSignals(d *indexer.Daemon) {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

for {
sig := <-sigs
switch sig {
case syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
log.WithField("signal", sig).Info("received exit signal, exiting")
d.Stop()
return
}
}
}
33 changes: 33 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package indexer

import "os"

type Config struct {
ElasticURL string `help:"the url for our elastic search instance"`
DB string `help:"the connection string for our database"`
Index string `help:"the alias for our contact index"`
Poll int `help:"the number of seconds to wait between checking for updated contacts"`
Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"`
Cleanup bool `help:"whether to remove old indexes after a rebuild"`
LogLevel string `help:"the log level, one of error, warn, info, debug"`
SentryDSN string `help:"the sentry configuration to log errors to, if any"`

LibratoUsername string `help:"the username that will be used to authenticate to Librato"`
LibratoToken string `help:"the token that will be used to authenticate to Librato"`
InstanceName string `help:"the unique name of this instance used for analytics"`
}

func NewDefaultConfig() *Config {
hostname, _ := os.Hostname()

return &Config{
ElasticURL: "http://localhost:9200",
DB: "postgres://localhost/temba?sslmode=disable",
Index: "contacts",
Poll: 5,
Rebuild: false,
Cleanup: false,
LogLevel: "info",
InstanceName: hostname,
}
}
75 changes: 75 additions & 0 deletions daemon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package indexer

import (
"database/sql"
"sync"
"time"

"github.com/nyaruka/librato"
"github.com/nyaruka/rp-indexer/indexers"
"github.com/sirupsen/logrus"
)

type Daemon struct {
cfg *Config
db *sql.DB
wg *sync.WaitGroup
quit chan bool
indexers []indexers.Indexer
}

// NewDaemon creates a new daemon to run the given indexers
func NewDaemon(cfg *Config, db *sql.DB, ixs []indexers.Indexer) *Daemon {
return &Daemon{
cfg: cfg,
db: db,
wg: &sync.WaitGroup{},
quit: make(chan bool),
indexers: ixs,
}
}

// Start starts this daemon
func (d *Daemon) Start() {
// if we have a librato token, configure it
if d.cfg.LibratoToken != "" {
librato.Configure(d.cfg.LibratoUsername, d.cfg.LibratoToken, d.cfg.InstanceName, time.Second, d.wg)
librato.Start()
}

for _, i := range d.indexers {
d.startIndexer(i, time.Second*5)
}
}

func (d *Daemon) startIndexer(indexer indexers.Indexer, interval time.Duration) {
d.wg.Add(1) // add ourselves to the wait group

go func() {
defer func() {
logrus.WithField("indexer", indexer.Name()).Info("indexer exiting")
d.wg.Done()
}()

for {
select {
case <-d.quit:
return
case <-time.After(interval):
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup)
if err != nil {
logrus.WithField("index", d.cfg.Index).WithError(err).Error("error during indexing")
}
}
}
}()
}

// Stop stops this daemon
func (d *Daemon) Stop() {
logrus.Info("daemon stopping")
librato.Stop()

close(d.quit)
d.wg.Wait()
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ require (
github.com/lib/pq v1.10.4
github.com/nyaruka/ezconf v0.2.1
github.com/nyaruka/gocommon v1.17.1
github.com/nyaruka/librato v1.0.0
github.com/olivere/elastic/v7 v7.0.22
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.8.1
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfC
github.com/jmoiron/sqlx v1.3.4/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
Expand All @@ -53,6 +54,8 @@ github.com/nyaruka/ezconf v0.2.1 h1:TDXWoqjqYya1uhou1mAJZg7rgFYL98EB0Tb3+BWtUh0=
github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDYYxgnQw=
github.com/nyaruka/gocommon v1.17.1 h1:4bbNp+0/BIbne4VDiKOxh3kcbdvEu/WsrsZiG/VyRZ8=
github.com/nyaruka/gocommon v1.17.1/go.mod h1:nmYyb7MZDM0iW4DYJKiBzfKuE9nbnx+xSHZasuIBOT0=
github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0=
github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg=
github.com/nyaruka/phonenumbers v1.0.71/go.mod h1:sDaTZ/KPX5f8qyV9qN+hIm+4ZBARJrupC6LuhshJq1U=
github.com/olivere/elastic/v7 v7.0.22 h1:esBA6JJwvYgfms0EVlH7Z+9J4oQ/WUADF2y/nCNDw7s=
github.com/olivere/elastic/v7 v7.0.22/go.mod h1:VDexNy9NjmtAkrjNoI7tImv7FR4tf5zUA3ickqu5Pc8=
Expand All @@ -63,12 +66,14 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM=
github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+zMHZXV9/bvak=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand Down Expand Up @@ -96,6 +101,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down

0 comments on commit 3a051b3

Please sign in to comment.