kv/tscache: dynamically size intervalSkl pages #49422

Merged 2 commits on May 23, 2020

4 changes: 2 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default.

6 changes: 1 addition & 5 deletions pkg/kv/kvserver/store.go
@@ -182,7 +182,6 @@ func TestStoreConfig(clock *hlc.Clock) StoreConfig {
CoalescedHeartbeatsInterval: 50 * time.Millisecond,
RaftHeartbeatIntervalTicks: 1,
ScanInterval: 10 * time.Minute,
TimestampCachePageSize: tscache.TestSklPageSize,
HistogramWindowInterval: metric.TestSampleInterval,
EnableEpochRangeLeases: true,
ClosedTimestamp: container.NoopContainer(),
@@ -701,9 +700,6 @@ type StoreConfig struct {
// to be applied concurrently.
concurrentSnapshotApplyLimit int

// TimestampCachePageSize is (server.Config).TimestampCachePageSize
TimestampCachePageSize uint32

// HistogramWindowInterval is (server.Config).HistogramWindowInterval
HistogramWindowInterval time.Duration

@@ -835,7 +831,7 @@ func NewStore(
s.rangefeedReplicas.m = map[roachpb.RangeID]struct{}{}
s.rangefeedReplicas.Unlock()

s.tsCache = tscache.New(cfg.Clock, cfg.TimestampCachePageSize)
s.tsCache = tscache.New(cfg.Clock)
s.metrics.registry.AddMetricStruct(s.tsCache.Metrics())

s.txnWaitMetrics = txnwait.NewMetrics(cfg.HistogramWindowInterval)
7 changes: 3 additions & 4 deletions pkg/kv/kvserver/tscache/cache.go
@@ -79,13 +79,12 @@ type Cache interface {
getLowWater() hlc.Timestamp
}

// New returns a new timestamp cache with the supplied hybrid clock. If the
// pageSize is provided, it will override the default page size.
func New(clock *hlc.Clock, pageSize uint32) Cache {
// New returns a new timestamp cache with the supplied hybrid-logical clock.
func New(clock *hlc.Clock) Cache {
if envutil.EnvOrDefaultBool("COCKROACH_USE_TREE_TSCACHE", false) {
return newTreeImpl(clock)
}
return newSklImpl(clock, pageSize)
return newSklImpl(clock)
}

// cacheValue combines a timestamp with an optional txnID. It is shared between
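For reference, here is a minimal sketch of how a caller constructs the cache under the new signature. This is illustrative only and not part of the diff; it assumes the pkg/util/hlc and pkg/kv/kvserver/tscache import paths.

```go
package main

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func main() {
	manual := hlc.NewManualClock(123)
	clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)
	// No page-size argument anymore: the skiplist implementation starts with
	// small pages and grows them toward maximumSklPageSize on its own.
	tc := tscache.New(clock)
	_ = tc
}
```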
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/tscache/cache_test.go
@@ -33,7 +33,7 @@ import (

var cacheImplConstrs = []func(clock *hlc.Clock) Cache{
func(clock *hlc.Clock) Cache { return newTreeImpl(clock) },
func(clock *hlc.Clock) Cache { return newSklImpl(clock, TestSklPageSize) },
func(clock *hlc.Clock) Cache { return newSklImpl(clock) },
}

func forEachCacheImpl(
@@ -431,7 +431,7 @@ func TestTimestampCacheLargeKeys(t *testing.T) {
defer leaktest.AfterTest(t)()

forEachCacheImpl(t, func(t *testing.T, tc Cache, clock *hlc.Clock, manual *hlc.ManualClock) {
keyStart := roachpb.Key(make([]byte, 5*TestSklPageSize))
keyStart := roachpb.Key(make([]byte, 5*maximumSklPageSize))
keyEnd := keyStart.Next()
ts1 := clock.Now()
txn1 := uuid.MakeV4()
@@ -653,7 +653,7 @@ func identicalAndRatcheted(
func BenchmarkTimestampCacheInsertion(b *testing.B) {
manual := hlc.NewManualClock(123)
clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)
tc := New(clock, 0)
tc := New(clock)

for i := 0; i < b.N; i++ {
cdTS := clock.Now()
71 changes: 55 additions & 16 deletions pkg/kv/kvserver/tscache/interval_skl.go
@@ -90,9 +90,21 @@
)

const (
encodedTsSize = int(unsafe.Sizeof(int64(0)) + unsafe.Sizeof(int32(0)))
encodedTxnIDSize = int(unsafe.Sizeof(uuid.UUID{}))
encodedValSize = encodedTsSize + encodedTxnIDSize
encodedTsSize = int(unsafe.Sizeof(int64(0)) + unsafe.Sizeof(int32(0)))
encodedTxnIDSize = int(unsafe.Sizeof(uuid.UUID{}))
encodedValSize = encodedTsSize + encodedTxnIDSize

// initialSklPageSize is the initial size of each page in the sklImpl's
// intervalSkl. The pages start small to limit the memory footprint of
// the data structure for short-lived tests. Reducing this size can hurt
// performance but it decreases the risk of OOM failures when many tests
// are running concurrently.
initialSklPageSize = 128 << 10 // 128 KB
// maximumSklPageSize is the maximum size of each page in the sklImpl's
// intervalSkl. A long-running server is expected to settle on pages of
// this size under steady-state load.
maximumSklPageSize = 32 << 20 // 32 MB

defaultMinSklPages = 2
)

@@ -147,12 +159,17 @@ type intervalSkl struct {
clock *hlc.Clock
minRet time.Duration

// The size of each page in the data structure, in bytes. When a page fills,
// the pages will be rotated and older entries will be discarded. The entire
// data structure will usually have a size limit of pageSize*minPages.
// However, this limit can be violated if the intervalSkl needs to grow
// larger to enforce a minimum retention policy.
pageSize uint32
// The size of the last allocated page in the data structure, in bytes. When
// a page fills, a new page will be allocated, the pages will be rotated, and
// older entries will be discarded. Page sizes grow exponentially as pages
// are allocated up to a maximum of maximumSklPageSize. The value will never
// regress over the lifetime of an intervalSkl instance.
//
// The entire data structure is typically bound to a maximum size of
// maximumSklPageSize*minPages. However, this limit can be violated if the
// intervalSkl needs to grow larger to enforce a minimum retention policy.
pageSize uint32
pageSizeFixed bool // testing only

// The linked list maintains fixed-size skiplist pages, ordered by creation
// time such that the first page is the one most recently created. When the
@@ -177,13 +194,11 @@

// newIntervalSkl creates a new interval skiplist with the given minimum
// retention duration and the maximum size.
func newIntervalSkl(
clock *hlc.Clock, minRet time.Duration, pageSize uint32, metrics sklMetrics,
) *intervalSkl {
func newIntervalSkl(clock *hlc.Clock, minRet time.Duration, metrics sklMetrics) *intervalSkl {
s := intervalSkl{
clock: clock,
minRet: minRet,
pageSize: pageSize,
pageSize: initialSklPageSize / 2, // doubled in pushNewPage
minPages: defaultMinSklPages,
metrics: metrics,
}
@@ -222,7 +237,7 @@ func (s *intervalSkl) AddRange(from, to []byte, opt rangeOptions, val cacheValue
if from == nil && to == nil {
panic("from and to keys cannot be nil")
}
if encodedRangeSize(from, to, opt) > int(s.pageSize)-initialSklAllocSize {
if encodedRangeSize(from, to, opt) > int(s.maximumPageSize())-initialSklAllocSize {
// Without this check, we could fall into an infinite page rotation loop
// if a range would take up more space than available in an empty page.
panic("key range too large to fit in any page")
@@ -371,17 +386,41 @@ func (s *intervalSkl) frontPage() *sklPage {
// pushNewPage prepends a new empty page to the front of the pages list. It
// accepts an optional arena argument to facilitate re-use.
func (s *intervalSkl) pushNewPage(maxWallTime int64, arena *arenaskl.Arena) {
if arena != nil {
size := s.nextPageSize()
if arena != nil && arena.Cap() == size {
// Re-use the provided arena, if possible.
arena.Reset()
} else {
arena = arenaskl.NewArena(s.pageSize)
// Otherwise, construct a new memory arena.
arena = arenaskl.NewArena(size)
}
p := newSklPage(arena)
p.maxWallTime = maxWallTime
s.pages.PushFront(p)
}

// nextPageSize returns the size that the next allocated page should use.
func (s *intervalSkl) nextPageSize() uint32 {
if s.pageSizeFixed || s.pageSize == maximumSklPageSize {
return s.pageSize
}
s.pageSize *= 2
if s.pageSize > maximumSklPageSize {
s.pageSize = maximumSklPageSize
}
return s.pageSize
}

// maximumPageSize returns the maximum page size that this instance of the
// intervalSkl will be able to accommodate. The method takes into consideration
// whether the page size is fixed or dynamic.
func (s *intervalSkl) maximumPageSize() uint32 {
if s.pageSizeFixed {
return s.pageSize
}
return maximumSklPageSize
}

// rotatePages makes the later page the earlier page, and then discards the
// earlier page. The max timestamp of the earlier page becomes the new floor
// timestamp, in order to guarantee that timestamp lookups never return decreasing
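With the constants introduced above, a page reaches the cap after eight doublings (128 KB × 2⁸ = 32 MB), so the ninth page a long-lived intervalSkl allocates, and every page after it, is 32 MB. The following standalone sketch (not code from this PR) mirrors the nextPageSize logic to show the growth schedule:

```go
package main

import "fmt"

const (
	initialSklPageSize = 128 << 10 // 128 KB
	maximumSklPageSize = 32 << 20  // 32 MB
)

func main() {
	// Mirrors newIntervalSkl: the stored size starts at half the initial page
	// size and is doubled by the first call to nextPageSize.
	pageSize := uint32(initialSklPageSize / 2)

	for i := 1; i <= 10; i++ {
		// Mirrors nextPageSize: double until the maximum is reached.
		if pageSize != maximumSklPageSize {
			pageSize *= 2
			if pageSize > maximumSklPageSize {
				pageSize = maximumSklPageSize
			}
		}
		fmt.Printf("page %2d: %8d bytes (%d KB)\n", i, pageSize, pageSize>>10)
	}
	// Prints 128 KB, 256 KB, ..., 16384 KB, 32768 KB, 32768 KB: pages stop
	// growing once they hit maximumSklPageSize.
}
```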