From 4aeb9ee854e79d5dcc00442f6673463921a03731 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 22 May 2020 18:58:15 -0400 Subject: [PATCH 1/2] vendor: bump arenaskl to 4b42aa066e46a6900c75bf45abd9f0dbfb704c46 --- Gopkg.lock | 4 ++-- vendor | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index a1a1d5bb4ce1..3e5a4591a68f 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -188,11 +188,11 @@ [[projects]] branch = "master" - digest = "1:9270dd50e9aafb05285e767cf2ecc395f74c1682582d3b2e8dbaf0f472d978fc" + digest = "1:82298f2aa48be0c275272f8cada5e78e2100cefd032bf13ef84a9d26c1a80d92" name = "github.com/andy-kimball/arenaskl" packages = ["."] pruneopts = "UT" - revision = "7f79c0f6e4fa77a7f8a296b4d937ea4708e8ccc7" + revision = "4b42aa066e46a6900c75bf45abd9f0dbfb704c46" [[projects]] digest = "1:66b3310cf22cdc96c35ef84ede4f7b9b370971c4025f394c89a2638729653b11" diff --git a/vendor b/vendor index ee4b8b547ebc..4cda30a6d7fa 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit ee4b8b547ebccd3819566522e87f069ad82c0bc0 +Subproject commit 4cda30a6d7fa4a28c3d5bf494742210caf1b36d7 From 188688511dc8595f8926c8fced7133a822bbeb7b Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 21 May 2020 19:05:19 -0400 Subject: [PATCH 2/2] kv/tscache: dynamically size intervalSkl pages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses a suggestion from Peter: https://github.com/cockroachdb/cockroach/pull/48058#issuecomment-631707053. This change updates `intervalSkl` to dynamically grow the size of its pages as pages are rotated. This allows the structure to start off small (128 KB per page) and grow exponentially to a maximum size (32 MB per page) as it is used. The pages start small to limit the memory footprint of the data structure for short-lived tests but will settle upon the maximum page size under steady-state on a long-lived process. 
This does not appear to have an impact on benchmarks: ``` ➜ benchdiff --run='BenchmarkJoinReader/reqOrdering=false/matchratio=onetothirtytwo/lookuprows=16384' ./pkg/sql/rowexec checking out 'f575fa8' building benchmark binaries for 'f575fa8' 1/1 - checking out '3d46054' building benchmark binaries for '3d46054' 1/1 / pkg=1/1 iter=10/10 cockroachdb/cockroach/pkg/sql/rowexec | name old time/op new time/op delta JoinReader/reqOrdering=false/matchratio=onetothirtytwo/lookuprows=16384-16 1.34s ± 2% 1.34s ± 4% ~ (p=1.000 n=9+10) name old speed new speed delta JoinReader/reqOrdering=false/matchratio=onetothirtytwo/lookuprows=16384-16 3.23MB/s ± 2% 3.23MB/s ± 3% ~ (p=0.953 n=9+10) name old alloc/op new alloc/op delta JoinReader/reqOrdering=false/matchratio=onetothirtytwo/lookuprows=16384-16 72.1MB ± 0% 73.9MB ± 0% +2.44% (p=0.000 n=10+10) name old allocs/op new allocs/op delta JoinReader/reqOrdering=false/matchratio=onetothirtytwo/lookuprows=16384-16 556k ± 0% 556k ± 0% ~ (p=0.781 n=10+10) ``` --- pkg/kv/kvserver/store.go | 6 +- pkg/kv/kvserver/tscache/cache.go | 7 +- pkg/kv/kvserver/tscache/cache_test.go | 6 +- pkg/kv/kvserver/tscache/interval_skl.go | 71 ++++++-- pkg/kv/kvserver/tscache/interval_skl_test.go | 169 ++++++++++++++---- pkg/kv/kvserver/tscache/skl_impl.go | 40 ++--- pkg/server/config.go | 4 - pkg/server/server.go | 1 - pkg/server/testserver.go | 2 - .../localtestcluster/local_test_cluster.go | 2 - 10 files changed, 210 insertions(+), 98 deletions(-) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 406735337844..87242d1b1bd1 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -182,7 +182,6 @@ func TestStoreConfig(clock *hlc.Clock) StoreConfig { CoalescedHeartbeatsInterval: 50 * time.Millisecond, RaftHeartbeatIntervalTicks: 1, ScanInterval: 10 * time.Minute, - TimestampCachePageSize: tscache.TestSklPageSize, HistogramWindowInterval: metric.TestSampleInterval, EnableEpochRangeLeases: true, ClosedTimestamp: 
container.NoopContainer(), @@ -701,9 +700,6 @@ type StoreConfig struct { // to be applied concurrently. concurrentSnapshotApplyLimit int - // TimestampCachePageSize is (server.Config).TimestampCachePageSize - TimestampCachePageSize uint32 - // HistogramWindowInterval is (server.Config).HistogramWindowInterval HistogramWindowInterval time.Duration @@ -835,7 +831,7 @@ func NewStore( s.rangefeedReplicas.m = map[roachpb.RangeID]struct{}{} s.rangefeedReplicas.Unlock() - s.tsCache = tscache.New(cfg.Clock, cfg.TimestampCachePageSize) + s.tsCache = tscache.New(cfg.Clock) s.metrics.registry.AddMetricStruct(s.tsCache.Metrics()) s.txnWaitMetrics = txnwait.NewMetrics(cfg.HistogramWindowInterval) diff --git a/pkg/kv/kvserver/tscache/cache.go b/pkg/kv/kvserver/tscache/cache.go index da7e3be59105..f2872f614a6c 100644 --- a/pkg/kv/kvserver/tscache/cache.go +++ b/pkg/kv/kvserver/tscache/cache.go @@ -79,13 +79,12 @@ type Cache interface { getLowWater() hlc.Timestamp } -// New returns a new timestamp cache with the supplied hybrid clock. If the -// pageSize is provided, it will override the default page size. -func New(clock *hlc.Clock, pageSize uint32) Cache { +// New returns a new timestamp cache with the supplied hybrid-logical clock. +func New(clock *hlc.Clock) Cache { if envutil.EnvOrDefaultBool("COCKROACH_USE_TREE_TSCACHE", false) { return newTreeImpl(clock) } - return newSklImpl(clock, pageSize) + return newSklImpl(clock) } // cacheValue combines a timestamp with an optional txnID. 
It is shared between diff --git a/pkg/kv/kvserver/tscache/cache_test.go b/pkg/kv/kvserver/tscache/cache_test.go index 195082490883..2d7a163b1c87 100644 --- a/pkg/kv/kvserver/tscache/cache_test.go +++ b/pkg/kv/kvserver/tscache/cache_test.go @@ -33,7 +33,7 @@ import ( var cacheImplConstrs = []func(clock *hlc.Clock) Cache{ func(clock *hlc.Clock) Cache { return newTreeImpl(clock) }, - func(clock *hlc.Clock) Cache { return newSklImpl(clock, TestSklPageSize) }, + func(clock *hlc.Clock) Cache { return newSklImpl(clock) }, } func forEachCacheImpl( @@ -431,7 +431,7 @@ func TestTimestampCacheLargeKeys(t *testing.T) { defer leaktest.AfterTest(t)() forEachCacheImpl(t, func(t *testing.T, tc Cache, clock *hlc.Clock, manual *hlc.ManualClock) { - keyStart := roachpb.Key(make([]byte, 5*TestSklPageSize)) + keyStart := roachpb.Key(make([]byte, 5*maximumSklPageSize)) keyEnd := keyStart.Next() ts1 := clock.Now() txn1 := uuid.MakeV4() @@ -653,7 +653,7 @@ func identicalAndRatcheted( func BenchmarkTimestampCacheInsertion(b *testing.B) { manual := hlc.NewManualClock(123) clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) - tc := New(clock, 0) + tc := New(clock) for i := 0; i < b.N; i++ { cdTS := clock.Now() diff --git a/pkg/kv/kvserver/tscache/interval_skl.go b/pkg/kv/kvserver/tscache/interval_skl.go index 42cdbb940c02..d4fd166c4534 100644 --- a/pkg/kv/kvserver/tscache/interval_skl.go +++ b/pkg/kv/kvserver/tscache/interval_skl.go @@ -90,9 +90,21 @@ const ( ) const ( - encodedTsSize = int(unsafe.Sizeof(int64(0)) + unsafe.Sizeof(int32(0))) - encodedTxnIDSize = int(unsafe.Sizeof(uuid.UUID{})) - encodedValSize = encodedTsSize + encodedTxnIDSize + encodedTsSize = int(unsafe.Sizeof(int64(0)) + unsafe.Sizeof(int32(0))) + encodedTxnIDSize = int(unsafe.Sizeof(uuid.UUID{})) + encodedValSize = encodedTsSize + encodedTxnIDSize + + // initialSklPageSize is the initial size of each page in the sklImpl's + // intervalSkl. 
The pages start small to limit the memory footprint of + // the data structure for short-lived tests. Reducing this size can hurt + // performance but it decreases the risk of OOM failures when many tests + // are running concurrently. + initialSklPageSize = 128 << 10 // 128 KB + // maximumSklPageSize is the maximum size of each page in the sklImpl's + // intervalSkl. A long-running server is expected to settle on pages of + // this size under steady-state load. + maximumSklPageSize = 32 << 20 // 32 MB + defaultMinSklPages = 2 ) @@ -147,12 +159,17 @@ type intervalSkl struct { clock *hlc.Clock minRet time.Duration - // The size of each page in the data structure, in bytes. When a page fills, - // the pages will be rotated and older entries will be discarded. The entire - // data structure will usually have a size limit of pageSize*minPages. - // However, this limit can be violated if the intervalSkl needs to grow - // larger to enforce a minimum retention policy. - pageSize uint32 + // The size of the last allocated page in the data structure, in bytes. When + // a page fills, a new page will be allocated, the pages will be rotated, and + // older entries will be discarded. Page sizes grow exponentially as pages + // are allocated up to a maximum of maximumSklPageSize. The value will never + // regress over the lifetime of an intervalSkl instance. + // + // The entire data structure is typically bound to a maximum size of + // maximumSklPageSize*minPages. However, this limit can be violated if the + // intervalSkl needs to grow larger to enforce a minimum retention policy. + pageSize uint32 + pageSizeFixed bool // testing only // The linked list maintains fixed-size skiplist pages, ordered by creation // time such that the first page is the one most recently created. When the @@ -177,13 +194,11 @@ type intervalSkl struct { // newIntervalSkl creates a new interval skiplist with the given minimum // retention duration and the maximum size. 
-func newIntervalSkl( - clock *hlc.Clock, minRet time.Duration, pageSize uint32, metrics sklMetrics, -) *intervalSkl { +func newIntervalSkl(clock *hlc.Clock, minRet time.Duration, metrics sklMetrics) *intervalSkl { s := intervalSkl{ clock: clock, minRet: minRet, - pageSize: pageSize, + pageSize: initialSklPageSize / 2, // doubled in pushNewPage minPages: defaultMinSklPages, metrics: metrics, } @@ -222,7 +237,7 @@ func (s *intervalSkl) AddRange(from, to []byte, opt rangeOptions, val cacheValue if from == nil && to == nil { panic("from and to keys cannot be nil") } - if encodedRangeSize(from, to, opt) > int(s.pageSize)-initialSklAllocSize { + if encodedRangeSize(from, to, opt) > int(s.maximumPageSize())-initialSklAllocSize { // Without this check, we could fall into an infinite page rotation loop // if a range would take up more space than available in an empty page. panic("key range too large to fit in any page") @@ -371,17 +386,41 @@ func (s *intervalSkl) frontPage() *sklPage { // pushNewPage prepends a new empty page to the front of the pages list. It // accepts an optional arena argument to facilitate re-use. func (s *intervalSkl) pushNewPage(maxWallTime int64, arena *arenaskl.Arena) { - if arena != nil { + size := s.nextPageSize() + if arena != nil && arena.Cap() == size { // Re-use the provided arena, if possible. arena.Reset() } else { - arena = arenaskl.NewArena(s.pageSize) + // Otherwise, construct new memory arena. + arena = arenaskl.NewArena(size) } p := newSklPage(arena) p.maxWallTime = maxWallTime s.pages.PushFront(p) } +// nextPageSize returns the size that the next allocated page should use. 
+func (s *intervalSkl) nextPageSize() uint32 { + if s.pageSizeFixed || s.pageSize == maximumSklPageSize { + return s.pageSize + } + s.pageSize *= 2 + if s.pageSize > maximumSklPageSize { + s.pageSize = maximumSklPageSize + } + return s.pageSize +} + +// maximumPageSize returns the maximum page size that this instance of the +// intervalSkl will be able to accommodate. The method takes into consideration +// whether the page size is fixed or dynamic. +func (s *intervalSkl) maximumPageSize() uint32 { + if s.pageSizeFixed { + return s.pageSize + } + return maximumSklPageSize +} + // rotatePages makes the later page the earlier page, and then discards the // earlier page. The max timestamp of the earlier page becomes the new floor // timestamp, in order to guarantee that timestamp lookups never return decreasing diff --git a/pkg/kv/kvserver/tscache/interval_skl_test.go b/pkg/kv/kvserver/tscache/interval_skl_test.go index de553503f088..7598e8644201 100644 --- a/pkg/kv/kvserver/tscache/interval_skl_test.go +++ b/pkg/kv/kvserver/tscache/interval_skl_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/andy-kimball/arenaskl" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -63,6 +64,14 @@ func makeSklMetrics() sklMetrics { return makeMetrics().Skl } +// setFixedPageSize sets the pageSize of the intervalSkl to a fixed value. +func (s *intervalSkl) setFixedPageSize(pageSize uint32) { + s.pageSize = pageSize + s.pageSizeFixed = true + s.pages.Init() // clear + s.pushNewPage(0 /* maxWallTime */, nil /* arena */) +} + // setMinPages sets the minimum number of pages intervalSkl will evict down to. // This is only exposed as a testing method because there's no reason to use // this outside of testing. 
@@ -78,7 +87,7 @@ func TestIntervalSklAdd(t *testing.T) { val1 := makeVal(ts1, "1") val2 := makeVal(ts2, "2") - s := newIntervalSkl(nil /* clock */, 0 /* minRet */, TestSklPageSize, makeSklMetrics()) + s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) s.Add([]byte("apricot"), val1) require.Equal(t, ts1.WallTime, s.frontPage().maxWallTime) @@ -100,7 +109,7 @@ func TestIntervalSklSingleRange(t *testing.T) { val3 := makeVal(makeTS(300, 50), "3") val4 := makeVal(makeTS(400, 50), "4") - s := newIntervalSkl(nil /* clock */, 0 /* minRet */, TestSklPageSize, makeSklMetrics()) + s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) // val1: [a--------------o] s.AddRange([]byte("apricot"), []byte("orange"), 0, val1) @@ -190,7 +199,7 @@ func TestIntervalSklKeyBoundaries(t *testing.T) { val4 := makeVal(makeTS(400, 0), "4") val5 := makeVal(makeTS(500, 0), "5") - s := newIntervalSkl(nil /* clock */, 0 /* minRet */, TestSklPageSize, makeSklMetrics()) + s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) s.floorTS = floorTS // Can't insert a key at infinity. @@ -264,7 +273,7 @@ func TestIntervalSklSupersetRange(t *testing.T) { val5 := makeVal(makeTS(500, 0), "5") val6 := makeVal(makeTS(600, 0), "6") - s := newIntervalSkl(nil /* clock */, 0 /* minRet */, TestSklPageSize, makeSklMetrics()) + s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) s.floorTS = floorTS // Same range. 
@@ -359,7 +368,7 @@ func TestIntervalSklContiguousRanges(t *testing.T) { val2 := makeVal(ts1, "2") val2WithoutID := makeValWithoutID(ts1) - s := newIntervalSkl(nil /* clock */, 0 /* minRet */, TestSklPageSize, makeSklMetrics()) + s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) s.floorTS = floorTS // val1: [b---------k) @@ -393,7 +402,7 @@ func TestIntervalSklOverlappingRanges(t *testing.T) { val3 := makeVal(makeTS(300, 0), "3") val4 := makeVal(makeTS(400, 0), "4") - s := newIntervalSkl(nil /* clock */, 0 /* minRet */, TestSklPageSize, makeSklMetrics()) + s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) s.floorTS = floorTS // val1: [b---------k] @@ -437,7 +446,7 @@ func TestIntervalSklOverlappingRanges(t *testing.T) { func TestIntervalSklSingleKeyRanges(t *testing.T) { val1 := makeVal(makeTS(100, 100), "1") - s := newIntervalSkl(nil /* clock */, 0 /* minRet */, TestSklPageSize, makeSklMetrics()) + s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) // Don't allow inverted ranges. require.Panics(t, func() { s.AddRange([]byte("kiwi"), []byte("apple"), 0, val1) }) @@ -493,7 +502,7 @@ func TestIntervalSklRatchetTxnIDs(t *testing.T) { val6 := makeVal(ts3, "5") val6WithoutID := makeValWithoutID(ts3) - s := newIntervalSkl(nil /* clock */, 0 /* minRet */, TestSklPageSize, makeSklMetrics()) + s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) s.AddRange([]byte("apricot"), []byte("raspberry"), 0, val1) require.Equal(t, emptyVal, s.LookupTimestamp([]byte("apple"))) @@ -569,7 +578,7 @@ func TestIntervalSklLookupRange(t *testing.T) { val4 := makeVal(ts3, "4") val5 := makeVal(ts4, "5") - s := newIntervalSkl(nil /* clock */, 0 /* minRet */, TestSklPageSize, makeSklMetrics()) + s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) // Perform range lookups over a single key. 
s.Add([]byte("apricot"), val1) @@ -644,7 +653,7 @@ func TestIntervalSklLookupRangeSingleKeyRanges(t *testing.T) { // Perform range lookups over [key, key.Next()) ranges. t.Run("[key, key.Next())", func(t *testing.T) { - s := newIntervalSkl(nil /* clock */, 0 /* minRet */, TestSklPageSize, makeSklMetrics()) + s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) s.AddRange(key1, key2, excludeTo, val1) s.AddRange(key2, key3, excludeTo, val2) @@ -690,7 +699,7 @@ func TestIntervalSklLookupRangeSingleKeyRanges(t *testing.T) { // Perform the same lookups, but this time use single key ranges. t.Run("[key, key]", func(t *testing.T) { - s := newIntervalSkl(nil /* clock */, 0 /* minRet */, TestSklPageSize, makeSklMetrics()) + s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) s.AddRange(key1, key1, 0, val1) // same as Add(key1, val1) s.AddRange(key2, key2, 0, val2) // ... Add(key2, val2) @@ -737,7 +746,7 @@ func TestIntervalSklLookupEqualsEarlierMaxWallTime(t *testing.T) { txnID2 := "2" testutils.RunTrueAndFalse(t, "tsWithLogicalPart", func(t *testing.T, logicalPart bool) { - s := newIntervalSkl(nil /* clock */, 0 /* minRet */, TestSklPageSize, makeSklMetrics()) + s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) s.floorTS = floorTS // Insert an initial value into intervalSkl. @@ -797,7 +806,8 @@ func TestIntervalSklFill(t *testing.T) { const n = 200 const txnID = "123" - s := newIntervalSkl(nil /* clock */, 0 /* minRet */, 1500, makeSklMetrics()) + s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) + s.setFixedPageSize(1500) for i := 0; i < n; i++ { key := []byte(fmt.Sprintf("%05d", i)) @@ -826,9 +836,10 @@ func TestIntervalSklFill2(t *testing.T) { const txnID = "123" // n >> 1000 so the intervalSkl's pages will be filled. 
- s := newIntervalSkl(nil /* clock */, 0 /* minRet */, 1000, makeSklMetrics()) - key := []byte("some key") + s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) + s.setFixedPageSize(1000) + key := []byte("some key") for i := 0; i < n; i++ { val := makeVal(makeTS(int64(i), int32(i)), txnID) s.Add(key, val) @@ -844,7 +855,8 @@ func TestIntervalSklMinRetentionWindow(t *testing.T) { clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) const minRet = 500 - s := newIntervalSkl(clock, minRet, 1500, makeSklMetrics()) + s := newIntervalSkl(clock, minRet, makeSklMetrics()) + s.setFixedPageSize(1500) s.floorTS = floorTS // Add an initial value. Rotate the page so it's alone. @@ -906,7 +918,7 @@ func TestIntervalSklConcurrency(t *testing.T) { {name: "Pages", pageSize: 4096, minPages: 16}, // Test concurrency with a larger page size in order to test slot // concurrency without the added complication of page rotations. - {name: "Slots", pageSize: TestSklPageSize}, + {name: "Slots", pageSize: initialSklPageSize}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -916,7 +928,8 @@ func TestIntervalSklConcurrency(t *testing.T) { // testing timestamp collisions. 
testutils.RunTrueAndFalse(t, "useClock", func(t *testing.T, useClock bool) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) - s := newIntervalSkl(clock, 0 /* minRet */, tc.pageSize, makeSklMetrics()) + s := newIntervalSkl(clock, 0 /* minRet */, makeSklMetrics()) + s.setFixedPageSize(tc.pageSize) if tc.minPages != 0 { s.setMinPages(tc.minPages) } @@ -1016,8 +1029,10 @@ func TestIntervalSklConcurrentVsSequential(t *testing.T) { const smallPageSize = 32 * 1024 // 32 KB const retainForever = math.MaxInt64 - sequentialS := newIntervalSkl(clock, retainForever, smallPageSize, makeSklMetrics()) - concurrentS := newIntervalSkl(clock, retainForever, smallPageSize, makeSklMetrics()) + sequentialS := newIntervalSkl(clock, retainForever, makeSklMetrics()) + sequentialS.setFixedPageSize(smallPageSize) + concurrentS := newIntervalSkl(clock, retainForever, makeSklMetrics()) + concurrentS.setFixedPageSize(smallPageSize) // We run a goroutine for each slot. Goroutines insert new value // over random intervals, but verify that the value in their @@ -1144,30 +1159,116 @@ func assertRatchet(t *testing.T, before, after cacheValue) { // rotation loop for ranges that are too large to fit in a single page. Instead, // we detect this scenario early and panic. func TestIntervalSklMaxEncodedSize(t *testing.T) { - ts := makeTS(200, 0) + manual := hlc.NewManualClock(200) + clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + + ts := clock.Now() val := makeVal(ts, "1") - key := make([]byte, 65) - encSize := encodedRangeSize(key, nil, 0) + testutils.RunTrueAndFalse(t, "fit", func(t *testing.T, fit bool) { + testutils.RunTrueAndFalse(t, "fixed", func(t *testing.T, fixed bool) { + var key []byte + var encSize int + if fixed { + // Create an arbitrarily sized key. We'll set the pageSize to + // either exactly accommodate this or to be one byte too small. 
+ key = make([]byte, 65) + encSize = encodedRangeSize(key, nil, 0) + } else { + // Create either the largest possible key that will fit in the + // maximumSklPageSize or a key one byte larger than this. This + // test forces the intervalSkl to quickly grow its page size + // until it is large enough to accommodate the key. + encSize = maximumSklPageSize - initialSklAllocSize + encOverhead := encodedRangeSize(nil, nil, 0) + keySize := encSize - encOverhead + if !fit { + keySize++ + } + key = make([]byte, keySize) + if fit { + require.Equal(t, encSize, encodedRangeSize(key, nil, 0)) + } else { + require.Equal(t, encSize+1, encodedRangeSize(key, nil, 0)) + } + } - t.Run("fit", func(t *testing.T) { - size := uint32(initialSklAllocSize + encSize) - s := newIntervalSkl(nil /* clock */, 0 /* minRet */, size, makeSklMetrics()) - require.NotPanics(t, func() { s.Add(key, val) }) - }) - t.Run("!fit", func(t *testing.T) { - size := uint32(initialSklAllocSize + encSize - 1) - s := newIntervalSkl(nil /* clock */, 0 /* minRet */, size, makeSklMetrics()) - require.Panics(t, func() { s.Add(key, val) }) + s := newIntervalSkl(clock, 1, makeSklMetrics()) + if fixed { + fixedSize := uint32(initialSklAllocSize + encSize) + if !fit { + fixedSize-- + } + s.setFixedPageSize(fixedSize) + } + initPageSize := s.pageSize + + if fit { + require.NotPanics(t, func() { s.Add(key, val) }) + } else { + require.Panics(t, func() { s.Add(key, val) }) + } + + if fit && !fixed { + // Page size should have grown to maximum. + require.Equal(t, uint32(maximumSklPageSize), s.pageSize) + } else { + // Page size should not have grown. + require.Equal(t, initPageSize, s.pageSize) + } + }) }) } +// TestArenaReuse tests that arenas are re-used when possible during page +// rotations. Skiplist memory arenas are only re-used when they have the same +// capacity as the new page. 
+func TestArenaReuse(t *testing.T) { + s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) + + // Track the unique arenas that we observe in use. + arenas := make(map[*arenaskl.Arena]struct{}) + const iters = 256 + for i := 0; i < iters; i++ { + for e := s.pages.Front(); e != nil; e = e.Next() { + p := e.Value.(*sklPage) + arenas[p.list.Arena()] = struct{}{} + } + s.rotatePages(s.frontPage()) + } + + // We expect to see a single arena with each of the allocation sizes between + // initialSklPageSize and maximumSklPageSize. We then expect to see repeated + // pages with the same size once we hit maximumSklPageSize. Only then do we + // expect to see arena re-use. + // + // Example: + // initSize = 4 + // maxSize = 32 + // minPages = 2 + // + // arena sizes: + // 4 (A1) + // 8 (A2) + // 16 (A3) + // 32 (A4) + // 32 (A5) + // 32 (A4) + // 32 (A5) + // ... + // + intermediatePages := int(math.Log2(maximumSklPageSize) - math.Log2(initialSklPageSize)) + expArenas := defaultMinSklPages + intermediatePages + require.Less(t, expArenas, iters) + require.Equal(t, expArenas, len(arenas)) +} + func BenchmarkIntervalSklAdd(b *testing.B) { const max = 500000000 // max size of range const txnID = "123" clock := hlc.NewClock(hlc.UnixNano, time.Millisecond) - s := newIntervalSkl(clock, MinRetentionWindow, defaultSklPageSize, makeSklMetrics()) + s := newIntervalSkl(clock, MinRetentionWindow, makeSklMetrics()) rng := rand.New(rand.NewSource(timeutil.Now().UnixNano())) size := 1 @@ -1192,7 +1293,7 @@ func BenchmarkIntervalSklAddAndLookup(b *testing.B) { const txnID = "123" clock := hlc.NewClock(hlc.UnixNano, time.Millisecond) - s := newIntervalSkl(clock, MinRetentionWindow, defaultSklPageSize, makeSklMetrics()) + s := newIntervalSkl(clock, MinRetentionWindow, makeSklMetrics()) rng := rand.New(rand.NewSource(timeutil.Now().UnixNano())) for i := 0; i < data; i++ { diff --git a/pkg/kv/kvserver/tscache/skl_impl.go b/pkg/kv/kvserver/tscache/skl_impl.go index 
2d617c285e3d..93a74e608f11 100644 --- a/pkg/kv/kvserver/tscache/skl_impl.go +++ b/pkg/kv/kvserver/tscache/skl_impl.go @@ -19,44 +19,30 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" ) -const ( - // defaultSklPageSize is the default size of each page in the sklImpl's - // read and write intervalSkl. - defaultSklPageSize = 32 << 20 // 32 MB - // TestSklPageSize is passed to tests as the size of each page in the - // sklImpl to limit its memory footprint. Reducing this size can hurt - // performance but it decreases the risk of OOM failures when many tests - // are running concurrently. - TestSklPageSize = 128 << 10 // 128 KB -) - -// sklImpl implements the Cache interface. It maintains a pair of skiplists -// containing keys or key ranges and the timestamps at which they were most -// recently read or written. If a timestamp was read or written by a -// transaction, the txn ID is stored with the timestamp to avoid advancing -// timestamps on successive requests from the same transaction. +// sklImpl implements the Cache interface. It maintains a collection of +// skiplists containing keys or key ranges and the timestamps at which +// they were most recently read or written. If a timestamp was read or +// written by a transaction, the txn ID is stored with the timestamp to +// avoid advancing timestamps on successive requests from the same +// transaction. type sklImpl struct { - cache *intervalSkl - clock *hlc.Clock - pageSize uint32 - metrics Metrics + cache *intervalSkl + clock *hlc.Clock + metrics Metrics } var _ Cache = &sklImpl{} // newSklImpl returns a new treeImpl with the supplied hybrid clock. 
-func newSklImpl(clock *hlc.Clock, pageSize uint32) *sklImpl { - if pageSize == 0 { - pageSize = defaultSklPageSize - } - tc := sklImpl{clock: clock, pageSize: pageSize, metrics: makeMetrics()} +func newSklImpl(clock *hlc.Clock) *sklImpl { + tc := sklImpl{clock: clock, metrics: makeMetrics()} tc.clear(clock.Now()) return &tc } // clear clears the cache and resets the low-water mark. func (tc *sklImpl) clear(lowWater hlc.Timestamp) { - tc.cache = newIntervalSkl(tc.clock, MinRetentionWindow, tc.pageSize, tc.metrics.Skl) + tc.cache = newIntervalSkl(tc.clock, MinRetentionWindow, tc.metrics.Skl) tc.cache.floorTS = lowWater } @@ -101,7 +87,7 @@ func (tc *sklImpl) boundKeyLengths(start, end roachpb.Key) (roachpb.Key, roachpb // and still not trigger the "key range too large" panic in intervalSkl, // but anything larger could require multiple page rotations before it's // able to fit in if other ranges are being added concurrently. - maxKeySize := int(tc.pageSize / 32) + maxKeySize := int(maximumSklPageSize / 32) // If either key is too long, truncate its length, making sure to always // grow the [start,end) range instead of shrinking it. This will reduce the diff --git a/pkg/server/config.go b/pkg/server/config.go index 09cec322171d..a412c6592f22 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -212,10 +212,6 @@ type Config struct { // ReadWithinUncertaintyIntervalError. MaxOffset MaxOffsetType - // TimestampCachePageSize is the size in bytes of the pages in the - // timestamp cache held by each store. - TimestampCachePageSize uint32 - // ScanInterval determines a duration during which each range should be // visited approximately once by the range scanner. Set to 0 to disable. 
// Environment Variable: COCKROACH_SCAN_INTERVAL diff --git a/pkg/server/server.go b/pkg/server/server.go index a8d0fe32c15b..5aab2a6f7aaf 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -428,7 +428,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { ScanInterval: cfg.ScanInterval, ScanMinIdleTime: cfg.ScanMinIdleTime, ScanMaxIdleTime: cfg.ScanMaxIdleTime, - TimestampCachePageSize: cfg.TimestampCachePageSize, HistogramWindowInterval: cfg.HistogramWindowInterval(), StorePool: storePool, SQLExecutor: internalExecutor, diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 6ad2fb813aa6..92566b5f4eba 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -36,7 +36,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptprovider" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -108,7 +107,6 @@ func makeTestConfig(st *cluster.Settings) Config { cfg.HTTPAddr = util.TestAddr.String() // Set standard user for intra-cluster traffic. cfg.User = security.NodeUser - cfg.TimestampCachePageSize = tscache.TestSklPageSize // Enable web session authentication. 
cfg.EnableWebSessionAuthentication = true diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go index 66a884ef21f5..4dc61f88351b 100644 --- a/pkg/testutils/localtestcluster/local_test_cluster.go +++ b/pkg/testutils/localtestcluster/local_test_cluster.go @@ -24,7 +24,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -172,7 +171,6 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto /* deterministic */ false, ) cfg.Transport = transport - cfg.TimestampCachePageSize = tscache.TestSklPageSize ctx := context.TODO() if err := kvserver.WriteClusterVersion(ctx, ltc.Eng, clusterversion.TestingClusterVersion); err != nil {