Skip to content

Commit

Permalink
Merge pull request #9661 from RaduBerinde/storage-ctx-fixes
Browse files Browse the repository at this point in the history
storage: ctx fixes
  • Loading branch information
RaduBerinde authored Oct 1, 2016
2 parents 31cd68a + 8842b34 commit 38005f6
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 33 deletions.
29 changes: 15 additions & 14 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,9 +484,15 @@ func newReplica(rangeID roachpb.RangeID, store *Store) *Replica {
store: store,
abortCache: NewAbortCache(rangeID),
}
// TODO(radu): we can do better than store.Ctx() here.
r.raftMu = syncutil.MakeTimedMutex(store.Ctx(), defaultReplicaRaftMuWarnThreshold)
r.mu.TimedMutex = syncutil.MakeTimedMutex(store.Ctx(), defaultReplicaMuWarnThreshold)

// Init rangeStr with the range ID.
r.rangeStr.store(&roachpb.RangeDescriptor{RangeID: rangeID})

// Add replica log tag - the value is rangeStr.String().
r.ctx = log.WithLogTag(store.Ctx(), "r", &r.rangeStr)

r.raftMu = syncutil.MakeTimedMutex(r.ctx, defaultReplicaRaftMuWarnThreshold)
r.mu.TimedMutex = syncutil.MakeTimedMutex(r.ctx, defaultReplicaMuWarnThreshold)
r.mu.outSnapDone = initialOutSnapDone
return r
}
Expand Down Expand Up @@ -520,7 +526,7 @@ func (r *Replica) initLocked(
desc *roachpb.RangeDescriptor, clock *hlc.Clock, replicaID roachpb.ReplicaID,
) error {
if r.mu.state.Desc != nil && r.isInitializedLocked() {
log.Fatalf(r.store.Ctx(), "r%d: cannot reinitialize an initialized replica", desc.RangeID)
log.Fatalf(r.ctx, "r%d: cannot reinitialize an initialized replica", desc.RangeID)
}
if desc.IsInitialized() && replicaID != 0 {
return errors.Errorf("replicaID must be 0 when creating an initialized replica")
Expand All @@ -536,23 +542,18 @@ func (r *Replica) initLocked(
r.mu.internalRaftGroup = nil

var err error
ctx := r.store.Ctx()

if r.mu.state, err = loadState(ctx, r.store.Engine(), desc); err != nil {
if r.mu.state, err = loadState(r.ctx, r.store.Engine(), desc); err != nil {
return err
}
r.rangeStr.store(r.mu.state.Desc)

r.mu.lastIndex, err = loadLastIndex(ctx, r.store.Engine(), r.RangeID)
r.mu.lastIndex, err = loadLastIndex(r.ctx, r.store.Engine(), r.RangeID)
if err != nil {
return err
}

// Add replica log tags - the value is rangeStr.String().
ctx = log.WithLogTag(ctx, "r", &r.rangeStr)
r.ctx = ctx

pErr, err := loadReplicaDestroyedError(ctx, r.store.Engine(), r.RangeID)
pErr, err := loadReplicaDestroyedError(r.ctx, r.store.Engine(), r.RangeID)
if err != nil {
return err
}
Expand Down Expand Up @@ -640,7 +641,7 @@ func (r *Replica) destroyDataRaftMuLocked() error {
tombstone := &roachpb.RaftTombstone{
NextReplicaID: desc.NextReplicaID,
}
if err := engine.MVCCPutProto(context.Background(), batch, nil, tombstoneKey, hlc.ZeroTimestamp, nil, tombstone); err != nil {
if err := engine.MVCCPutProto(r.ctx, batch, nil, tombstoneKey, hlc.ZeroTimestamp, nil, tombstone); err != nil {
return err
}

Expand Down Expand Up @@ -2376,7 +2377,7 @@ func (r *Replica) sendRaftMessage(msg raftpb.Message) {
r.store.Stopper().RunWorker(func() {
defer r.CloseOutSnap()
if err := r.store.ctx.Transport.SendSnapshot(
context.Background(),
r.ctx,
SnapshotRequest_Header{
RangeDescriptor: *r.Desc(),
RaftMessageRequest: RaftMessageRequest{
Expand Down
2 changes: 1 addition & 1 deletion storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ func intersectSpan(
}
if bytes.Compare(span.Key, keys.LocalRangeMax) < 0 {
if bytes.Compare(span.EndKey, keys.LocalRangeMax) >= 0 {
log.Fatalf(context.Background(), "a local intent range may not have a non-local portion: %s", span)
panic(fmt.Sprintf("a local intent range may not have a non-local portion: %s", span))
}
if containsKeyRange(desc, span.Key, span.EndKey) {
return &span, nil
Expand Down
12 changes: 6 additions & 6 deletions storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var _ raft.Storage = (*Replica)(nil)
// InitialState implements the raft.Storage interface.
// InitialState requires that the replica lock be held.
func (r *Replica) InitialState() (raftpb.HardState, raftpb.ConfState, error) {
hs, err := loadHardState(context.Background(), r.store.Engine(), r.RangeID)
hs, err := loadHardState(r.ctx, r.store.Engine(), r.RangeID)
// For uninitialized ranges, membership is unknown at this point.
if raft.IsEmptyHardState(hs) || err != nil {
return raftpb.HardState{}, raftpb.ConfState{}, err
Expand All @@ -74,7 +74,7 @@ func (r *Replica) InitialState() (raftpb.HardState, raftpb.ConfState, error) {
func (r *Replica) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, error) {
snap := r.store.NewSnapshot()
defer snap.Close()
return entries(context.Background(), snap, r.RangeID, r.store.raftEntryCache, lo, hi, maxBytes)
return entries(r.ctx, snap, r.RangeID, r.store.raftEntryCache, lo, hi, maxBytes)
}

func entries(
Expand Down Expand Up @@ -251,7 +251,7 @@ func (r *Replica) raftTruncatedStateLocked(ctx context.Context) (roachpb.RaftTru
// FirstIndex implements the raft.Storage interface.
// FirstIndex requires that the replica lock is held.
func (r *Replica) FirstIndex() (uint64, error) {
ts, err := r.raftTruncatedStateLocked(context.Background())
ts, err := r.raftTruncatedStateLocked(r.ctx)
if err != nil {
return 0, err
}
Expand All @@ -269,7 +269,7 @@ func (r *Replica) GetFirstIndex() (uint64, error) {
// Snapshot implements the raft.Storage interface.
// Snapshot requires that the replica lock is held.
func (r *Replica) Snapshot() (raftpb.Snapshot, error) {
snap, err := r.SnapshotWithContext(context.Background())
snap, err := r.SnapshotWithContext(r.ctx)
if err != nil {
return raftpb.Snapshot{}, err
}
Expand Down Expand Up @@ -306,15 +306,15 @@ func (r *Replica) SnapshotWithContext(ctx context.Context) (*OutgoingSnapshot, e
startKey := r.mu.state.Desc.StartKey

sp := r.store.Tracer().StartSpan("snapshot")
ctxInner := opentracing.ContextWithSpan(context.Background(), sp)
ctxInner := opentracing.ContextWithSpan(r.ctx, sp)
defer sp.Finish()
snap := r.store.NewSnapshot()
log.Eventf(ctxInner, "new engine snapshot for replica %s", r)

// Delegate to a static function to make sure that we do not depend
// on any indirect calls to r.store.Engine() (or other in-memory
// state of the Replica). Everything must come from the snapshot.
snapData, err := snapshot(context.Background(), snap, rangeID, r.store.raftEntryCache, startKey)
snapData, err := snapshot(r.ctx, snap, rangeID, r.store.raftEntryCache, startKey)
if err != nil {
log.Errorf(ctxInner, "%s: error generating snapshot: %s", r, err)
return nil, err
Expand Down
4 changes: 1 addition & 3 deletions storage/replica_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,8 @@ func (r *Replica) handleTrigger(
}
}

// TODO(radu): we should provide a base context that contains the
// node and range IDs.
splitTriggerPostCommit(
context.Background(),
r.ctx,
trigger.split.RightDeltaMS,
&trigger.split.SplitTrigger,
r,
Expand Down
16 changes: 9 additions & 7 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ func IterateRangeDescriptors(ctx context.Context,
return fn(desc)
}

_, err := engine.MVCCIterate(context.Background(), eng, start, end, hlc.MaxTimestamp, false /* !consistent */, nil, /* txn */
_, err := engine.MVCCIterate(ctx, eng, start, end, hlc.MaxTimestamp, false /* !consistent */, nil, /* txn */
false /* !reverse */, kvToDesc)
log.Eventf(ctx, "iterated over %d keys to find %d range descriptors (by suffix: %v)",
allCount, matchCount, bySuffix)
Expand Down Expand Up @@ -1227,11 +1227,11 @@ func (s *Store) Bootstrap(ident roachpb.StoreIdent, stopper *stop.Stopper) error
return errors.Errorf("store %s: unable to access: %s", s.engine, err)
} else if len(kvs) > 0 {
// See if this is an already-bootstrapped store.
ok, err := engine.MVCCGetProto(context.Background(), s.engine, keys.StoreIdentKey(), hlc.ZeroTimestamp, true, nil, &s.Ident)
if err != nil {
if ok, err := engine.MVCCGetProto(
s.Ctx(), s.engine, keys.StoreIdentKey(), hlc.ZeroTimestamp, true, nil, &s.Ident,
); err != nil {
return errors.Errorf("store %s is non-empty but cluster ID could not be determined: %s", s.engine, err)
}
if ok {
} else if ok {
return errors.Errorf("store %s already belongs to cockroach cluster %s", s.engine, s.Ident.ClusterID)
}
keyVals := []string{}
Expand All @@ -1240,7 +1240,7 @@ func (s *Store) Bootstrap(ident roachpb.StoreIdent, stopper *stop.Stopper) error
}
return errors.Errorf("store %s is non-empty but does not contain store metadata (first %d key/values: %s)", s.engine, len(keyVals), keyVals)
}
err = engine.MVCCPutProto(context.Background(), s.engine, nil,
err = engine.MVCCPutProto(s.Ctx(), s.engine, nil,
keys.StoreIdentKey(), hlc.ZeroTimestamp, nil, &s.Ident)
if err != nil {
return err
Expand Down Expand Up @@ -2926,7 +2926,9 @@ func (s *Store) tryGetOrCreateReplica(
// a stale message.
tombstoneKey := keys.RaftTombstoneKey(rangeID)
var tombstone roachpb.RaftTombstone
if ok, err := engine.MVCCGetProto(context.Background(), s.Engine(), tombstoneKey, hlc.ZeroTimestamp, true, nil, &tombstone); err != nil {
if ok, err := engine.MVCCGetProto(
s.Ctx(), s.Engine(), tombstoneKey, hlc.ZeroTimestamp, true, nil, &tombstone,
); err != nil {
return nil, false, err
} else if ok {
if replicaID != 0 && replicaID < tombstone.NextReplicaID {
Expand Down
4 changes: 2 additions & 2 deletions storage/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (ls *Stores) ReadBootstrapInfo(bi *gossip.BootstrapInfo) error {
// Find the most recent bootstrap info.
for _, s := range ls.storeMap {
var storeBI gossip.BootstrapInfo
ok, err := engine.MVCCGetProto(context.Background(), s.engine, keys.StoreGossipKey(), hlc.ZeroTimestamp, true, nil, &storeBI)
ok, err := engine.MVCCGetProto(ls.ctx, s.engine, keys.StoreGossipKey(), hlc.ZeroTimestamp, true, nil, &storeBI)
if err != nil {
return err
}
Expand Down Expand Up @@ -315,7 +315,7 @@ func (ls *Stores) updateBootstrapInfo(bi *gossip.BootstrapInfo) error {
ls.latestBI = protoutil.Clone(bi).(*gossip.BootstrapInfo)
// Update all stores.
for _, s := range ls.storeMap {
if err := engine.MVCCPutProto(context.Background(), s.engine, nil, keys.StoreGossipKey(), hlc.ZeroTimestamp, nil, bi); err != nil {
if err := engine.MVCCPutProto(ls.ctx, s.engine, nil, keys.StoreGossipKey(), hlc.ZeroTimestamp, nil, bi); err != nil {
return err
}
}
Expand Down

0 comments on commit 38005f6

Please sign in to comment.