Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Large database support for 32bit systems #12362

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@
# It might help users who have slow disks in some cases.
# tsm-use-madv-willneed = false

# If true, then TSM files will be accessed via seek/read operations instead of
# mmap. This is required on 32-bit systems with TSM files whose aggregate
# size is larger than the process address-space limit (typically around 2 or 3 GB).
# tsm-use-seek = false

# Settings for the inmem index

# The maximum series allowed per database before writes are dropped. This limit can prevent
Expand Down
10 changes: 9 additions & 1 deletion tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,15 @@ type Config struct {
// TSMWillNeed controls whether we hint to the kernel that we intend to
// page in mmap'd sections of TSM files. This setting defaults to off, as it has
// been found to be problematic in some cases. It may help users who have
// slow disks.
// slow disks. If tsm-use-seek is true, enabling this option issues an fadvise hint instead of madvise.
TSMWillNeed bool `toml:"tsm-use-madv-willneed"`

// TSMUseSeek controls how the block accessor reads block data.
// If false (the default), mmapped TSM files are used. If true,
// Seek/Read operations are used. Seek/Read operations are necessary
// on 32bit systems with large databases because of the much lower
// mmap memory address space limit.
TSMUseSeek bool `toml:"tsm-use-seek"`
}

// NewConfig returns the default configuration for tsdb.
Expand All @@ -161,6 +168,7 @@ func NewConfig() Config {

TraceLoggingEnabled: false,
TSMWillNeed: false,
TSMUseSeek: false,
}
}

