Skip to content

Commit

Permalink
Merge #85867
Browse files Browse the repository at this point in the history
85867: streamingccl: fix producer stream to properly propagate error r=gh-casper a=gh-casper

Previously when rangefeed returns internal error, the error
didn't get propagated through the producer stream cursor.

The bug is that the SQL receiver's result writer only sends
error after all ValueGenerators close. However, closing eventStream
(i.e., the producer stream ValueGenerator) relies on shutting
down of streamLoop which is not explicitly shut down during Close.

Release justification: bug fixes and low-risk updates to
new functionality

Release note: None

Co-authored-by: Casper <[email protected]>
  • Loading branch information
craig[bot] and gh-casper committed Aug 22, 2022
2 parents aaf50e9 + c68231c commit 41c005e
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ func (f *subscriptionFeedSource) Next() (streamingccl.Event, bool) {
return event, hasMore
}

// Error implements the streamingtest.FeedSource interface.
func (f *subscriptionFeedSource) Error() error {
return f.sub.Err()
}

// Close implements the streamingtest.FeedSource interface.
func (f *subscriptionFeedSource) Close(ctx context.Context) {}

Expand Down Expand Up @@ -202,6 +207,10 @@ INSERT INTO d.t2 VALUES (2);
err = cg.Wait()
require.True(t, errors.Is(err, context.Canceled) || isQueryCanceledError(err))

rf.ObserveError(ctx, func(err error) bool {
return errors.Is(err, context.Canceled) || isQueryCanceledError(err)
})

// Testing client.Complete()
err = client.Complete(ctx, streaming.StreamID(999), true)
require.True(t, testutils.IsError(err, fmt.Sprintf("job %d: not found in system.jobs table", 999)), err)
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//pkg/security/username",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/testutils/serverutils",
Expand Down
51 changes: 42 additions & 9 deletions pkg/ccl/streamingccl/streamingtest/replication_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand All @@ -31,11 +32,14 @@ import (
"github.com/stretchr/testify/require"
)

// FeedPredicate allows tests to search a ReplicationFeed.
type FeedPredicate func(message streamingccl.Event) bool
// FeedEventPredicate allows tests to search a ReplicationFeed.
type FeedEventPredicate func(message streamingccl.Event) bool

