Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data race in bleve indexer (#16474) #16509

Merged
merged 1 commit into from
Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions modules/indexer/bleve/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2021 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package bleve

import (
"github.com/blevesearch/bleve/v2"
)

// FlushingBatch wraps a bleve batch and submits it to the underlying index
// automatically once the batch has grown to a configured size.
type FlushingBatch struct {
	// index is the bleve index that batches are created from and submitted to.
	index bleve.Index
	// batch is the batch currently accumulating operations; it is replaced
	// with a fresh one after every successful flush.
	batch *bleve.Batch
	// maxBatchSize is the batch size at which an automatic flush is triggered.
	maxBatchSize int
}

// NewFlushingBatch returns a FlushingBatch bound to index, starting with a
// fresh batch obtained from that index. Whenever the size of the pending batch
// reaches maxBatchSize after an operation is added, the batch is submitted to
// the index automatically.
func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch {
	fb := new(FlushingBatch)
	fb.maxBatchSize = maxBatchSize
	fb.batch = index.NewBatch()
	fb.index = index
	return fb
}

// Index adds an index operation for data under id to the pending batch and
// then flushes the batch if it has reached its size limit. An error from
// queuing the operation is returned as-is, without attempting a flush.
func (b *FlushingBatch) Index(id string, data interface{}) error {
	err := b.batch.Index(id, data)
	if err != nil {
		return err
	}
	return b.flushIfFull()
}

// Delete adds a delete operation for id to the pending batch and then flushes
// the batch if it has reached its size limit. The returned error, if any,
// comes from that flush.
func (b *FlushingBatch) Delete(id string) error {
	b.batch.Delete(id)
	flushErr := b.flushIfFull()
	return flushErr
}

// flushIfFull submits the pending batch when its size is at least
// maxBatchSize; otherwise it does nothing and returns nil.
func (b *FlushingBatch) flushIfFull() error {
	if b.batch.Size() >= b.maxBatchSize {
		return b.Flush()
	}
	return nil
}

// Flush submits the pending batch to the index and, on success, replaces it
// with a newly created empty batch. If the submission fails the error is
// returned and the pending batch is left in place.
func (b *FlushingBatch) Flush() error {
	if err := b.index.Batch(b.batch); err != nil {
		return err
	}
	b.batch = b.index.NewBatch()
	return nil
}
10 changes: 6 additions & 4 deletions modules/indexer/code/bleve.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"code.gitea.io/gitea/modules/analyze"
"code.gitea.io/gitea/modules/charset"
"code.gitea.io/gitea/modules/git"
gitea_bleve "code.gitea.io/gitea/modules/indexer/bleve"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
Expand Down Expand Up @@ -176,7 +177,8 @@ func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) {
return indexer, created, err
}

func (b *BleveIndexer) addUpdate(batchWriter git.WriteCloserError, batchReader *bufio.Reader, commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error {
func (b *BleveIndexer) addUpdate(batchWriter git.WriteCloserError, batchReader *bufio.Reader, commitSha string,
update fileUpdate, repo *models.Repository, batch *gitea_bleve.FlushingBatch) error {
// Ignore vendored files in code search
if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) {
return nil
Expand Down Expand Up @@ -229,7 +231,7 @@ func (b *BleveIndexer) addUpdate(batchWriter git.WriteCloserError, batchReader *
})
}

func (b *BleveIndexer) addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error {
func (b *BleveIndexer) addDelete(filename string, repo *models.Repository, batch *gitea_bleve.FlushingBatch) error {
id := filenameIndexerID(repo.ID, filename)
return batch.Delete(id)
}
Expand Down Expand Up @@ -267,7 +269,7 @@ func (b *BleveIndexer) Close() {

// Index indexes the data
func (b *BleveIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error {
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
if len(changes.Updates) > 0 {

batchWriter, batchReader, cancel := git.CatFileBatch(repo.RepoPath())
Expand Down Expand Up @@ -296,7 +298,7 @@ func (b *BleveIndexer) Delete(repoID int64) error {
if err != nil {
return err
}
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
for _, hit := range result.Hits {
if err = batch.Delete(hit.ID); err != nil {
return err
Expand Down
6 changes: 4 additions & 2 deletions modules/indexer/issues/bleve.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"os"
"strconv"

gitea_bleve "code.gitea.io/gitea/modules/indexer/bleve"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/util"

"github.com/blevesearch/bleve/v2"
"github.com/blevesearch/bleve/v2/analysis/analyzer/custom"
"github.com/blevesearch/bleve/v2/analysis/token/lowercase"
Expand Down Expand Up @@ -197,7 +199,7 @@ func (b *BleveIndexer) Close() {

// Index will save the index data
func (b *BleveIndexer) Index(issues []*IndexerData) error {
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
for _, issue := range issues {
if err := batch.Index(indexerID(issue.ID), struct {
RepoID int64
Expand All @@ -218,7 +220,7 @@ func (b *BleveIndexer) Index(issues []*IndexerData) error {

// Delete deletes indexes by ids
func (b *BleveIndexer) Delete(ids ...int64) error {
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
for _, id := range ids {
if err := batch.Delete(indexerID(id)); err != nil {
return err
Expand Down