stores_server.go
// Copyright 2016 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver

import (
	"bytes"
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/retry"
	"github.com/cockroachdb/redact"
)

// Server implements PerReplicaServer and PerStoreServer, handling RPCs that
// are addressed to a specific store (and, for per-replica requests, a
// specific replica) on this node.
type Server struct {
	stores *Stores
}
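
// Compile-time assertions that Server satisfies both RPC server interfaces.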
var _ PerReplicaServer = Server{}
var _ PerStoreServer = Server{}

// MakeServer returns a new instance of Server.
func MakeServer(descriptor *roachpb.NodeDescriptor, stores *Stores) Server {
	return Server{stores}
}
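
// execStoreCommand looks up the store addressed by the request header and, if
// it exists, runs f against it as a stopper task, so that the work cannot
// outlive the store's stopper.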
func (is Server) execStoreCommand(
	ctx context.Context, h StoreRequestHeader, f func(context.Context, *Store) error,
) error {
	store, err := is.stores.GetStore(h.StoreID)
	if err != nil {
		return err
	}
	// NB: we use a task here to prevent errant RPCs that arrive after stopper shutdown from
	// causing crashes. See #56085 for an example of such a crash.
	return store.stopper.RunTaskWithErr(ctx, "store command", func(ctx context.Context) error {
		return f(ctx, store)
	})
}

// CollectChecksum implements PerReplicaServer.
func (is Server) CollectChecksum(
	ctx context.Context, req *CollectChecksumRequest,
) (*CollectChecksumResponse, error) {
	resp := &CollectChecksumResponse{}
	err := is.execStoreCommand(ctx, req.StoreRequestHeader,
		func(ctx context.Context, s *Store) error {
			r, err := s.GetReplica(req.RangeID)
			if err != nil {
				return err
			}
			c, err := r.getChecksum(ctx, req.ChecksumID)
			if err != nil {
				return err
			}
			ccr := c.CollectChecksumResponse
			if !bytes.Equal(req.Checksum, ccr.Checksum) {
				// If req.Checksum is empty, this request comes from the replica that
				// is itself carrying out the consistency check; the mismatch is then
				// expected and spurious, and we want to leave the snapshot (if
				// present) intact.
				if len(req.Checksum) > 0 {
					log.Errorf(ctx, "consistency check failed on range r%d: expected checksum %x, got %x",
						req.RangeID, redact.Safe(req.Checksum), redact.Safe(ccr.Checksum))
					// Leave resp.Snapshot alone so that the caller will receive what's
					// in it (if anything).
				}
			} else {
				ccr.Snapshot = nil
			}
			resp = &ccr
			return nil
		})
	return resp, err
}

// WaitForApplication implements PerReplicaServer.
//
// It is the caller's responsibility to cancel or set a timeout on the context.
// If the context is never canceled, WaitForApplication will retry forever.
func (is Server) WaitForApplication(
	ctx context.Context, req *WaitForApplicationRequest,
) (*WaitForApplicationResponse, error) {
	resp := &WaitForApplicationResponse{}
	err := is.execStoreCommand(ctx, req.StoreRequestHeader, func(ctx context.Context, s *Store) error {
		// TODO(benesch): Once Replica changefeeds land, see if we can implement
		// this request handler without polling.
		retryOpts := retry.Options{InitialBackoff: 10 * time.Millisecond}
		for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
			// Long-lived references to replicas are frowned upon, so re-fetch the
			// replica on every turn of the loop.
			repl, err := s.GetReplica(req.RangeID)
			if err != nil {
				return err
			}
			repl.mu.RLock()
			leaseAppliedIndex := repl.mu.state.LeaseAppliedIndex
			repl.mu.RUnlock()
			if leaseAppliedIndex >= req.LeaseIndex {
				// For performance reasons, we don't sync to disk when
				// applying raft commands. This means that if a node restarts
				// after applying but before the next sync, its
				// LeaseAppliedIndex could temporarily regress (until it
				// reapplies its latest raft log entries).
				//
				// Merging relies on the monotonicity of the log applied
				// index, so before returning ensure that rocksdb has synced
				// everything up to this point to disk.
				//
				// https://github.com/cockroachdb/cockroach/issues/33120
				return storage.WriteSyncNoop(s.engine)
			}
		}
		if ctx.Err() == nil {
			log.Fatal(ctx, "infinite retry loop exited but context has no error")
		}
		return ctx.Err()
	})
	return resp, err
}

// WaitForReplicaInit implements PerReplicaServer.
//
// It is the caller's responsibility to cancel or set a timeout on the context.
// If the context is never canceled, WaitForReplicaInit will retry forever.
func (is Server) WaitForReplicaInit(
	ctx context.Context, req *WaitForReplicaInitRequest,
) (*WaitForReplicaInitResponse, error) {
	resp := &WaitForReplicaInitResponse{}
	err := is.execStoreCommand(ctx, req.StoreRequestHeader, func(ctx context.Context, s *Store) error {
		retryOpts := retry.Options{InitialBackoff: 10 * time.Millisecond}
		for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
			// Long-lived references to replicas are frowned upon, so re-fetch the
			// replica on every turn of the loop.
			if repl, err := s.GetReplica(req.RangeID); err == nil && repl.IsInitialized() {
				return nil
			}
		}
		if ctx.Err() == nil {
			log.Fatal(ctx, "infinite retry loop exited but context has no error")
		}
		return ctx.Err()
	})
	return resp, err
}

// CompactEngineSpan implements PerStoreServer. It blocks until the compaction
// is done, so it can be a long-lived RPC.
func (is Server) CompactEngineSpan(
	ctx context.Context, req *CompactEngineSpanRequest,
) (*CompactEngineSpanResponse, error) {
	resp := &CompactEngineSpanResponse{}
	err := is.execStoreCommand(ctx, req.StoreRequestHeader,
		func(ctx context.Context, s *Store) error {
			return s.Engine().CompactRange(req.Span.Key, req.Span.EndKey)
		})
	return resp, err
}

// SetCompactionConcurrency implements PerStoreServer. It changes the
// compaction concurrency of a store and reports the previous value in the
// response.
func (is Server) SetCompactionConcurrency(
	ctx context.Context, req *CompactionConcurrencyRequest,
) (*CompactionConcurrencyResponse, error) {
	resp := &CompactionConcurrencyResponse{}
	err := is.execStoreCommand(ctx, req.StoreRequestHeader,
		func(ctx context.Context, s *Store) error {
			prevConcurrency := s.Engine().SetCompactionConcurrency(req.CompactionConcurrency)
			resp.OldCompactionConcurrency = prevConcurrency
			return nil
		})
	return resp, err
}