-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
logstore.go
616 lines (563 loc) · 21.6 KB
/
logstore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
// Package logstore implements the Raft log storage.
package logstore
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"
)
// Cluster settings that control how Raft log appends are made durable. Both
// are read on every append by storeEntriesAndCommitBatch.
var (
	// disableSyncRaftLog, when true, makes storeEntriesAndCommitBatch commit
	// its batch without requesting a sync even when the append has responses
	// waiting on durability. Its default comes from the
	// COCKROACH_DISABLE_RAFT_LOG_SYNCHRONIZATION_UNSAFE environment variable
	// (false when unset).
	disableSyncRaftLog = settings.RegisterBoolSetting(
		settings.SystemOnly,
		"kv.raft_log.disable_synchronization_unsafe",
		"disables synchronization of Raft log writes to persistent storage. "+
			"Setting to true risks data loss or data corruption on process or OS crashes. "+
			"This not only disables fsync, but also disables flushing writes to the OS buffer. "+
			"The setting is meant for internal testing only and SHOULD NOT be used in production.",
		envutil.EnvOrDefaultBool("COCKROACH_DISABLE_RAFT_LOG_SYNCHRONIZATION_UNSAFE", false),
	)

	// enableNonBlockingRaftLogSync, when true, lets a syncing append commit
	// with CommitNoSyncWait and hand the wait for durability to the
	// SyncWaiterLoop instead of blocking the caller. Its default comes from
	// the COCKROACH_ENABLE_RAFT_LOG_NON_BLOCKING_SYNCHRONIZATION environment
	// variable (true when unset).
	enableNonBlockingRaftLogSync = settings.RegisterBoolSetting(
		settings.SystemOnly,
		"kv.raft_log.non_blocking_synchronization.enabled",
		"set to true to enable non-blocking synchronization on Raft log writes to "+
			"persistent storage. Setting to true does not risk data loss or data corruption "+
			"on server crashes, but can reduce write latency.",
		envutil.EnvOrDefaultBool("COCKROACH_ENABLE_RAFT_LOG_NON_BLOCKING_SYNCHRONIZATION", true),
	)
)
// MsgStorageAppend is a raftpb.Message whose Type is raftpb.MsgStorageAppend.
// Build values with MakeMsgStorageAppend so the message type is checked.
type MsgStorageAppend raftpb.Message

