Skip to content

Commit

Permalink
Reduce repo indexer disk usage
Browse files Browse the repository at this point in the history
  • Loading branch information
ethantkoenig committed Feb 5, 2018
1 parent 283e87d commit 55a3db8
Show file tree
Hide file tree
Showing 14 changed files with 704 additions and 97 deletions.
4 changes: 2 additions & 2 deletions models/issue_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func populateIssueIndexer() error {
return err
}
for _, issue := range issues {
if err := batch.Add(issue.update()); err != nil {
if err := issue.update().AddToFlushingBatch(batch); err != nil {
return err
}
}
Expand All @@ -78,7 +78,7 @@ func processIssueIndexerUpdateQueue() {
issue, err := GetIssueByID(issueID)
if err != nil {
log.Error(4, "GetIssueByID: %v", err)
} else if err = batch.Add(issue.update()); err != nil {
} else if err = issue.update().AddToFlushingBatch(batch); err != nil {
log.Error(4, "IssueIndexer: %v", err)
}
}
Expand Down
16 changes: 10 additions & 6 deletions models/repo_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"code.gitea.io/gitea/modules/indexer"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"

"github.com/ethantkoenig/rupture"
)

// RepoIndexerStatus status of a repo's entry in the repo indexer
Expand Down Expand Up @@ -187,7 +189,7 @@ func getRepoChanges(repo *Repository, revision string) (*repoChanges, error) {
return nonGenesisChanges(repo, revision)
}

func addUpdate(update fileUpdate, repo *Repository, batch *indexer.Batch) error {
func addUpdate(update fileUpdate, repo *Repository, batch rupture.FlushingBatch) error {
stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha).
RunInDir(repo.RepoPath())
if err != nil {
Expand All @@ -206,24 +208,26 @@ func addUpdate(update fileUpdate, repo *Repository, batch *indexer.Batch) error
} else if !base.IsTextFile(fileContents) {
return nil
}
return batch.Add(indexer.RepoIndexerUpdate{
indexerUpdate := indexer.RepoIndexerUpdate{
Filepath: update.Filename,
Op: indexer.RepoIndexerOpUpdate,
Data: &indexer.RepoIndexerData{
RepoID: repo.ID,
Content: string(fileContents),
},
})
}
return indexerUpdate.AddToFlushingBatch(batch)
}

func addDelete(filename string, repo *Repository, batch *indexer.Batch) error {
return batch.Add(indexer.RepoIndexerUpdate{
func addDelete(filename string, repo *Repository, batch rupture.FlushingBatch) error {
indexerUpdate := indexer.RepoIndexerUpdate{
Filepath: filename,
Op: indexer.RepoIndexerOpDelete,
Data: &indexer.RepoIndexerData{
RepoID: repo.ID,
},
})
}
return indexerUpdate.AddToFlushingBatch(batch)
}

// parseGitLsTreeOutput parses the output of a `git ls-tree -r --full-name` command
Expand Down
59 changes: 30 additions & 29 deletions modules/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ package indexer

import (
"fmt"
"os"
"strconv"

"code.gitea.io/gitea/modules/setting"

"github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/analysis/token/unicodenorm"
"github.com/blevesearch/bleve/index/upsidedown"
"github.com/blevesearch/bleve/mapping"
"github.com/blevesearch/bleve/search/query"
"github.com/ethantkoenig/rupture"
)

// indexerID a bleve-compatible unique identifier for an integer id
Expand Down Expand Up @@ -53,40 +58,36 @@ func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error {
})
}

// Update represents an update to an indexer
type Update interface {
addToBatch(batch *bleve.Batch) error
}

const maxBatchSize = 16

// Batch batch of indexer updates that automatically flushes once it
// reaches a certain size
type Batch struct {
batch *bleve.Batch
index bleve.Index
}

