diff --git a/monstache.go b/monstache.go
index eee74ea..dc376b8 100644
--- a/monstache.go
+++ b/monstache.go
@@ -26,6 +26,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"syscall"
 	"text/template"
 	"time"
@@ -166,6 +167,9 @@ type indexClient struct {
 	directReadsPending bool
 	externalShutdown   bool
 	rwmutex            sync.RWMutex
+	bulkErrs           atomic.Int64
+	bulkBackoff        elastic.Backoff
+	bulkBackoffC       chan struct{}
 }
 
 type sigHandler struct {
@@ -549,27 +553,59 @@ func (config *configOptions) ignoreCollectionForDirectReads(col string) bool {
 	return strings.HasPrefix(col, "system.")
 }
 
-func afterBulk(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
-	if response == nil || !response.Errors {
-		return
-	}
-	if failed := response.Failed(); failed != nil {
-		for _, item := range failed {
-			if item.Status == 409 {
-				// ignore version conflict since this simply means the doc
-				// is already in the index
-				continue
+func (ic *indexClient) afterBulk() func(int64, []elastic.BulkableRequest, *elastic.BulkResponse, error) {
+	return func(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
+		if response == nil || !response.Errors {
+			ic.bulkErrs.Store(0)
+			return
+		}
+		if failed := response.Failed(); failed != nil {
+			backoff := false
+			for _, item := range failed {
+				if item.Status == 409 {
+					// ignore version conflict since this simply means the doc
+					// is already in the index
+					continue
+				}
+				backoff = true
+				json, err := json.Marshal(item)
+				if err != nil {
+					errorLog.Printf("Unable to marshal bulk response item: %s", err)
+				} else {
+					errorLog.Printf("Bulk response item: %s", string(json))
+				}
 			}
-			json, err := json.Marshal(item)
-			if err != nil {
-				errorLog.Printf("Unable to marshal bulk response item: %s", err)
-			} else {
-				errorLog.Printf("Bulk response item: %s", string(json))
-			}
+			if backoff {
+				infoLog.Println("Backing off after bulk indexing failures")
+				ic.bulkErrs.Add(1)
+				// signal the event loop to pause pulling new events for a duration
+				ic.bulkBackoffC <- struct{}{}
+				// pause the bulk worker for a duration
+				ic.backoff()
+			}
 		}
 	}
 }
 
+func (ic *indexClient) backoff() {
+	consecutiveErrors := int(ic.bulkErrs.Load())
+	wait, ok := ic.bulkBackoff.Next(consecutiveErrors)
+	if !ok {
+		return
+	}
+	timer := time.NewTimer(wait)
+	defer timer.Stop()
+	sigs := make(chan os.Signal, 1)
+	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+	defer signal.Stop(sigs)
+	select {
+	case <-timer.C:
+		return
+	case <-sigs:
+		return
+	}
+}
+
 func (config *configOptions) parseElasticsearchVersion(number string) (err error) {
 	if number == "" {
 		err = errors.New("Elasticsearch version cannot be blank")
@@ -593,7 +629,8 @@ func (config *configOptions) parseElasticsearchVersion(number string) (err error
 	return
 }
 
-func (config *configOptions) newBulkProcessor(client *elastic.Client) (bulk *elastic.BulkProcessor, err error) {
+func (ic *indexClient) newBulkProcessor(client *elastic.Client) (bulk *elastic.BulkProcessor, err error) {
+	config := ic.config
 	bulkService := client.BulkProcessor().Name("monstache")
 	bulkService.Workers(config.ElasticMaxConns)
 	bulkService.Stats(config.Stats)
@@ -602,18 +639,18 @@ func (config *configOptions) newBulkProcessor(client *elastic.Client) (bulk *ela
 	if config.ElasticRetry == false {
 		bulkService.Backoff(&elastic.StopBackoff{})
 	}
-	bulkService.After(afterBulk)
+	bulkService.After(ic.afterBulk())
 	bulkService.FlushInterval(time.Duration(config.ElasticMaxSeconds) * time.Second)
 	return bulkService.Do(context.Background())
 }
 
-func (config *configOptions) newStatsBulkProcessor(client *elastic.Client) (bulk *elastic.BulkProcessor, err error) {
+func (ic *indexClient) newStatsBulkProcessor(client *elastic.Client) (bulk *elastic.BulkProcessor, err error) {
 	bulkService := client.BulkProcessor().Name("monstache-stats")
 	bulkService.Workers(1)
 	bulkService.Stats(false)
 	bulkService.BulkActions(-1)
 	bulkService.BulkSize(-1)
-	bulkService.After(afterBulk)
+	bulkService.After(ic.afterBulk())
 	bulkService.FlushInterval(time.Duration(5) * time.Second)
 	return bulkService.Do(context.Background())
 }
@@ -1342,11 +1379,11 @@ func parseIndexMeta(op *gtm.Op) (meta *indexingMeta) {
 
 func (ic *indexClient) addFileContent(op *gtm.Op) (err error) {
 	op.Data["file"] = ""
-	var gridByteBuffer bytes.Buffer
+	var fileContentBuilder strings.Builder
 	db, bucketName :=
 		ic.mongo.Database(op.GetDatabase()),
 		strings.SplitN(op.GetCollection(), ".", 2)[0]
-	encoder := base64.NewEncoder(base64.StdEncoding, &gridByteBuffer)
+	encoder := base64.NewEncoder(base64.StdEncoding, &fileContentBuilder)
 	opts := &options.BucketOptions{}
 	opts.SetName(bucketName)
 	var bucket *gridfs.Bucket
@@ -1366,7 +1403,7 @@ func (ic *indexClient) addFileContent(op *gtm.Op) (err error) {
 	if err = encoder.Close(); err != nil {
 		return
 	}
-	op.Data["file"] = string(gridByteBuffer.Bytes())
+	op.Data["file"] = fileContentBuilder.String()
 	return
 }
 
@@ -4373,14 +4410,13 @@ func (ic *indexClient) setupFileIndexing() {
 }
 
 func (ic *indexClient) setupBulk() {
-	config := ic.config
-	bulk, err := config.newBulkProcessor(ic.client)
+	bulk, err := ic.newBulkProcessor(ic.client)
 	if err != nil {
 		errorLog.Fatalf("Unable to start bulk processor: %s", err)
 	}
 	var bulkStats *elastic.BulkProcessor
-	if config.IndexStats {
-		bulkStats, err = config.newStatsBulkProcessor(ic.client)
+	if ic.config.IndexStats {
+		bulkStats, err = ic.newStatsBulkProcessor(ic.client)
 		if err != nil {
 			errorLog.Fatalf("Unable to start stats bulk processor: %s", err)
 		}
@@ -4977,6 +5013,8 @@ func (ic *indexClient) eventLoop() {
 	ic.sigH.clientStartedC <- ic
 	for {
 		select {
+		case <-ic.bulkBackoffC:
+			ic.backoff()
 		case timeout := <-ic.doneC:
 			ic.enabled = false
 			ic.shutdown(timeout)
@@ -5179,7 +5217,6 @@ func (ic *indexClient) saveTimestampFromServerStatus() {
 	} else {
 		ic.processErr(err)
 	}
-	return
 }
 
 func (ic *indexClient) saveTimestampFromReplStatus() {
@@ -5297,24 +5334,26 @@ func main() {
 
 	elasticClient := buildElasticClient(config)
 	ic := &indexClient{
-		config:      config,
-		mongo:       mongoClient,
-		client:      elasticClient,
-		fileWg:      &sync.WaitGroup{},
-		indexWg:     &sync.WaitGroup{},
-		processWg:   &sync.WaitGroup{},
-		relateWg:    &sync.WaitGroup{},
-		opsConsumed: make(chan bool),
-		closeC:      make(chan bool),
-		doneC:       make(chan int),
-		enabled:     true,
-		indexC:      make(chan *gtm.Op),
-		processC:    make(chan *gtm.Op),
-		fileC:       make(chan *gtm.Op),
-		relateC:     make(chan *gtm.Op, config.RelateBuffer),
-		statusReqC:  make(chan *statusRequest),
-		sigH:        sh,
-		tokens:      bson.M{},
+		config:       config,
+		mongo:        mongoClient,
+		client:       elasticClient,
+		fileWg:       &sync.WaitGroup{},
+		indexWg:      &sync.WaitGroup{},
+		processWg:    &sync.WaitGroup{},
+		relateWg:     &sync.WaitGroup{},
+		opsConsumed:  make(chan bool),
+		closeC:       make(chan bool),
+		doneC:        make(chan int),
+		enabled:      true,
+		indexC:       make(chan *gtm.Op),
+		processC:     make(chan *gtm.Op),
+		fileC:        make(chan *gtm.Op),
+		relateC:      make(chan *gtm.Op, config.RelateBuffer),
+		statusReqC:   make(chan *statusRequest),
+		sigH:         sh,
+		tokens:       bson.M{},
+		bulkBackoffC: make(chan struct{}),
+		bulkBackoff:  elastic.NewExponentialBackoff(1*time.Minute, 1*time.Hour),
 	}
 	ic.run()