// MakeMsgStorageAppend converts m into a MsgStorageAppend. It panics if m has
// any type other than raftpb.MsgStorageAppend.
func MakeMsgStorageAppend(m raftpb.Message) MsgStorageAppend {
	if m.Type == raftpb.MsgStorageAppend {
		return MsgStorageAppend(m)
	}
	panic(fmt.Sprintf("unexpected message type %s", m.Type))
}
type (
	// RaftState describes the tail of a Raft log: the index and term of its
	// last entry, and the tracked size of the log in bytes.
	RaftState struct {
		LastIndex kvpb.RaftIndex
		LastTerm  kvpb.RaftTerm
		ByteSize  int64
	}

	// AppendStats describes a completed log storage append operation.
	AppendStats struct {
		// Begin and End bracket the sideloading and log-append work. They are
		// only set when the appended message carries entries.
		Begin, End time.Time
		// RegularEntries and RegularBytes count the entries written inline
		// into the log (not sideloaded).
		RegularEntries int
		RegularBytes   int64
		// SideloadedEntries and SideloadedBytes count the entries whose
		// payloads were moved to sideloaded storage.
		SideloadedEntries int
		SideloadedBytes   int64
		// PebbleBegin and PebbleEnd bracket the batch commit.
		PebbleBegin, PebbleEnd time.Time
		// PebbleBytes is the batch size (batch.Len()) at commit time.
		PebbleBytes int64
		// Only set when !NonBlocking, which means almost never, since
		// kv.raft_log.non_blocking_synchronization.enabled defaults to true.
		PebbleCommitStats storage.BatchCommitStats
		// Sync reports whether the append wanted a sync, i.e. whether it had
		// response messages to deliver once durable.
		Sync bool
		// If true, PebbleEnd-PebbleBegin does not include the sync time.
		NonBlocking bool
	}

	// Metrics contains metrics specific to the log storage.
	Metrics struct {
		// RaftLogCommitLatency receives, in nanoseconds, the time from the
		// start of the batch commit until the log write is known durable, for
		// appends that wanted a sync.
		RaftLogCommitLatency metric.IHistogram
	}
)
type (
	// LogStore is a stub of a separated Raft log storage. It bundles the
	// per-range dependencies used to persist Raft log entries.
	LogStore struct {
		// RangeID identifies the range whose log this is (used as the entry
		// cache key).
		RangeID roachpb.RangeID
		// Engine is the storage engine that append batches are created from
		// and committed to.
		Engine storage.Engine
		// Sideload holds the payloads of sideloaded entries.
		Sideload SideloadStorage
		// StateLoader supplies the Raft log key prefix and writes the HardState.
		StateLoader StateLoader
		// SyncWaiter waits for non-blocking syncs and then runs their callbacks.
		SyncWaiter *SyncWaiterLoop
		// EntryCache is updated with every appended batch of entries.
		EntryCache *raftentry.Cache
		// Settings is consulted for the Raft log synchronization settings.
		Settings *cluster.Settings
		// Metrics records log commit latency.
		Metrics Metrics
	}

	// SyncCallback is notified once a Raft log write has been durably
	// committed to disk. OnLogSync receives the response messages attached to
	// the MsgStorageAppend that triggered the write. The commit stats are
	// populated iff the sync was non-blocking; the blocking path passes a zero
	// value.
	SyncCallback interface {
		OnLogSync(context.Context, []raftpb.Message, storage.BatchCommitStats)
	}
)
// newStoreEntriesBatch creates the batch that StoreEntries stages its writes
// in. The append path never reads back what it writes to the batch, so an
// unindexed batch, which is more efficient than an indexed one, is sufficient.
func newStoreEntriesBatch(eng storage.Engine) storage.Batch {
	unindexed := eng.NewUnindexedBatch()
	return unindexed
}
// StoreEntries persists the Raft log entries (and non-empty HardState) carried
// by m, and invokes cb with m's response messages once the write is durable.
//
// Whether the durable write blocks the caller (and so whether cb runs before
// StoreEntries returns) depends on the
// kv.raft_log.non_blocking_synchronization.enabled cluster setting. In both
// cases, readers of the Engine observe the appended entries immediately.
//
// state is the log state before the append; the log state after the append is
// returned. The HardState is persisted atomically with, or strictly after, the
// entries.
func (s *LogStore) StoreEntries(
	ctx context.Context, state RaftState, m MsgStorageAppend, cb SyncCallback, stats *AppendStats,
) (RaftState, error) {
	// storeEntriesAndCommitBatch takes ownership of the fresh batch: it
	// commits it and closes it (directly or via the SyncWaiterLoop).
	return s.storeEntriesAndCommitBatch(ctx, state, m, cb, stats, newStoreEntriesBatch(s.Engine))
}
// storeEntriesAndCommitBatch is like StoreEntries, but it accepts a
// storage.Batch, which it takes responsibility for committing and closing.
//
// In order, it: sideloads and appends m.Entries to the batch; writes the
// HardState (if non-empty) into the same batch; commits the batch, either
// blocking on the sync or handing the wait to the SyncWaiterLoop; purges
// sideloaded payloads made obsolete by an overwriting append; and finally
// updates the entry cache. stats is filled in along the way. On error the zero
// RaftState is returned.
func (s *LogStore) storeEntriesAndCommitBatch(
	ctx context.Context,
	state RaftState,
	m MsgStorageAppend,
	cb SyncCallback,
	stats *AppendStats,
	batch storage.Batch,
) (RaftState, error) {
	// Before returning, Close the batch if we haven't handed ownership of it to a
	// SyncWaiterLoop. If batch == nil, SyncWaiterLoop is responsible for closing
	// it once the in-progress disk writes complete.
	defer func() {
		if batch != nil {
			batch.Close()
		}
	}()
	// Remember the log's last index before this append. If the first new entry
	// is at or below it, this append overwrites part of the existing log.
	prevLastIndex := state.LastIndex
	overwriting := false
	if len(m.Entries) > 0 {
		firstPurge := kvpb.RaftIndex(m.Entries[0].Index) // first new entry written
		overwriting = firstPurge <= prevLastIndex
		stats.Begin = timeutil.Now()
		// All of the entries are appended to distinct keys, returning a new
		// last index.
		thinEntries, numSideloaded, sideLoadedEntriesSize, otherEntriesSize, err := MaybeSideloadEntries(ctx, m.Entries, s.Sideload)
		if err != nil {
			const expl = "during sideloading"
			return RaftState{}, errors.Wrap(err, expl)
		}
		// Account for the sideloaded payload bytes here; logAppend only adds
		// the MVCC SysBytes delta of the (thinned) entries it writes.
		state.ByteSize += sideLoadedEntriesSize
		if state, err = logAppend(
			ctx, s.StateLoader.RaftLogPrefix(), batch, state, thinEntries,
		); err != nil {
			const expl = "during append"
			return RaftState{}, errors.Wrap(err, expl)
		}
		stats.RegularEntries += len(thinEntries) - numSideloaded
		stats.RegularBytes += otherEntriesSize
		stats.SideloadedEntries += numSideloaded
		stats.SideloadedBytes += sideLoadedEntriesSize
		stats.End = timeutil.Now()
	}
	hs := raftpb.HardState{
		Term:   m.Term,
		Vote:   m.Vote,
		Commit: m.Commit,
	}
	if !raft.IsEmptyHardState(hs) {
		// NB: Note that without additional safeguards, it's incorrect to write
		// the HardState before appending m.Entries. When catching up, a follower
		// will receive Entries that are immediately Committed in the same
		// Ready. If we persist the HardState but happen to lose the Entries,
		// assertions can be tripped.
		//
		// We have both in the same batch, so there's no problem. If that ever
		// changes, we must write and sync the Entries before the HardState.
		if err := s.StateLoader.SetHardState(ctx, batch, hs); err != nil {
			const expl = "during setHardState"
			return RaftState{}, errors.Wrap(err, expl)
		}
	}
	// Synchronously commit the batch with the Raft log entries and Raft hard
	// state as we're promising not to lose this data.
	//
	// Note that the data is visible to other goroutines before it is synced to
	// disk. This is fine. The important constraints are that these syncs happen
	// before the MsgStorageAppend's responses are delivered back to the RawNode.
	// Our regular locking is sufficient for this and if other goroutines can see
	// the data early, that's fine. In particular, snapshots are not a problem (I
	// think they're the only thing that might access log entries or HardState
	// from other goroutines). Snapshots do not include either the HardState or
	// uncommitted log entries, and even if they did include log entries that
	// were not persisted to disk, it wouldn't be a problem because raft does not
	// infer that entries are persisted on the node that sends a snapshot.
	//
	// TODO(pavelkalinnikov): revisit the comment above (written in 82cbb49). It
	// communicates an important invariant, but is hard to grok now and can be
	// outdated. Raft invariants are in the responsibility of the layer above
	// (Replica), so this comment might need to move.
	stats.PebbleBegin = timeutil.Now()
	stats.PebbleBytes = int64(batch.Len())
	// A sync is wanted only when there are responses to deliver once the write
	// is durable. disableSyncRaftLog can veto the sync itself, in which case the
	// blocking path below still delivers the responses (without syncing).
	wantsSync := len(m.Responses) > 0
	willSync := wantsSync && !disableSyncRaftLog.Get(&s.Settings.SV)
	// Use the non-blocking log sync path if we are performing a log sync ...
	nonBlockingSync := willSync &&
		// and the cluster setting is enabled ...
		enableNonBlockingRaftLogSync.Get(&s.Settings.SV) &&
		// and we are not overwriting any previous log entries. If we are
		// overwriting, we may need to purge the sideloaded SSTables associated with
		// overwritten entries. This must be performed after the corresponding
		// entries are durably replaced and it's easier to ensure proper ordering
		// using a blocking log sync. This is a rare case, so it's not worth
		// optimizing for.
		!overwriting &&
		// Also, randomly disable non-blocking sync in test builds to exercise the
		// interleaved blocking and non-blocking syncs.
		!(buildutil.CrdbTestBuild && rand.Intn(2) == 0)
	if nonBlockingSync {
		// If non-blocking synchronization is enabled, apply the batched updates to
		// the engine and initiate a synchronous disk write, but don't wait for the
		// write to complete.
		if err := batch.CommitNoSyncWait(); err != nil {
			const expl = "while committing batch without sync wait"
			return RaftState{}, errors.Wrap(err, expl)
		}
		stats.PebbleEnd = timeutil.Now()
		// Instead, enqueue that waiting on the SyncWaiterLoop, who will signal the
		// callback when the write completes.
		waiterCallback := nonBlockingSyncWaiterCallbackPool.Get().(*nonBlockingSyncWaiterCallback)
		*waiterCallback = nonBlockingSyncWaiterCallback{
			ctx:            ctx,
			cb:             cb,
			msgs:           m.Responses,
			batch:          batch,
			metrics:        s.Metrics,
			logCommitBegin: stats.PebbleBegin,
		}
		s.SyncWaiter.enqueue(ctx, batch, waiterCallback)
		// Do not Close batch on return. Will be Closed by SyncWaiterLoop.
		batch = nil
	} else {
		// Blocking path: Commit(willSync) returns only after the write (and the
		// sync, if requested) has completed, so the callback can run inline.
		if err := batch.Commit(willSync); err != nil {
			const expl = "while committing batch"
			return RaftState{}, errors.Wrap(err, expl)
		}
		stats.PebbleEnd = timeutil.Now()
		stats.PebbleCommitStats = batch.CommitStats()
		if wantsSync {
			logCommitEnd := stats.PebbleEnd
			s.Metrics.RaftLogCommitLatency.RecordValue(logCommitEnd.Sub(stats.PebbleBegin).Nanoseconds())
			// Commit stats are only passed to the callback on the non-blocking
			// path; here they are reported through stats.PebbleCommitStats.
			cb.OnLogSync(ctx, m.Responses, storage.BatchCommitStats{})
		}
	}
	// Note that stats.Sync records whether a sync was wanted, not whether one
	// was actually performed (see disableSyncRaftLog).
	stats.Sync = wantsSync
	stats.NonBlocking = nonBlockingSync
	if overwriting {
		// We may have just overwritten parts of the log which contain
		// sideloaded SSTables from a previous term (and perhaps discarded some
		// entries that we didn't overwrite). Remove any such leftover on-disk
		// payloads (we can do that now because we've committed the deletion
		// just above).
		//
		// m.Entries is non-empty here: overwriting is only set when it is.
		firstPurge := kvpb.RaftIndex(m.Entries[0].Index) // first new entry written
		// NOTE(review): purgeTerm is one less than the first new entry's term;
		// presumably maybePurgeSideloaded only removes payloads up to this term
		// within [firstPurge, lastPurge] — confirm against its implementation.
		purgeTerm := kvpb.RaftTerm(m.Entries[0].Term - 1)
		lastPurge := prevLastIndex // old end of the log, include in deletion
		purgedSize, err := maybePurgeSideloaded(ctx, s.Sideload, firstPurge, lastPurge, purgeTerm)
		if err != nil {
			const expl = "while purging sideloaded storage"
			return RaftState{}, errors.Wrap(err, expl)
		}
		state.ByteSize -= purgedSize
		if state.ByteSize < 0 {
			// Might have gone negative if node was recently restarted.
			state.ByteSize = 0
		}
	}
	// Update raft log entry cache. We clear any older, uncommitted log entries
	// and cache the latest ones.
	//
	// In the blocking log sync case, these entries are already durable. In the
	// non-blocking case, these entries have been written to the pebble engine (so
	// reads of the engine will see them), but they are not yet durable. This
	// means that the entry cache can lead the durable log. This is allowed by
	// etcd/raft, which maintains its own tracking of entry durability by
	// splitting its log into an unstable portion for entries that are not known
	// to be durable and a stable portion for entries that are known to be
	// durable.
	s.EntryCache.Add(s.RangeID, m.Entries, true /* truncate */)
	return state, nil
}
// nonBlockingSyncWaiterCallback carries everything the SyncWaiterLoop needs to
// finish a non-blocking Raft log sync: deliver the responses, record the commit
// latency, and report the batch's commit stats. It is a struct with a method,
// not a closure, so that its fields do not escape to the heap individually and
// so that instances can be pooled.
type nonBlockingSyncWaiterCallback struct {
	// ctx, cb and msgs are forwarded to SyncCallback.OnLogSync.
	ctx  context.Context
	cb   SyncCallback
	msgs []raftpb.Message
	// batch is the batch whose sync is being awaited; its CommitStats are
	// reported to the callback.
	batch storage.WriteBatch
	// metrics and logCommitBegin are used to record the commit latency.
	metrics        Metrics
	logCommitBegin time.Time
}