// Add add update to batch, possibly flushing
func (batch *Batch) Add(update Update) error {
if err := update.addToBatch(batch.batch); err != nil {
return err
// openIndexer open the index at the specified path, checking for metadata
// updates and bleve version updates. If index needs to be created (or
// re-created), returns (nil, nil)
func openIndexer(path string, latestVersion int) (bleve.Index, error) {
_, err := os.Stat(setting.Indexer.IssuePath)
if err != nil && os.IsNotExist(err) {
return nil, nil
} else if err != nil {
return nil, err
}
return batch.flushIfFull()
}

func (batch *Batch) flushIfFull() error {
if batch.batch.Size() >= maxBatchSize {
return batch.Flush()
metadata, err := rupture.ReadIndexMetadata(path)
if err != nil {
return nil, err
}
if metadata.Version < latestVersion {
// the indexer is using a previous version, so we should delete it and
// re-populate
return nil, os.RemoveAll(path)
}
return nil
}

// Flush manually flush the batch, regardless of its size
func (batch *Batch) Flush() error {
if err := batch.index.Batch(batch.batch); err != nil {
return err
index, err := bleve.Open(path)
if err != nil && err == upsidedown.IncompatibleVersion {
// the indexer was built with a previous version of bleve, so we should
// delete it and re-populate
return nil, os.RemoveAll(path)
} else if err != nil {
return nil, err
}
batch.batch.Reset()
return nil
return index, nil
}
59 changes: 32 additions & 27 deletions modules/indexer/issue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,26 @@
package indexer

import (
"os"

"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"

"github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/analysis/analyzer/custom"
"github.com/blevesearch/bleve/analysis/token/lowercase"
"github.com/blevesearch/bleve/analysis/tokenizer/unicode"
"github.com/blevesearch/bleve/index/upsidedown"
"github.com/ethantkoenig/rupture"
)

// issueIndexer (thread-safe) index for searching issues
var issueIndexer bleve.Index

const (
// issueIndexerAnalyzer is the analyzer name that createIssueIndexer sets as
// the index mapping's DefaultAnalyzer.
issueIndexerAnalyzer = "issueIndexer"
// issueIndexerDocType is the document type reported by IssueIndexerData.Type
// and the name under which the issue document mapping is registered.
issueIndexerDocType = "issueIndexerDocType"

// issueIndexerLatestVersion is the version passed to openIndexer; an index
// whose metadata version is lower is deleted so it can be re-populated.
// NOTE(review): presumably bumped whenever the issue mapping changes - confirm.
issueIndexerLatestVersion = 1
)

// IssueIndexerData data stored in the issue indexer
type IssueIndexerData struct {
RepoID int64
Expand All @@ -28,35 +33,33 @@ type IssueIndexerData struct {
Comments []string
}

// Type implements bleve's mapping.Classifier interface by reporting the
// document type under which issue data is mapped.
func (d *IssueIndexerData) Type() string {
	return issueIndexerDocType
}

// IssueIndexerUpdate is a single update to the issue indexer: the data to
// index for one issue.
type IssueIndexerUpdate struct {
// IssueID is converted with indexerID to form the document ID in the index.
IssueID int64
// Data is the document indexed under that ID.
Data *IssueIndexerData
}

func (update IssueIndexerUpdate) addToBatch(batch *bleve.Batch) error {
return batch.Index(indexerID(update.IssueID), update.Data)
// AddToFlushingBatch adds the update to the given flushing batch.
func (i IssueIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) error {
return batch.Index(indexerID(i.IssueID), i.Data)
}

const issueIndexerAnalyzer = "issueIndexer"

// InitIssueIndexer initialize issue indexer
func InitIssueIndexer(populateIndexer func() error) {
_, err := os.Stat(setting.Indexer.IssuePath)
if err != nil && !os.IsNotExist(err) {
var err error
issueIndexer, err = openIndexer(setting.Indexer.IssuePath, issueIndexerLatestVersion)
if err != nil {
log.Fatal(4, "InitIssueIndexer: %v", err)
} else if err == nil {
issueIndexer, err = bleve.Open(setting.Indexer.IssuePath)
if err == nil {
return
} else if err != upsidedown.IncompatibleVersion {
log.Fatal(4, "InitIssueIndexer, open index: %v", err)
}
log.Warn("Incompatible bleve version, deleting and recreating issue indexer")
if err = os.RemoveAll(setting.Indexer.IssuePath); err != nil {
log.Fatal(4, "InitIssueIndexer: remove index, %v", err)
}
}
if issueIndexer != nil {
return
}

if err = createIssueIndexer(); err != nil {
log.Fatal(4, "InitIssuesIndexer: create index, %v", err)
}
Expand All @@ -70,9 +73,13 @@ func createIssueIndexer() error {
mapping := bleve.NewIndexMapping()
docMapping := bleve.NewDocumentMapping()

docMapping.AddFieldMappingsAt("RepoID", bleve.NewNumericFieldMapping())
numericFieldMapping := bleve.NewNumericFieldMapping()
numericFieldMapping.IncludeInAll = false
docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping)

textFieldMapping := bleve.NewTextFieldMapping()
textFieldMapping.Store = false
textFieldMapping.IncludeInAll = false
docMapping.AddFieldMappingsAt("Title", textFieldMapping)
docMapping.AddFieldMappingsAt("Content", textFieldMapping)
docMapping.AddFieldMappingsAt("Comments", textFieldMapping)
Expand All @@ -89,19 +96,17 @@ func createIssueIndexer() error {
}

mapping.DefaultAnalyzer = issueIndexerAnalyzer
mapping.AddDocumentMapping("issues", docMapping)
mapping.AddDocumentMapping(issueIndexerDocType, docMapping)
mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping())

var err error
issueIndexer, err = bleve.New(setting.Indexer.IssuePath, mapping)
return err
}

// IssueIndexerBatch batch to add updates to
func IssueIndexerBatch() *Batch {
return &Batch{
batch: issueIndexer.NewBatch(),
index: issueIndexer,
}
func IssueIndexerBatch() rupture.FlushingBatch {
return rupture.NewFlushingBatch(issueIndexer, maxBatchSize)
}

// SearchIssuesByKeyword searches for issues by given conditions.
Expand Down
Loading

0 comments on commit 55a3db8

Please sign in to comment.