Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disk utilization fixes #9225

Merged
merged 4 commits into from
Dec 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ golang.org/x/crypto 9477e0b78b9ac3d0b03822fd95422e2fe07627cd
golang.org/x/net 9dfe39835686865bff950a07b394c12a98ddc811
golang.org/x/sys 062cd7e4e68206d8bab9b18396626e855c992658
golang.org/x/text a71fd10341b064c10f4a81ceac72bcf70f26ea34
golang.org/x/time 6dc17368e09b0e8634d71cac8168d853e869a0c7
2 changes: 1 addition & 1 deletion etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
# snapshot the cache and write it to a TSM file, freeing up memory
# Valid size suffixes are k, m, or g (case insensitive, 1024 = 1k).
# Values without a size suffix are in bytes.
# cache-snapshot-memory-size = "256m"
# cache-snapshot-memory-size = "25m"

# CacheSnapshotWriteColdDuration is the length of time at
# which the engine will snapshot the cache and write it to
Expand Down
34 changes: 34 additions & 0 deletions pkg/limiter/write_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package limiter_test

import (
"bytes"
"io"
"testing"
"time"

"github.com/influxdata/influxdb/pkg/limiter"
)

func TestWriter_Limited(t *testing.T) {
r := bytes.NewReader(bytes.Repeat([]byte{0}, 1024*1024))

limit := 512 * 1024
w := limiter.NewWriter(discardCloser{}, limit, 10*1024*1024)

start := time.Now()
n, err := io.Copy(w, r)
elapsed := time.Since(start)
if err != nil {
t.Error("copy error: ", err)
}

rate := float64(n) / elapsed.Seconds()
if rate > float64(limit) {
t.Errorf("rate limit mismath: exp %f, got %f", float64(limit), rate)
}
}

type discardCloser struct{}

func (d discardCloser) Write(b []byte) (int, error) { return len(b), nil }
func (d discardCloser) Close() error { return nil }
83 changes: 83 additions & 0 deletions pkg/limiter/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package limiter

import (
"context"
"io"
"os"
"time"

"golang.org/x/time/rate"
)

type Writer struct {
w io.WriteCloser
limiter Rate
ctx context.Context
}

type Rate interface {
WaitN(ctx context.Context, n int) error
}

func NewRate(bytesPerSec, burstLimit int) Rate {
limiter := rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit)
limiter.AllowN(time.Now(), burstLimit) // spend initial burst
return limiter
}

// NewWriter returns a writer that implements io.Writer with rate limiting.
// The limiter use a token bucket approach and limits the rate to bytesPerSec
// with a maximum burst of burstLimit.
func NewWriter(w io.WriteCloser, bytesPerSec, burstLimit int) *Writer {
limiter := NewRate(bytesPerSec, burstLimit)

return &Writer{
w: w,
ctx: context.Background(),
limiter: limiter,
}
}

// WithRate returns a Writer with the specified rate limiter.
func NewWriterWithRate(w io.WriteCloser, limiter Rate) *Writer {
return &Writer{
w: w,
ctx: context.Background(),
limiter: limiter,
}
}

// Write writes bytes from p.
func (s *Writer) Write(b []byte) (int, error) {
if s.limiter == nil {
return s.w.Write(b)
}

n, err := s.w.Write(b)
if err != nil {
return n, err
}

if err := s.limiter.WaitN(s.ctx, n); err != nil {
return n, err
}
return n, err
}

func (s *Writer) Sync() error {
if f, ok := s.w.(*os.File); ok {
return f.Sync()
}
return nil
}

func (s *Writer) Name() string {
if f, ok := s.w.(*os.File); ok {
return f.Name()
}
return ""
}

func (s *Writer) Close() error {
return s.w.Close()
}
2 changes: 1 addition & 1 deletion tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (

// DefaultCacheSnapshotMemorySize is the size at which the engine will
// snapshot the cache and write it to a TSM file, freeing up memory
DefaultCacheSnapshotMemorySize = 256 * 1024 * 1024 // 256MB
DefaultCacheSnapshotMemorySize = 25 * 1024 * 1024 // 25MB

// DefaultCacheSnapshotWriteColdDuration is the length of time at which
// the engine will snapshot the cache and write it to a new TSM file if
Expand Down
3 changes: 2 additions & 1 deletion tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ type EngineOptions struct {
ShardID uint64
InmemIndex interface{} // shared in-memory index

CompactionLimiter limiter.Fixed
CompactionLimiter limiter.Fixed
CompactionThroughputLimiter limiter.Rate

Config Config
}
Expand Down
Loading