// run is invoked on the SyncWaiterLoop goroutine. It records the commit
// latency, hands the responses and commit stats to the SyncCallback, and then
// returns the callback object to its pool.
func (w *nonBlockingSyncWaiterCallback) run() {
	latency := timeutil.Since(w.logCommitBegin)
	w.metrics.RaftLogCommitLatency.RecordValue(latency.Nanoseconds())
	w.cb.OnLogSync(w.ctx, w.msgs, w.batch.CommitStats())
	w.release()
}

// release zeroes every field, dropping the references to the context,
// callback, messages and batch, and puts the object back into
// nonBlockingSyncWaiterCallbackPool.
func (w *nonBlockingSyncWaiterCallback) release() {
	*w = nonBlockingSyncWaiterCallback{}
	nonBlockingSyncWaiterCallbackPool.Put(w)
}

// nonBlockingSyncWaiterCallbackPool recycles nonBlockingSyncWaiterCallback
// objects across non-blocking appends.
var nonBlockingSyncWaiterCallbackPool = sync.Pool{
	New: func() interface{} {
		return new(nonBlockingSyncWaiterCallback)
	},
}
// valPool recycles the roachpb.Value that logAppend encodes each entry into,
// letting its RawBytes buffer be reused across calls.
var valPool = sync.Pool{
	New: func() interface{} { return &roachpb.Value{} },
}