Expand Down
4 changes: 4 additions & 0 deletions tsdb/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dir = "/var/lib/influxdb/data"
wal-dir = "/var/lib/influxdb/wal"
wal-fsync-delay = "10s"
tsm-use-madv-willneed = true
tsm-use-seek = true
`, &c); err != nil {
t.Fatal(err)
}
Expand All @@ -36,6 +37,9 @@ tsm-use-madv-willneed = true
if got, exp := c.TSMWillNeed, true; got != exp {
t.Errorf("unexpected tsm-madv-willneed:\n\nexp=%v\n\ngot=%v\n\n", exp, got)
}
if got, exp := c.TSMUseSeek, true; got != exp {
t.Errorf("unexpected tsm-use-seek:\n\nexp=%v\n\ngot=%v\n\n", exp, got)
}
}

func TestConfig_Validate_Error(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
fs.WithObserver(opt.FileStoreObserver)
}
fs.tsmMMAPWillNeed = opt.Config.TSMWillNeed
fs.tsmUseSeek = opt.Config.TSMUseSeek

cache := NewCache(uint64(opt.Config.CacheMaxMemorySize))

Expand Down
9 changes: 7 additions & 2 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ type FileStore struct {

files []TSMFile
tsmMMAPWillNeed bool // If true then the kernel will be advised MMAP_WILLNEED for TSM files.
tsmUseSeek bool // If true, seek/read ops are used instead of mmap for TSM files
openLimiter limiter.Fixed // limit the number of concurrent opening TSM files.

logger *zap.Logger // Logger to be used for important messages
Expand Down Expand Up @@ -532,7 +533,9 @@ func (f *FileStore) Open() error {
defer f.openLimiter.Release()

start := time.Now()
df, err := NewTSMReader(file, WithMadviseWillNeed(f.tsmMMAPWillNeed))
df, err := NewTSMReader(file,
WithMadviseWillNeed(f.tsmMMAPWillNeed),
WithUseSeek(f.tsmUseSeek))
f.logger.Info("Opened file",
zap.String("path", file.Name()),
zap.Int("id", idx),
Expand Down Expand Up @@ -760,7 +763,9 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
}
}

tsm, err := NewTSMReader(fd, WithMadviseWillNeed(f.tsmMMAPWillNeed))
tsm, err := NewTSMReader(fd,
WithMadviseWillNeed(f.tsmMMAPWillNeed),
WithUseSeek(f.tsmUseSeek))
if err != nil {
if newName != oldName {
if err1 := os.Rename(newName, oldName); err1 != nil {
Expand Down
12 changes: 12 additions & 0 deletions tsdb/engine/tsm1/mmap_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,15 @@ func madviseDontNeed(b []byte) error {
func madvise(b []byte, advice int) (err error) {
return unix.Madvise(b, advice)
}

// fadviseWillNeed applies the FADV_WILLNEED advice to the file referred to by
// fd by delegating to fadvise, passing l as the length of the advised range.
// It returns whatever error fadvise reports.
func fadviseWillNeed(fd uintptr, l int64) error {
return fadvise(fd, l, unix.FADV_WILLNEED)
}

// fadviseDontNeed applies the FADV_DONTNEED advice to the file referred to by
// fd by delegating to fadvise, passing l as the length of the advised range.
// It returns whatever error fadvise reports.
func fadviseDontNeed(fd uintptr, l int64) error {
return fadvise(fd, l, unix.FADV_DONTNEED)
}
// fadvise forwards the given advice to unix.Fadvise for the file descriptor
// fd. The advised range always starts at offset 0 and has length l; fd is
// converted from uintptr to int for the call. The error from unix.Fadvise is
// returned unchanged.
func fadvise(fd uintptr, l int64, advice int) (err error) {
return unix.Fadvise(int(fd), 0, l, advice)
}
11 changes: 11 additions & 0 deletions tsdb/engine/tsm1/mmap_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,14 @@ func madvise(b []byte, advice int) error {
// Not implemented
return nil
}
// fadviseWillNeed is a no-op on Windows, where fadvise is not supported.
// Both arguments are ignored and nil is always returned.
func fadviseWillNeed(fd uintptr, l int64) error {
return nil
}

// fadviseDontNeed is a no-op in this Windows build of the file.
// Both arguments are ignored and nil is always returned.
func fadviseDontNeed(fd uintptr, l int64) error {
return nil
}
// fadvise is a no-op in this Windows build of the file. All arguments are
// ignored and nil is always returned.
func fadvise(fd uintptr, l int64, advice int) (err error) {
return nil
}
84 changes: 30 additions & 54 deletions tsdb/engine/tsm1/reader.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,40 +90,16 @@ func (t *TSMReader) ReadBooleanArrayBlockAt(entry *IndexEntry, vals *tsdb.Boolea
return err
}

// blockAccessor abstracts a method of accessing blocks from a
// TSM file.
type blockAccessor interface {
init() (*indirectIndex, error)
read(key []byte, timestamp int64) ([]Value, error)
readAll(key []byte) ([]Value, error)
readBlock(entry *IndexEntry, values []Value) ([]Value, error)
readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error)
readFloatArrayBlock(entry *IndexEntry, values *tsdb.FloatArray) error
readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error)
readIntegerArrayBlock(entry *IndexEntry, values *tsdb.IntegerArray) error
readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error)
readUnsignedArrayBlock(entry *IndexEntry, values *tsdb.UnsignedArray) error
readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error)
readStringArrayBlock(entry *IndexEntry, values *tsdb.StringArray) error
readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error)
readBooleanArrayBlock(entry *IndexEntry, values *tsdb.BooleanArray) error
readBytes(entry *IndexEntry, buf []byte) (uint32, []byte, error)
rename(path string) error
path() string
close() error
free() error
}

func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) {
func (m *accessor) readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
if int64(m.b.length()) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return nil, ErrTSMClosed
}

a, err := DecodeFloatBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
a, err := DecodeFloatBlock(m.b.read(entry.Offset+4, entry.Offset+int64(entry.Size)), values)
m.mu.RUnlock()

if err != nil {
Expand All @@ -133,31 +109,31 @@ func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, values *[]FloatValue) (
return a, nil
}

func (m *mmapAccessor) readFloatArrayBlock(entry *IndexEntry, values *tsdb.FloatArray) error {
func (m *accessor) readFloatArrayBlock(entry *IndexEntry, values *tsdb.FloatArray) error {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
if int64(m.b.length()) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return ErrTSMClosed
}

err := DecodeFloatArrayBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
err := DecodeFloatArrayBlock(m.b.read(entry.Offset+4, entry.Offset+int64(entry.Size)), values)
m.mu.RUnlock()

return err
}

func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) {
func (m *accessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
if int64(m.b.length()) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return nil, ErrTSMClosed
}

a, err := DecodeIntegerBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
a, err := DecodeIntegerBlock(m.b.read(entry.Offset+4, entry.Offset+int64(entry.Size)), values)
m.mu.RUnlock()

if err != nil {
Expand All @@ -167,31 +143,31 @@ func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValu
return a, nil
}

func (m *mmapAccessor) readIntegerArrayBlock(entry *IndexEntry, values *tsdb.IntegerArray) error {
func (m *accessor) readIntegerArrayBlock(entry *IndexEntry, values *tsdb.IntegerArray) error {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
if int64(m.b.length()) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return ErrTSMClosed
}

err := DecodeIntegerArrayBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
err := DecodeIntegerArrayBlock(m.b.read(entry.Offset+4, entry.Offset+int64(entry.Size)), values)
m.mu.RUnlock()

return err
}

func (m *mmapAccessor) readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error) {
func (m *accessor) readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error) {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
if int64(m.b.length()) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return nil, ErrTSMClosed
}

a, err := DecodeUnsignedBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
a, err := DecodeUnsignedBlock(m.b.read(entry.Offset+4, entry.Offset+int64(entry.Size)), values)
m.mu.RUnlock()

if err != nil {
Expand All @@ -201,31 +177,31 @@ func (m *mmapAccessor) readUnsignedBlock(entry *IndexEntry, values *[]UnsignedVa
return a, nil
}

func (m *mmapAccessor) readUnsignedArrayBlock(entry *IndexEntry, values *tsdb.UnsignedArray) error {
func (m *accessor) readUnsignedArrayBlock(entry *IndexEntry, values *tsdb.UnsignedArray) error {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
if int64(m.b.length()) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return ErrTSMClosed
}

err := DecodeUnsignedArrayBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
err := DecodeUnsignedArrayBlock(m.b.read(entry.Offset+4, entry.Offset+int64(entry.Size)), values)
m.mu.RUnlock()

return err
}

func (m *mmapAccessor) readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) {
func (m *accessor) readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
if int64(m.b.length()) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return nil, ErrTSMClosed
}

a, err := DecodeStringBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
a, err := DecodeStringBlock(m.b.read(entry.Offset+4, entry.Offset+int64(entry.Size)), values)
m.mu.RUnlock()

if err != nil {
Expand All @@ -235,31 +211,31 @@ func (m *mmapAccessor) readStringBlock(entry *IndexEntry, values *[]StringValue)
return a, nil
}

func (m *mmapAccessor) readStringArrayBlock(entry *IndexEntry, values *tsdb.StringArray) error {
func (m *accessor) readStringArrayBlock(entry *IndexEntry, values *tsdb.StringArray) error {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
if int64(m.b.length()) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return ErrTSMClosed
}

err := DecodeStringArrayBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
err := DecodeStringArrayBlock(m.b.read(entry.Offset+4, entry.Offset+int64(entry.Size)), values)
m.mu.RUnlock()

return err
}

func (m *mmapAccessor) readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) {
func (m *accessor) readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
if int64(m.b.length()) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return nil, ErrTSMClosed
}

a, err := DecodeBooleanBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
a, err := DecodeBooleanBlock(m.b.read(entry.Offset+4, entry.Offset+int64(entry.Size)), values)
m.mu.RUnlock()

if err != nil {
Expand All @@ -269,16 +245,16 @@ func (m *mmapAccessor) readBooleanBlock(entry *IndexEntry, values *[]BooleanValu
return a, nil
}

func (m *mmapAccessor) readBooleanArrayBlock(entry *IndexEntry, values *tsdb.BooleanArray) error {
func (m *accessor) readBooleanArrayBlock(entry *IndexEntry, values *tsdb.BooleanArray) error {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
if int64(m.b.length()) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return ErrTSMClosed
}

err := DecodeBooleanArrayBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
err := DecodeBooleanArrayBlock(m.b.read(entry.Offset+4, entry.Offset+int64(entry.Size)), values)
m.mu.RUnlock()

return err
Expand Down
Loading