From 3cb6f0e406579675c8782f0c2c2e6c8acc5bee1d Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Mon, 20 Jul 2020 03:41:00 -0400 Subject: [PATCH] Rate limit reading index files --- go.mod | 2 ++ go.sum | 3 +++ src/dbnode/digest/digest.go | 6 +++++ src/dbnode/persist/fs/index_read.go | 31 +++++++++++++++++++++++- src/dbnode/persist/fs/persist_manager.go | 5 ++-- 5 files changed, 44 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 9baf5f79c2..9986cbf714 100644 --- a/go.mod +++ b/go.mod @@ -115,11 +115,13 @@ require ( go.etcd.io/etcd v3.4.3+incompatible go.uber.org/atomic v1.5.1 go.uber.org/config v1.4.0 + go.uber.org/ratelimit v0.1.0 go.uber.org/zap v1.13.0 golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f // indirect golang.org/x/net v0.0.0-20200301022130-244492dfa37a golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 + golang.org/x/time v0.0.0-20191024005414-555d28b269f0 golang.org/x/tools v0.0.0-20200601175630-2caf76543d99 // indirect google.golang.org/grpc v1.27.1 gopkg.in/go-ini/ini.v1 v1.57.0 // indirect diff --git a/go.sum b/go.sum index fc3b9cee8e..1844808e8e 100644 --- a/go.sum +++ b/go.sum @@ -792,6 +792,8 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/ go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.4.0 h1:f3WCSC2KzAcBXGATIxAB1E2XuCpNU255wNKZ505qi3E= go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= +go.uber.org/ratelimit v0.1.0 h1:U2AruXqeTb4Eh9sYQSTrMhH8Cb7M0Ian2ibBOnBcnAw= +go.uber.org/ratelimit v0.1.0/go.mod h1:2X8KaoNd1J0lZV+PxJk/5+DGbO/tpwLR1m++a7FnB/Y= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= @@ -914,6 +916,7 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e h1:EHBhcS0mlXEAVwNyO2dLfjToGsyY4j24pTs2ScHnX7s= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181011042414-1f849cf54d09/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/src/dbnode/digest/digest.go b/src/dbnode/digest/digest.go index 653a50103a..b04296b6eb 100644 --- a/src/dbnode/digest/digest.go +++ b/src/dbnode/digest/digest.go @@ -21,6 +21,7 @@ package digest import ( + "hash" "hash/adler32" "github.com/m3db/stackadler32" @@ -32,6 +33,11 @@ func NewDigest() stackadler32.Digest { return stackadler32.NewDigest() } +// NewDigestWriter returns a stateful digest writer. +func NewDigestWriter() hash.Hash32 { + return adler32.New() +} + // Checksum returns the checksum for a buffer. func Checksum(buf []byte) uint32 { return adler32.Checksum(buf) diff --git a/src/dbnode/persist/fs/index_read.go b/src/dbnode/persist/fs/index_read.go index f86c2d9c1c..6a721a3bc9 100644 --- a/src/dbnode/persist/fs/index_read.go +++ b/src/dbnode/persist/fs/index_read.go @@ -34,6 +34,7 @@ import ( idxpersist "github.com/m3db/m3/src/m3ninx/persist" "github.com/m3db/m3/src/x/mmap" + "go.uber.org/ratelimit" "go.uber.org/zap" ) @@ -257,11 +258,39 @@ func (r *indexReader) ReadSegmentFileSet() ( r.logger.Warn("warning while mmapping files in reader", zap.Error(warning)) } + // Create limiter right before file read in case value just changed. + limits := r.opts.RuntimeOptionsManager().Get().PersistRateLimitOptions() + limiter := ratelimit.NewUnlimited() + if limits.LimitEnabled() { + // We copy 1mb at a time, so set the limit to be how + // many per second we can call. + megaBytesPerSecond := int(limits.LimitMbps() / 8.0) + limiter = ratelimit.New(megaBytesPerSecond) + } + + // Use 1mb batch read size to match the rate limit value. + const batchReadSize = 1024 * 1024 + hash := digest.NewDigestWriter() + reader := bytes.NewReader(desc.Bytes) + for { + // Wait for availability. + limiter.Take() + + // Read batch now rate limiter allowed progression. + _, err := io.CopyN(hash, reader, batchReadSize) + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + } + file := newReadableIndexSegmentFileMmap(segFileType, fd, desc) result.files = append(result.files, file) digests.files = append(digests.files, indexReaderReadSegmentFileDigest{ segmentFileType: segFileType, - digest: digest.Checksum(desc.Bytes), + digest: hash.Sum32(), }) // NB(bodu): Free mmaped bytes after we take the checksum so we don't get memory spikes at bootstrap time. diff --git a/src/dbnode/persist/fs/persist_manager.go b/src/dbnode/persist/fs/persist_manager.go index 867f8fea88..c13f1df00d 100644 --- a/src/dbnode/persist/fs/persist_manager.go +++ b/src/dbnode/persist/fs/persist_manager.go @@ -44,7 +44,8 @@ import ( ) const ( - bytesPerMegabit = 1024 * 1024 / 8 + // BytesPerMegabit is the number of bytes per megabit. + BytesPerMegabit = 1024 * 1024 / 8 ) type persistManagerStatus int @@ -513,7 +514,7 @@ func (pm *persistManager) persist( if pm.start.IsZero() { pm.start = start } else if pm.count >= opts.LimitCheckEvery() { - target := time.Duration(float64(time.Second) * float64(pm.bytesWritten) / (rateLimitMbps * bytesPerMegabit)) + target := time.Duration(float64(time.Second) * float64(pm.bytesWritten) / (rateLimitMbps * BytesPerMegabit)) if elapsed := start.Sub(pm.start); elapsed < target { pm.sleepFn(target - elapsed) // Recapture start for precise timing, might take some time to "wakeup"