// logAppend writes entries into the raft log stored under raftLogPrefix and
// returns the log state that results from applying them on top of prev.
// Existing entries past the new tail are deleted. Callers must hold exclusive
// access to the raft log for the duration of the call.
//
// logAppend does not know about sideloaded proposals. Managing them is the
// caller's job, including removing obsolete on-disk payloads when the log tail
// is replaced.
//
// An empty entries slice is a no-op that returns prev unchanged. The returned
// ByteSize is prev.ByteSize plus the SysBytes delta of the writes and deletes.
func logAppend(
	ctx context.Context,
	raftLogPrefix roachpb.Key,
	rw storage.ReadWriter,
	prev RaftState,
	entries []raftpb.Entry,
) (RaftState, error) {
	if len(entries) == 0 {
		return prev, nil
	}

	var ms enginepb.MVCCStats
	scratch := valPool.Get().(*roachpb.Value)
	scratch.RawBytes = scratch.RawBytes[:0]
	defer valPool.Put(scratch)

	for i := range entries {
		e := &entries[i]
		idx := kvpb.RaftIndex(e.Index)
		key := keys.RaftLogKeyFromPrefix(raftLogPrefix, idx)
		if err := scratch.SetProto(e); err != nil {
			return RaftState{}, err
		}
		scratch.InitChecksum(key)
		// Indexes at or below the previous tail may already hold an entry that
		// is being overwritten and use MVCCPut; indexes beyond it are
		// presumably absent and take the MVCCBlindPut path.
		var err error
		if idx <= prev.LastIndex {
			err = storage.MVCCPut(ctx, rw, &ms, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, *scratch, nil /* txn */)
		} else {
			err = storage.MVCCBlindPut(ctx, rw, &ms, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, *scratch, nil /* txn */)
		}
		if err != nil {
			return RaftState{}, err
		}
	}

	last := &entries[len(entries)-1]
	newLastIndex := kvpb.RaftIndex(last.Index)
	// Previously appended entries past the new tail never committed; remove
	// them. Any sideloaded payloads they reference are deleted by the caller,
	// and only after the batch has committed.
	if prev.LastIndex > 0 {
		for idx := newLastIndex + 1; idx <= prev.LastIndex; idx++ {
			staleKey := keys.RaftLogKeyFromPrefix(raftLogPrefix, idx)
			if _, err := storage.MVCCDelete(ctx, rw, &ms, staleKey,
				hlc.Timestamp{}, hlc.ClockTimestamp{}, nil); err != nil {
				return RaftState{}, err
			}
		}
	}

	return RaftState{
		LastIndex: newLastIndex,
		LastTerm:  kvpb.RaftTerm(last.Term),
		ByteSize:  prev.ByteSize + ms.SysBytes,
	}, nil
}
// LoadTerm returns the term of the Raft log entry at index for rangeID.
//
// The entry cache is checked first. On a miss, the entry is read from the
// engine and, unless it is sideloaded, added to the cache. If the engine has no
// entry at index, the answer is derived from the log bounds:
//   - index past the last index: raft.ErrUnavailable;
//   - index equal to the truncated state's index: the truncated state's term;
//   - index above the truncated index: a gap error;
//   - otherwise: raft.ErrCompacted.
func LoadTerm(
	ctx context.Context,
	rsl StateLoader,
	eng storage.Engine,
	rangeID roachpb.RangeID,
	eCache *raftentry.Cache,
	index kvpb.RaftIndex,
) (kvpb.RaftTerm, error) {
	if cached, ok := eCache.Get(rangeID, index); ok {
		return kvpb.RaftTerm(cached.Term), nil
	}

	reader := eng.NewReadOnly(storage.StandardDurability)
	defer reader.Close()

	var (
		entry raftpb.Entry
		found bool
	)
	visit := func(ent raftpb.Entry) error {
		if found {
			return errors.Errorf("found more than one entry in [%d,%d)", index, index+1)
		}
		entry, found = ent, true
		return nil
	}
	if err := raftlog.Visit(reader, rangeID, index, index+1, visit); err != nil {
		return 0, err
	}

	if found {
		// Double-check that the visited entry is the one that was asked for.
		if got := kvpb.RaftIndex(entry.Index); got != index {
			return 0, errors.Errorf("there is a gap at index %d, found entry #%d", index, got)
		}
		enc, err := raftlog.EncodingOf(entry)
		if err != nil {
			return 0, err
		}
		// Sideloaded entries are not cached, because that would require
		// loading/inlining their payloads and keep term lookups from being cheap.
		// TODO(pavelkalinnikov): consider not caching here, after measuring if it
		// makes any difference.
		if !enc.IsSideloaded() {
			eCache.Add(rangeID, []raftpb.Entry{entry}, false /* truncate */)
		}
		return kvpb.RaftTerm(entry.Term), nil
	}

	// No entry at index: it is either past the end of the log or has been
	// compacted away.
	lastIndex, err := rsl.LoadLastIndex(ctx, reader)
	if err != nil {
		return 0, err
	}
	if index > lastIndex {
		return 0, raft.ErrUnavailable
	}
	ts, err := rsl.LoadRaftTruncatedState(ctx, reader)
	if err != nil {
		return 0, err
	}
	switch {
	case index == ts.Index:
		return ts.Term, nil
	case index > ts.Index:
		return 0, errors.Errorf("there is a gap at index %d", index)
	default:
		return 0, raft.ErrCompacted
	}
}
// LoadEntries retrieves the entries in [lo, hi) for rangeID. It serves as long
// a prefix as possible from the entry cache and reads the remainder from the
// engine. It inlines the sideloaded entries, and caches all the loaded entries.
// The total size of the returned entries does not exceed maxBytes, unless only
// one entry is returned.
//
// The returned values are the entries, the size of the prefix served from the
// entry cache, and the number of bytes read from the engine (payloads of
// sideloaded entries inlined). The engine byte count may include one entry that
// was read but dropped for exceeding maxBytes. If the requested entries cannot
// all be returned and maxBytes was not hit, it returns raft.ErrCompacted when
// lo has been truncated away, raft.ErrUnavailable when the missing entries do
// not exist yet, or an error describing a gap in the log.
//
// TODO(pavelkalinnikov): return all entries we've read, consider maxBytes a
// target size. Currently we may read one extra entry and drop it.
func LoadEntries(
	ctx context.Context,
	rsl StateLoader,
	eng storage.Engine,
	rangeID roachpb.RangeID,
	eCache *raftentry.Cache,
	sideloaded SideloadStorage,
	lo, hi kvpb.RaftIndex,
	maxBytes uint64,
) (_ []raftpb.Entry, _cachedSize uint64, _loadedSize uint64, _ error) {
	if lo > hi {
		return nil, 0, 0, errors.Errorf("lo:%d is greater than hi:%d", lo, hi)
	}
	// Cap the initial capacity at 100 entries; presumably because hi-lo can be
	// large while maxBytes limits how many entries are actually returned.
	n := hi - lo
	if n > 100 {
		n = 100
	}
	ents := make([]raftpb.Entry, 0, n)
	// hitIndex is the first index not served by the cache; the engine scan
	// below starts there.
	ents, cachedSize, hitIndex, exceededMaxBytes := eCache.Scan(ents, rangeID, lo, hi, maxBytes)
	// Return results if the correct number of results came back or if
	// we ran into the max bytes limit.
	if kvpb.RaftIndex(len(ents)) == hi-lo || exceededMaxBytes {
		return ents, cachedSize, 0, nil
	}
	combinedSize := cachedSize // size tracks total size of ents.
	// Scan over the log to find the requested entries in the range [lo, hi),
	// stopping once we have enough.
	expectedIndex := hitIndex
	// scanFunc appends contiguous entries to ents. After the scan,
	// expectedIndex is the first index that was not appended.
	scanFunc := func(ent raftpb.Entry) error {
		// Exit early if we have any gaps or it has been compacted.
		if kvpb.RaftIndex(ent.Index) != expectedIndex {
			return iterutil.StopIteration()
		}
		expectedIndex++
		typ, err := raftlog.EncodingOf(ent)
		if err != nil {
			return err
		}
		if typ.IsSideloaded() {
			newEnt, err := MaybeInlineSideloadedRaftCommand(
				ctx, rangeID, ent, sideloaded, eCache,
			)
			if err != nil {
				return err
			}
			if newEnt != nil {
				ent = *newEnt
			}
		}
		// Note that we track the size of proposals with payloads inlined.
		combinedSize += uint64(ent.Size())
		if combinedSize > maxBytes {
			exceededMaxBytes = true
			if len(ents) == 0 { // make sure to return at least one entry
				ents = append(ents, ent)
			}
			return iterutil.StopIteration()
		}
		ents = append(ents, ent)
		return nil
	}
	reader := eng.NewReadOnly(storage.StandardDurability)
	defer reader.Close()
	if err := raftlog.Visit(reader, rangeID, expectedIndex, hi, scanFunc); err != nil {
		return nil, 0, 0, err
	}
	eCache.Add(rangeID, ents, false /* truncate */)
	// Did the correct number of results come back? If so, we're all good.
	if kvpb.RaftIndex(len(ents)) == hi-lo {
		return ents, cachedSize, combinedSize - cachedSize, nil
	}
	// Did we hit the size limit? If so, return what we have.
	if exceededMaxBytes {
		return ents, cachedSize, combinedSize - cachedSize, nil
	}
	// Did we get any results at all? Because something went wrong.
	if len(ents) > 0 {
		// Was the missing index after the last index?
		lastIndex, err := rsl.LoadLastIndex(ctx, reader)
		if err != nil {
			return nil, 0, 0, err
		}
		// NOTE(review): when lastIndex == expectedIndex the log claims to hold
		// expectedIndex even though the scan did not find it, yet this reports
		// ErrUnavailable rather than a gap — confirm this is intended.
		if lastIndex <= expectedIndex {
			return nil, 0, 0, raft.ErrUnavailable
		}
		// We have a gap in the record, if so, return a nasty error.
		return nil, 0, 0, errors.Errorf("there is a gap in the index record between lo:%d and hi:%d at index:%d", lo, hi, expectedIndex)
	}
	// No results, was it due to unavailability or truncation?
	ts, err := rsl.LoadRaftTruncatedState(ctx, reader)
	if err != nil {
		return nil, 0, 0, err
	}
	if ts.Index >= lo {
		// The requested lo index has already been truncated.
		return nil, 0, 0, raft.ErrCompacted
	}
	// The requested lo index does not yet exist.
	return nil, 0, 0, raft.ErrUnavailable
}