Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: ctx fixes #9661

Merged
merged 2 commits into from
Oct 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,9 +484,15 @@ func newReplica(rangeID roachpb.RangeID, store *Store) *Replica {
store: store,
abortCache: NewAbortCache(rangeID),
}
// TODO(radu): we can do better than store.Ctx() here.
r.raftMu = syncutil.MakeTimedMutex(store.Ctx(), defaultReplicaRaftMuWarnThreshold)
r.mu.TimedMutex = syncutil.MakeTimedMutex(store.Ctx(), defaultReplicaMuWarnThreshold)

// Init rangeStr with the range ID.
r.rangeStr.store(&roachpb.RangeDescriptor{RangeID: rangeID})

// Add replica log tag - the value is rangeStr.String().
r.ctx = log.WithLogTag(store.Ctx(), "r", &r.rangeStr)

r.raftMu = syncutil.MakeTimedMutex(r.ctx, defaultReplicaRaftMuWarnThreshold)
r.mu.TimedMutex = syncutil.MakeTimedMutex(r.ctx, defaultReplicaMuWarnThreshold)
r.mu.outSnapDone = initialOutSnapDone
return r
}
Expand Down Expand Up @@ -520,7 +526,7 @@ func (r *Replica) initLocked(
desc *roachpb.RangeDescriptor, clock *hlc.Clock, replicaID roachpb.ReplicaID,
) error {
if r.mu.state.Desc != nil && r.isInitializedLocked() {
log.Fatalf(r.store.Ctx(), "r%d: cannot reinitialize an initialized replica", desc.RangeID)
log.Fatalf(r.ctx, "r%d: cannot reinitialize an initialized replica", desc.RangeID)
}
if desc.IsInitialized() && replicaID != 0 {
return errors.Errorf("replicaID must be 0 when creating an initialized replica")
Expand All @@ -536,23 +542,18 @@ func (r *Replica) initLocked(
r.mu.internalRaftGroup = nil

var err error
ctx := r.store.Ctx()

if r.mu.state, err = loadState(ctx, r.store.Engine(), desc); err != nil {
if r.mu.state, err = loadState(r.ctx, r.store.Engine(), desc); err != nil {
return err
}
r.rangeStr.store(r.mu.state.Desc)

r.mu.lastIndex, err = loadLastIndex(ctx, r.store.Engine(), r.RangeID)
r.mu.lastIndex, err = loadLastIndex(r.ctx, r.store.Engine(), r.RangeID)
if err != nil {
return err
}

// Add replica log tags - the value is rangeStr.String().
ctx = log.WithLogTag(ctx, "r", &r.rangeStr)
r.ctx = ctx

pErr, err := loadReplicaDestroyedError(ctx, r.store.Engine(), r.RangeID)
pErr, err := loadReplicaDestroyedError(r.ctx, r.store.Engine(), r.RangeID)
if err != nil {
return err
}
Expand Down Expand Up @@ -640,7 +641,7 @@ func (r *Replica) destroyDataRaftMuLocked() error {
tombstone := &roachpb.RaftTombstone{
NextReplicaID: desc.NextReplicaID,
}
if err := engine.MVCCPutProto(context.Background(), batch, nil, tombstoneKey, hlc.ZeroTimestamp, nil, tombstone); err != nil {
if err := engine.MVCCPutProto(r.ctx, batch, nil, tombstoneKey, hlc.ZeroTimestamp, nil, tombstone); err != nil {
return err
}

Expand Down Expand Up @@ -2376,7 +2377,7 @@ func (r *Replica) sendRaftMessage(msg raftpb.Message) {
r.store.Stopper().RunWorker(func() {
defer r.CloseOutSnap()
if err := r.store.ctx.Transport.SendSnapshot(
context.Background(),
r.ctx,
SnapshotRequest_Header{
RangeDescriptor: *r.Desc(),
RaftMessageRequest: RaftMessageRequest{
Expand Down
2 changes: 1 addition & 1 deletion storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ func intersectSpan(
}
if bytes.Compare(span.Key, keys.LocalRangeMax) < 0 {
if bytes.Compare(span.EndKey, keys.LocalRangeMax) >= 0 {
log.Fatalf(context.Background(), "a local intent range may not have a non-local portion: %s", span)
panic(fmt.Sprintf("a local intent range may not have a non-local portion: %s", span))
}
if containsKeyRange(desc, span.Key, span.EndKey) {
return &span, nil
Expand Down
12 changes: 6 additions & 6 deletions storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var _ raft.Storage = (*Replica)(nil)
// InitialState implements the raft.Storage interface.
// InitialState requires that the replica lock be held.
func (r *Replica) InitialState() (raftpb.HardState, raftpb.ConfState, error) {
hs, err := loadHardState(context.Background(), r.store.Engine(), r.RangeID)
hs, err := loadHardState(r.ctx, r.store.Engine(), r.RangeID)
// For uninitialized ranges, membership is unknown at this point.
if raft.IsEmptyHardState(hs) || err != nil {
return raftpb.HardState{}, raftpb.ConfState{}, err
Expand All @@ -74,7 +74,7 @@ func (r *Replica) InitialState() (raftpb.HardState, raftpb.ConfState, error) {
func (r *Replica) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, error) {
snap := r.store.NewSnapshot()
defer snap.Close()
return entries(context.Background(), snap, r.RangeID, r.store.raftEntryCache, lo, hi, maxBytes)
return entries(r.ctx, snap, r.RangeID, r.store.raftEntryCache, lo, hi, maxBytes)
}

func entries(
Expand Down Expand Up @@ -251,7 +251,7 @@ func (r *Replica) raftTruncatedStateLocked(ctx context.Context) (roachpb.RaftTru
// FirstIndex implements the raft.Storage interface.
// FirstIndex requires that the replica lock is held.
func (r *Replica) FirstIndex() (uint64, error) {
ts, err := r.raftTruncatedStateLocked(context.Background())
ts, err := r.raftTruncatedStateLocked(r.ctx)
if err != nil {
return 0, err
}
Expand All @@ -269,7 +269,7 @@ func (r *Replica) GetFirstIndex() (uint64, error) {
// Snapshot implements the raft.Storage interface.
// Snapshot requires that the replica lock is held.
func (r *Replica) Snapshot() (raftpb.Snapshot, error) {
snap, err := r.SnapshotWithContext(context.Background())
snap, err := r.SnapshotWithContext(r.ctx)
if err != nil {
return raftpb.Snapshot{}, err
}
Expand Down Expand Up @@ -306,15 +306,15 @@ func (r *Replica) SnapshotWithContext(ctx context.Context) (*OutgoingSnapshot, e
startKey := r.mu.state.Desc.StartKey

sp := r.store.Tracer().StartSpan("snapshot")
ctxInner := opentracing.ContextWithSpan(context.Background(), sp)
ctxInner := opentracing.ContextWithSpan(r.ctx, sp)
defer sp.Finish()
snap := r.store.NewSnapshot()
log.Eventf(ctxInner, "new engine snapshot for replica %s", r)

// Delegate to a static function to make sure that we do not depend
// on any indirect calls to r.store.Engine() (or other in-memory
// state of the Replica). Everything must come from the snapshot.
snapData, err := snapshot(context.Background(), snap, rangeID, r.store.raftEntryCache, startKey)
snapData, err := snapshot(r.ctx, snap, rangeID, r.store.raftEntryCache, startKey)
if err != nil {
log.Errorf(ctxInner, "%s: error generating snapshot: %s", r, err)
return nil, err
Expand Down
4 changes: 1 addition & 3 deletions storage/replica_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,8 @@ func (r *Replica) handleTrigger(
}
}

// TODO(radu): we should provide a base context that contains the
// node and range IDs.
splitTriggerPostCommit(
context.Background(),
r.ctx,
trigger.split.RightDeltaMS,
&trigger.split.SplitTrigger,
r,
Expand Down
16 changes: 9 additions & 7 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ func IterateRangeDescriptors(ctx context.Context,
return fn(desc)
}

_, err := engine.MVCCIterate(context.Background(), eng, start, end, hlc.MaxTimestamp, false /* !consistent */, nil, /* txn */
_, err := engine.MVCCIterate(ctx, eng, start, end, hlc.MaxTimestamp, false /* !consistent */, nil, /* txn */
false /* !reverse */, kvToDesc)
log.Eventf(ctx, "iterated over %d keys to find %d range descriptors (by suffix: %v)",
allCount, matchCount, bySuffix)
Expand Down Expand Up @@ -1227,11 +1227,11 @@ func (s *Store) Bootstrap(ident roachpb.StoreIdent, stopper *stop.Stopper) error
return errors.Errorf("store %s: unable to access: %s", s.engine, err)
} else if len(kvs) > 0 {
// See if this is an already-bootstrapped store.
ok, err := engine.MVCCGetProto(context.Background(), s.engine, keys.StoreIdentKey(), hlc.ZeroTimestamp, true, nil, &s.Ident)
if err != nil {
if ok, err := engine.MVCCGetProto(
s.Ctx(), s.engine, keys.StoreIdentKey(), hlc.ZeroTimestamp, true, nil, &s.Ident,
); err != nil {
return errors.Errorf("store %s is non-empty but cluster ID could not be determined: %s", s.engine, err)
}
if ok {
} else if ok {
return errors.Errorf("store %s already belongs to cockroach cluster %s", s.engine, s.Ident.ClusterID)
}
keyVals := []string{}
Expand All @@ -1240,7 +1240,7 @@ func (s *Store) Bootstrap(ident roachpb.StoreIdent, stopper *stop.Stopper) error
}
return errors.Errorf("store %s is non-empty but does not contain store metadata (first %d key/values: %s)", s.engine, len(keyVals), keyVals)
}
err = engine.MVCCPutProto(context.Background(), s.engine, nil,
err = engine.MVCCPutProto(s.Ctx(), s.engine, nil,
keys.StoreIdentKey(), hlc.ZeroTimestamp, nil, &s.Ident)
if err != nil {
return err
Expand Down Expand Up @@ -2926,7 +2926,9 @@ func (s *Store) tryGetOrCreateReplica(
// a stale message.
tombstoneKey := keys.RaftTombstoneKey(rangeID)
var tombstone roachpb.RaftTombstone
if ok, err := engine.MVCCGetProto(context.Background(), s.Engine(), tombstoneKey, hlc.ZeroTimestamp, true, nil, &tombstone); err != nil {
if ok, err := engine.MVCCGetProto(
s.Ctx(), s.Engine(), tombstoneKey, hlc.ZeroTimestamp, true, nil, &tombstone,
); err != nil {
return nil, false, err
} else if ok {
if replicaID != 0 && replicaID < tombstone.NextReplicaID {
Expand Down
4 changes: 2 additions & 2 deletions storage/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (ls *Stores) ReadBootstrapInfo(bi *gossip.BootstrapInfo) error {
// Find the most recent bootstrap info.
for _, s := range ls.storeMap {
var storeBI gossip.BootstrapInfo
ok, err := engine.MVCCGetProto(context.Background(), s.engine, keys.StoreGossipKey(), hlc.ZeroTimestamp, true, nil, &storeBI)
ok, err := engine.MVCCGetProto(ls.ctx, s.engine, keys.StoreGossipKey(), hlc.ZeroTimestamp, true, nil, &storeBI)
if err != nil {
return err
}
Expand Down Expand Up @@ -315,7 +315,7 @@ func (ls *Stores) updateBootstrapInfo(bi *gossip.BootstrapInfo) error {
ls.latestBI = protoutil.Clone(bi).(*gossip.BootstrapInfo)
// Update all stores.
for _, s := range ls.storeMap {
if err := engine.MVCCPutProto(context.Background(), s.engine, nil, keys.StoreGossipKey(), hlc.ZeroTimestamp, nil, bi); err != nil {
if err := engine.MVCCPutProto(ls.ctx, s.engine, nil, keys.StoreGossipKey(), hlc.ZeroTimestamp, nil, bi); err != nil {
return err
}
}
Expand Down