// KeyMatches makes a FeedPredicate that matches a given key.
func KeyMatches(key roachpb.Key) FeedPredicate {
// FeedErrorPredicate allows tests to match an error from ReplicationFeed.
type FeedErrorPredicate func(err error) bool

// KeyMatches makes a FeedEventPredicate that matches a given key.
func KeyMatches(key roachpb.Key) FeedEventPredicate {
return func(msg streamingccl.Event) bool {
if msg.Type() != streamingccl.KVEvent {
return false
Expand All @@ -54,9 +58,9 @@ func minResolvedTimestamp(resolvedSpans []jobspb.ResolvedSpan) hlc.Timestamp {
return minTimestamp
}

// ResolvedAtLeast makes a FeedPredicate that matches when a timestamp has been
// ResolvedAtLeast makes a FeedEventPredicate that matches when a timestamp has been
// reached.
func ResolvedAtLeast(lo hlc.Timestamp) FeedPredicate {
func ResolvedAtLeast(lo hlc.Timestamp) FeedEventPredicate {
return func(msg streamingccl.Event) bool {
if msg.Type() != streamingccl.CheckpointEvent {
return false
Expand All @@ -70,6 +74,11 @@ type FeedSource interface {
// Next returns the next event, and a flag indicating if there are more events
// to consume.
Next() (streamingccl.Event, bool)

// Error returns the error encountered in the feed. If present, it
// is set after Next() indicates there is no more event to consume.
Error() error

// Close shuts down the source.
Close(ctx context.Context)
}
Expand All @@ -93,23 +102,37 @@ func MakeReplicationFeed(t *testing.T, f FeedSource) *ReplicationFeed {
// Note: we don't do any buffering here. Therefore, it is required that the key
// we want to observe will arrive at some point in the future.
func (rf *ReplicationFeed) ObserveKey(ctx context.Context, key roachpb.Key) roachpb.KeyValue {
require.NoError(rf.t, rf.consumeUntil(ctx, KeyMatches(key)))
require.NoError(rf.t, rf.consumeUntil(ctx, KeyMatches(key), func(err error) bool {
return true
}))
return *rf.msg.GetKV()
}

// ObserveResolved consumes the feed until we received resolved timestamp that's at least
// as high as the specified low watermark. Returns observed resolved timestamp.
func (rf *ReplicationFeed) ObserveResolved(ctx context.Context, lo hlc.Timestamp) hlc.Timestamp {
require.NoError(rf.t, rf.consumeUntil(ctx, ResolvedAtLeast(lo)))
require.NoError(rf.t, rf.consumeUntil(ctx, ResolvedAtLeast(lo), func(err error) bool {
return true
}))
return minResolvedTimestamp(*rf.msg.GetResolvedSpans())
}

// ObserveError consumes the feed until the feed is exhausted, and the final error should
// match 'errPred'.
func (rf *ReplicationFeed) ObserveError(ctx context.Context, errPred FeedErrorPredicate) {
require.NoError(rf.t, rf.consumeUntil(ctx, func(message streamingccl.Event) bool {
return false
}, errPred))
}

// Close cleans up any resources.
func (rf *ReplicationFeed) Close(ctx context.Context) {
rf.f.Close(ctx)
}

func (rf *ReplicationFeed) consumeUntil(ctx context.Context, pred FeedPredicate) error {
func (rf *ReplicationFeed) consumeUntil(
ctx context.Context, pred FeedEventPredicate, errPred FeedErrorPredicate,
) error {
const maxWait = 20 * time.Second
doneCh := make(chan struct{})
mu := struct {
Expand Down Expand Up @@ -139,6 +162,9 @@ func (rf *ReplicationFeed) consumeUntil(ctx context.Context, pred FeedPredicate)
mu.Unlock()
if err != nil {
rf.t.Fatal(err)
} else if rf.f.Error() != nil {
require.True(rf.t, errPred(rf.f.Error()))
return nil
} else {
rf.t.Fatalf("ran out of rows after processing %d rows", rowCount)
}
Expand Down Expand Up @@ -225,3 +251,10 @@ func (rh *ReplicationHelper) CreateTenant(
require.NoError(t, tenantConn.Close())
}
}

// TableSpan returns primary index span for a table.
func (rh *ReplicationHelper) TableSpan(codec keys.SQLCodec, table string) roachpb.Span {
desc := desctestutils.TestingGetPublicTableDescriptor(
rh.SysServer.DB(), codec, "d", table)
return desc.PrimaryIndexSpan(codec)
}
6 changes: 6 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type eventStream struct {
// Fields below initialized when Start called.
rf *rangefeed.RangeFeed // Currently running rangefeed.
streamGroup ctxgroup.Group // Context group controlling stream execution.
doneChan chan struct{} // Channel signaled to close the stream loop.
eventsCh chan kvcoord.RangeFeedMessage // Channel receiving rangefeed events.
errCh chan error // Signaled when error occurs in rangefeed.
streamCh chan tree.Datums // Channel signaled to forward datums to consumer.
Expand Down Expand Up @@ -89,6 +90,8 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) error {
// Stream channel receives datums to be sent to the consumer.
s.streamCh = make(chan tree.Datums)

s.doneChan = make(chan struct{})

// Common rangefeed options.
opts := []rangefeed.Option{
rangefeed.WithOnCheckpoint(s.onCheckpoint),
Expand Down Expand Up @@ -216,6 +219,7 @@ func (s *eventStream) Close(ctx context.Context) {
s.rf.Close()
s.acc.Close(ctx)

close(s.doneChan)
if err := s.streamGroup.Wait(); err != nil {
// Note: error in close is normal; we expect to be terminated with context canceled.
log.Errorf(ctx, "partition stream %d terminated with error %v", s.streamID, err)
Expand Down Expand Up @@ -445,6 +449,8 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) e
select {
case <-ctx.Done():
return ctx.Err()
case <-s.doneChan:
return nil
case ev := <-s.eventsCh:
switch {
case ev.Val != nil:
Expand Down
46 changes: 37 additions & 9 deletions pkg/ccl/streamingccl/streamproducer/replication_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net/http"
"net/http/httptest"
"sort"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -135,6 +136,11 @@ func (f *pgConnReplicationFeedSource) Next() (streamingccl.Event, bool) {
return e, true
}

// Error implements the streamingtest.FeedSource interface.
func (f *pgConnReplicationFeedSource) Error() error {
return f.rows.Err()
}

// startReplication starts replication stream, specified as query and its args.
func startReplication(
t *testing.T,
Expand Down Expand Up @@ -315,7 +321,9 @@ func TestStreamPartition(t *testing.T) {
srcTenant.SQL.Exec(t, `
CREATE DATABASE d;
CREATE TABLE d.t1(i int primary key, a string, b string);
CREATE TABLE d.t2(i int primary key, a string, b string);
INSERT INTO d.t1 (i) VALUES (42);
INSERT INTO d.t2 (i) VALUES (42);
USE d;
`)

Expand All @@ -325,6 +333,31 @@ USE d;

const streamPartitionQuery = `SELECT * FROM crdb_internal.stream_partition($1, $2)`
t1Descr := desctestutils.TestingGetPublicTableDescriptor(h.SysServer.DB(), srcTenant.Codec, "d", "t1")
t2Descr := desctestutils.TestingGetPublicTableDescriptor(h.SysServer.DB(), srcTenant.Codec, "d", "t2")

t.Run("stream-table-cursor-error", func(t *testing.T) {
_, feed := startReplication(t, h, makePartitionStreamDecoder,
streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, hlc.Timestamp{}, "t2"))
defer feed.Close(ctx)

subscribedSpan := h.TableSpan(srcTenant.Codec, "t2")
// Send a ClearRange to trigger rows cursor to return internal error from rangefeed.
// Choose 't2' so that it doesn't trigger error on other registered span in rangefeeds,
// affecting other tests.
_, err := kv.SendWrapped(ctx, h.SysServer.DB().NonTransactionalSender(), &roachpb.ClearRangeRequest{
RequestHeader: roachpb.RequestHeader{
Key: subscribedSpan.Key,
EndKey: subscribedSpan.EndKey,
},
})
require.Nil(t, err)

expected := streamingtest.EncodeKV(t, srcTenant.Codec, t2Descr, 42)
feed.ObserveKey(ctx, expected.Key)
feed.ObserveError(ctx, func(err error) bool {
return strings.Contains(err.Error(), "unexpected MVCC history mutation")
})
})

t.Run("stream-table", func(t *testing.T) {
_, feed := startReplication(t, h, makePartitionStreamDecoder,
Expand Down Expand Up @@ -373,7 +406,7 @@ USE d;

t.Run("stream-batches-events", func(t *testing.T) {
srcTenant.SQL.Exec(t, `
CREATE TABLE t2(
CREATE TABLE t3(
i INT PRIMARY KEY,
a STRING,
b STRING,
Expand All @@ -384,7 +417,7 @@ CREATE TABLE t2(
addRows := func(start, n int) {
// Insert few more rows into the table. We expect
for i := start; i < n; i++ {
srcTenant.SQL.Exec(t, "INSERT INTO t2 (i, a, b) VALUES ($1, $2, $3)",
srcTenant.SQL.Exec(t, "INSERT INTO t3 (i, a, b) VALUES ($1, $2, $3)",
i, fmt.Sprintf("i=%d", i), fmt.Sprintf("10-i=%d", 10-i))
}
}
Expand Down Expand Up @@ -615,13 +648,8 @@ USE d;
defer feed.Close(ctx)

// TODO(casper): Replace with DROP TABLE once drop table uses the MVCC-compatible DelRange
tableSpan := func(table string) roachpb.Span {
desc := desctestutils.TestingGetPublicTableDescriptor(
h.SysServer.DB(), srcTenant.Codec, "d", table)
return desc.PrimaryIndexSpan(srcTenant.Codec)
}

t1Span, t2Span, t3Span := tableSpan("t1"), tableSpan("t2"), tableSpan("t3")
t1Span, t2Span, t3Span := h.TableSpan(srcTenant.Codec, "t1"),
h.TableSpan(srcTenant.Codec, "t2"), h.TableSpan(srcTenant.Codec, "t3")
// Range deleted is outside the subscribed spans
require.NoError(t, h.SysServer.DB().DelRangeUsingTombstone(ctx, t2Span.EndKey, t3Span.Key))
// Range is t1s - t2e, emitting 2 events, t1s - t1e and t2s - t2e.
Expand Down

0 comments on commit 41c005e

Please sign in to comment.