Skip to content

Commit

Permalink
[dbnode] Optimize filesetFiles function (#3900)
Browse files Browse the repository at this point in the history
* Initial refactoring.

* Small cleanup.

* Deleted unused code.

* Small cleanup after code review.
  • Loading branch information
soundvibe authored Nov 9, 2021
1 parent 513748e commit 59ea90c
Showing 1 changed file with 70 additions and 74 deletions.
144 changes: 70 additions & 74 deletions src/dbnode/persist/fs/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,38 +411,6 @@ func (a commitlogsByTimeAndIndexAscending) Less(i, j int) bool {
return ti.Equal(tj) && ii < ij
}

// dataFileSetFilesByTimeAndVolumeIndexAscending orders data fileset file names
// by block start time first and by volume index second, both ascending. Parse
// errors are discarded during comparison, so the resulting order is undefined
// for names that do not carry a block start time and a volume index.
type dataFileSetFilesByTimeAndVolumeIndexAscending []string

// Len returns the number of file names.
func (a dataFileSetFilesByTimeAndVolumeIndexAscending) Len() int {
	return len(a)
}

// Swap exchanges the file names at positions i and j.
func (a dataFileSetFilesByTimeAndVolumeIndexAscending) Swap(i, j int) {
	a[j], a[i] = a[i], a[j]
}

// Less reports whether the file name at i sorts before the one at j: an
// earlier block start wins, and equal block starts fall back to the smaller
// volume index.
func (a dataFileSetFilesByTimeAndVolumeIndexAscending) Less(i, j int) bool {
	startI, volumeI, _ := TimeAndVolumeIndexFromDataFileSetFilename(a[i])
	startJ, volumeJ, _ := TimeAndVolumeIndexFromDataFileSetFilename(a[j])
	switch {
	case startI.Before(startJ):
		return true
	case startI.Equal(startJ):
		return volumeI < volumeJ
	default:
		return false
	}
}

// fileSetFilesByTimeAndVolumeIndexAscending orders fileset file names by block
// start time first and by volume index second, both ascending. Parse errors
// are discarded during comparison, so the resulting order is undefined for
// names that do not carry a block start time and a volume index.
type fileSetFilesByTimeAndVolumeIndexAscending []string

// Len returns the number of file names.
func (a fileSetFilesByTimeAndVolumeIndexAscending) Len() int {
	return len(a)
}

// Swap exchanges the file names at positions i and j.
func (a fileSetFilesByTimeAndVolumeIndexAscending) Swap(i, j int) {
	a[j], a[i] = a[i], a[j]
}

// Less reports whether the file name at i sorts before the one at j: an
// earlier block start wins, and equal block starts fall back to the smaller
// volume index.
func (a fileSetFilesByTimeAndVolumeIndexAscending) Less(i, j int) bool {
	startI, volumeI, _ := TimeAndVolumeIndexFromFileSetFilename(a[i])
	startJ, volumeJ, _ := TimeAndVolumeIndexFromFileSetFilename(a[j])
	switch {
	case startI.Before(startJ):
		return true
	case startI.Equal(startJ):
		return volumeI < volumeJ
	default:
		return false
	}
}

// Returns the positions of filename delimiters ('-' and '.') and the number of
// delimiters found, to be used in conjunction with the intComponentAtIndex
// function to extract filename components. This function is deliberately
Expand Down Expand Up @@ -1231,7 +1199,65 @@ func SortedCommitLogFiles(commitLogsDir string) ([]string, error) {
return sortedCommitLogFiles(commitLogsDir, commitLogFilePattern)
}

type (
	// filesetFile pairs a fileset file name with the block start and volume
	// index associated with it.
	filesetFile struct {
		volumeIndex int
		blockStart  xtime.UnixNano
		fileName    string
	}

	// toSortableFn wraps a list of file names in a sort.Interface.
	toSortableFn func(files []string) sort.Interface

	// toBlockStartAndVolumeIndexFn yields the block start and volume index
	// for a file, or an error when they cannot be determined.
	toBlockStartAndVolumeIndexFn func(file string) (xtime.UnixNano, int, error)

	// sortedFilesetFiles is a list of filesetFile entries that can be sorted
	// by block start and then volume index.
	sortedFilesetFiles []filesetFile
)

// Len returns the number of fileset files held in s.
func (s sortedFilesetFiles) Len() int { return len(s) }

// Less reports whether the entry at i sorts before the entry at j: an earlier
// block start wins, and equal block starts fall back to the smaller volume
// index.
func (s sortedFilesetFiles) Less(i, j int) bool {
	left, right := s[i], s[j]
	switch {
	case left.blockStart.Before(right.blockStart):
		return true
	case left.blockStart.Equal(right.blockStart):
		return left.volumeIndex < right.volumeIndex
	default:
		return false
	}
}

// Swap exchanges the entries at positions i and j.
func (s sortedFilesetFiles) Swap(i, j int) { s[j], s[i] = s[i], s[j] }

// findSortedFilesetFiles globs fileDir for entries matching pattern, resolves
// the block start and volume index of every match through fn, and returns the
// entries sorted ascending by block start and then by volume index.
//
// A nil slice and nil error are returned when nothing matches. A glob error,
// or the first error reported by fn, is returned as-is with a nil slice.
func findSortedFilesetFiles(
	fileDir string, pattern string,
	fn toBlockStartAndVolumeIndexFn,
) (sortedFilesetFiles, error) {
	paths, err := filepath.Glob(path.Join(fileDir, pattern))
	if err != nil {
		return nil, err
	}
	if len(paths) == 0 {
		return nil, nil
	}

	files := make(sortedFilesetFiles, 0, len(paths))
	for _, p := range paths {
		start, volumeIndex, parseErr := fn(p)
		if parseErr != nil {
			return nil, parseErr
		}
		files = append(files, filesetFile{
			fileName:    p,
			blockStart:  start,
			volumeIndex: volumeIndex,
		})
	}

	sort.Sort(files)
	return files, nil
}

func findFiles(fileDir string, pattern string, fn toSortableFn) ([]string, error) {
matched, err := filepath.Glob(path.Join(fileDir, pattern))
Expand Down Expand Up @@ -1278,22 +1304,18 @@ type filesetFilesSelector struct {

func filesetFiles(args filesetFilesSelector) (FileSetFilesSlice, error) {
var (
byTimeAsc []string
byTimeAsc sortedFilesetFiles
err error
)
switch args.fileSetType {
case persist.FileSetFlushType:
switch args.contentType {
case persist.FileSetDataContentType:
dir := ShardDataDirPath(args.filePathPrefix, args.namespace, args.shard)
byTimeAsc, err = findFiles(dir, args.pattern, func(files []string) sort.Interface {
return dataFileSetFilesByTimeAndVolumeIndexAscending(files)
})
byTimeAsc, err = findSortedFilesetFiles(dir, args.pattern, TimeAndVolumeIndexFromDataFileSetFilename)
case persist.FileSetIndexContentType:
dir := NamespaceIndexDataDirPath(args.filePathPrefix, args.namespace)
byTimeAsc, err = findFiles(dir, args.pattern, func(files []string) sort.Interface {
return fileSetFilesByTimeAndVolumeIndexAscending(files)
})
byTimeAsc, err = findSortedFilesetFiles(dir, args.pattern, TimeAndVolumeIndexFromFileSetFilename)
default:
return nil, fmt.Errorf("unknown content type: %d", args.contentType)
}
Expand All @@ -1307,9 +1329,7 @@ func filesetFiles(args filesetFilesSelector) (FileSetFilesSlice, error) {
default:
return nil, fmt.Errorf("unknown content type: %d", args.contentType)
}
byTimeAsc, err = findFiles(dir, args.pattern, func(files []string) sort.Interface {
return fileSetFilesByTimeAndVolumeIndexAscending(files)
})
byTimeAsc, err = findSortedFilesetFiles(dir, args.pattern, TimeAndVolumeIndexFromFileSetFilename)
default:
return nil, fmt.Errorf("unknown type: %d", args.fileSetType)
}
Expand All @@ -1328,51 +1348,27 @@ func filesetFiles(args filesetFilesSelector) (FileSetFilesSlice, error) {
filesetFiles = []FileSetFile{}
)
for _, file := range byTimeAsc {
var (
currentFileBlockStart xtime.UnixNano
volumeIndex int
err error
)
switch args.fileSetType {
case persist.FileSetFlushType:
switch args.contentType {
case persist.FileSetDataContentType:
currentFileBlockStart, volumeIndex, err = TimeAndVolumeIndexFromDataFileSetFilename(file)
case persist.FileSetIndexContentType:
currentFileBlockStart, volumeIndex, err = TimeAndVolumeIndexFromFileSetFilename(file)
default:
return nil, fmt.Errorf("unknown content type: %d", args.contentType)
}
case persist.FileSetSnapshotType:
currentFileBlockStart, volumeIndex, err = TimeAndVolumeIndexFromFileSetFilename(file)
default:
return nil, fmt.Errorf("unknown type: %d", args.fileSetType)
}
if err != nil {
return nil, err
}

if latestBlockStart == 0 {
latestFileSetFile = NewFileSetFile(FileSetFileIdentifier{
Namespace: args.namespace,
BlockStart: currentFileBlockStart,
BlockStart: file.blockStart,
Shard: args.shard,
VolumeIndex: volumeIndex,
VolumeIndex: file.volumeIndex,
}, args.filePathPrefix)
} else if !currentFileBlockStart.Equal(latestBlockStart) || latestVolumeIndex != volumeIndex {
} else if !file.blockStart.Equal(latestBlockStart) || latestVolumeIndex != file.volumeIndex {
filesetFiles = append(filesetFiles, latestFileSetFile)
latestFileSetFile = NewFileSetFile(FileSetFileIdentifier{
Namespace: args.namespace,
BlockStart: currentFileBlockStart,
BlockStart: file.blockStart,
Shard: args.shard,
VolumeIndex: volumeIndex,
VolumeIndex: file.volumeIndex,
}, args.filePathPrefix)
}

latestBlockStart = currentFileBlockStart
latestVolumeIndex = volumeIndex
latestBlockStart = file.blockStart
latestVolumeIndex = file.volumeIndex

latestFileSetFile.AbsoluteFilePaths = append(latestFileSetFile.AbsoluteFilePaths, file)
latestFileSetFile.AbsoluteFilePaths = append(latestFileSetFile.AbsoluteFilePaths, file.fileName)
}

filesetFiles = append(filesetFiles, latestFileSetFile)
Expand Down

0 comments on commit 59ea90c

Please sign in to comment.