Skip to content

Commit

Permalink
kvstreamer: fix the usage of the range iterator
Browse files Browse the repository at this point in the history
Previously, after `Seek`ing the range iterator to the next key in the
batch of requests in the streamer we forgot to check the validity of the
iterator. In particular, this could lead to a crash of the process if
`Seek` encountered an error for whatever reason. In practice, I've only
observed this when running TPCH with high concurrency when GOMEMLIMIT is
set.

The bug was introduced in an innocently-looking refactor in
041b104.

Epic: None

Release note (bug fix): CockroachDB could previously crash in rare
circumstances when evaluating lookup and index joins. The bug is present
since 22.2.0 release. Temporary workaround without upgrading to the
release with this fix is changing the value of undocumented cluster
setting `sql.distsql.use_streamer.enabled` to `false`.
  • Loading branch information
yuzefovich committed Dec 21, 2022
1 parent 10266a3 commit 3125bc7
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,10 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
}
var reqsKeysScratch []roachpb.Key
var newNumRangesPerScanRequestMemoryUsage int64
for {
for ; ; ri.Seek(ctx, rs.Key, scanDir) {
if !ri.Valid() {
return ri.Error()
}
// Find all requests that touch the current range.
var singleRangeReqs []roachpb.RequestUnion
var positions []int
Expand Down Expand Up @@ -626,11 +629,12 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
requestsToServe = append(requestsToServe, r)
s.enqueuedSingleRangeRequests += len(singleRangeReqs)

if !ri.NeedAnother(rs) {
// This was the last range.
if allRequestsAreWithinSingleRange || !ri.NeedAnother(rs) {
// This was the last range. Breaking here rather than Seek'ing the
// iterator to RKeyMax (and, thus, invalidating it) allows us to
// avoid adding a confusing message into the trace.
break
}
ri.Seek(ctx, rs.Key, scanDir)
}

if streamerLocked {
Expand Down

0 comments on commit 3125bc7

Please sign in to comment.