diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go index 5b47817e7d90..8ee2b9d90374 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go +++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go @@ -52,6 +52,11 @@ func (f *subscriptionFeedSource) Next() (streamingccl.Event, bool) { return event, hasMore } +// Error implements the streamingtest.FeedSource interface. +func (f *subscriptionFeedSource) Error() error { + return f.sub.Err() +} + // Close implements the streamingtest.FeedSource interface. func (f *subscriptionFeedSource) Close(ctx context.Context) {} @@ -202,6 +207,10 @@ INSERT INTO d.t2 VALUES (2); err = cg.Wait() require.True(t, errors.Is(err, context.Canceled) || isQueryCanceledError(err)) + rf.ObserveError(ctx, func(err error) bool { + return errors.Is(err, context.Canceled) || isQueryCanceledError(err) + }) + // Testing client.Complete() err = client.Complete(ctx, streaming.StreamID(999), true) require.True(t, testutils.IsError(err, fmt.Sprintf("job %d: not found in system.jobs table", 999)), err) diff --git a/pkg/ccl/streamingccl/streamingtest/BUILD.bazel b/pkg/ccl/streamingccl/streamingtest/BUILD.bazel index bb16c6d7d56a..13ed90594d9f 100644 --- a/pkg/ccl/streamingccl/streamingtest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingtest/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "//pkg/security/username", "//pkg/sql/catalog", "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/desctestutils", "//pkg/sql/rowenc", "//pkg/sql/sem/tree", "//pkg/testutils/serverutils", diff --git a/pkg/ccl/streamingccl/streamingtest/replication_helpers.go b/pkg/ccl/streamingccl/streamingtest/replication_helpers.go index 6d13de1968d1..46bb3cb6212f 100644 --- a/pkg/ccl/streamingccl/streamingtest/replication_helpers.go +++ b/pkg/ccl/streamingccl/streamingtest/replication_helpers.go @@ -23,6 +23,7 @@ import ( 
"github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -31,11 +32,14 @@ import ( "github.com/stretchr/testify/require" ) -// FeedPredicate allows tests to search a ReplicationFeed. -type FeedPredicate func(message streamingccl.Event) bool +// FeedEventPredicate allows tests to search a ReplicationFeed. +type FeedEventPredicate func(message streamingccl.Event) bool -// KeyMatches makes a FeedPredicate that matches a given key. -func KeyMatches(key roachpb.Key) FeedPredicate { +// FeedErrorPredicate allows tests to match an error from ReplicationFeed. +type FeedErrorPredicate func(err error) bool + +// KeyMatches makes a FeedEventPredicate that matches a given key. +func KeyMatches(key roachpb.Key) FeedEventPredicate { return func(msg streamingccl.Event) bool { if msg.Type() != streamingccl.KVEvent { return false @@ -54,9 +58,9 @@ func minResolvedTimestamp(resolvedSpans []jobspb.ResolvedSpan) hlc.Timestamp { return minTimestamp } -// ResolvedAtLeast makes a FeedPredicate that matches when a timestamp has been +// ResolvedAtLeast makes a FeedEventPredicate that matches when a timestamp has been // reached. -func ResolvedAtLeast(lo hlc.Timestamp) FeedPredicate { +func ResolvedAtLeast(lo hlc.Timestamp) FeedEventPredicate { return func(msg streamingccl.Event) bool { if msg.Type() != streamingccl.CheckpointEvent { return false @@ -70,6 +74,11 @@ type FeedSource interface { // Next returns the next event, and a flag indicating if there are more events // to consume. Next() (streamingccl.Event, bool) + + // Error returns the error encountered in the feed. If present, it + // is set after Next() indicates there are no more events to consume. 
+ Error() error + // Close shuts down the source. Close(ctx context.Context) } @@ -93,23 +102,37 @@ func MakeReplicationFeed(t *testing.T, f FeedSource) *ReplicationFeed { // Note: we don't do any buffering here. Therefore, it is required that the key // we want to observe will arrive at some point in the future. func (rf *ReplicationFeed) ObserveKey(ctx context.Context, key roachpb.Key) roachpb.KeyValue { - require.NoError(rf.t, rf.consumeUntil(ctx, KeyMatches(key))) + require.NoError(rf.t, rf.consumeUntil(ctx, KeyMatches(key), func(err error) bool { + return true + })) return *rf.msg.GetKV() } // ObserveResolved consumes the feed until we received resolved timestamp that's at least // as high as the specified low watermark. Returns observed resolved timestamp. func (rf *ReplicationFeed) ObserveResolved(ctx context.Context, lo hlc.Timestamp) hlc.Timestamp { - require.NoError(rf.t, rf.consumeUntil(ctx, ResolvedAtLeast(lo))) + require.NoError(rf.t, rf.consumeUntil(ctx, ResolvedAtLeast(lo), func(err error) bool { + return true + })) return minResolvedTimestamp(*rf.msg.GetResolvedSpans()) } +// ObserveError consumes the feed until the feed is exhausted, and the final error should +// match 'errPred'. +func (rf *ReplicationFeed) ObserveError(ctx context.Context, errPred FeedErrorPredicate) { + require.NoError(rf.t, rf.consumeUntil(ctx, func(message streamingccl.Event) bool { + return false + }, errPred)) +} + // Close cleans up any resources. 
func (rf *ReplicationFeed) Close(ctx context.Context) { rf.f.Close(ctx) } -func (rf *ReplicationFeed) consumeUntil(ctx context.Context, pred FeedPredicate) error { +func (rf *ReplicationFeed) consumeUntil( + ctx context.Context, pred FeedEventPredicate, errPred FeedErrorPredicate, +) error { const maxWait = 20 * time.Second doneCh := make(chan struct{}) mu := struct { @@ -139,6 +162,9 @@ func (rf *ReplicationFeed) consumeUntil(ctx context.Context, pred FeedPredicate) mu.Unlock() if err != nil { rf.t.Fatal(err) + } else if rf.f.Error() != nil { + require.True(rf.t, errPred(rf.f.Error())) + return nil } else { rf.t.Fatalf("ran out of rows after processing %d rows", rowCount) } @@ -225,3 +251,10 @@ func (rh *ReplicationHelper) CreateTenant( require.NoError(t, tenantConn.Close()) } } + +// TableSpan returns the primary index span of the given table in database "d". +func (rh *ReplicationHelper) TableSpan(codec keys.SQLCodec, table string) roachpb.Span { + desc := desctestutils.TestingGetPublicTableDescriptor( + rh.SysServer.DB(), codec, "d", table) + return desc.PrimaryIndexSpan(codec) +} diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index a0b7a115f715..568d6c0528cb 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -50,6 +50,7 @@ type eventStream struct { // Fields below initialized when Start called. rf *rangefeed.RangeFeed // Currently running rangefeed. streamGroup ctxgroup.Group // Context group controlling stream execution. + doneChan chan struct{} // Channel closed by Close to stop the stream loop. eventsCh chan kvcoord.RangeFeedMessage // Channel receiving rangefeed events. errCh chan error // Signaled when error occurs in rangefeed. streamCh chan tree.Datums // Channel signaled to forward datums to consumer. 
@@ -89,6 +90,8 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) error { // Stream channel receives datums to be sent to the consumer. s.streamCh = make(chan tree.Datums) + s.doneChan = make(chan struct{}) + // Common rangefeed options. opts := []rangefeed.Option{ rangefeed.WithOnCheckpoint(s.onCheckpoint), @@ -216,6 +219,7 @@ func (s *eventStream) Close(ctx context.Context) { s.rf.Close() s.acc.Close(ctx) + close(s.doneChan) if err := s.streamGroup.Wait(); err != nil { // Note: error in close is normal; we expect to be terminated with context canceled. log.Errorf(ctx, "partition stream %d terminated with error %v", s.streamID, err) @@ -445,6 +449,8 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) e select { case <-ctx.Done(): return ctx.Err() + case <-s.doneChan: + return nil case ev := <-s.eventsCh: switch { case ev.Val != nil: diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go index 2e2028005d85..2f793464031e 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go @@ -14,6 +14,7 @@ import ( "net/http" "net/http/httptest" "sort" + "strings" "testing" "time" @@ -135,6 +136,11 @@ func (f *pgConnReplicationFeedSource) Next() (streamingccl.Event, bool) { return e, true } +// Error implements the streamingtest.FeedSource interface. +func (f *pgConnReplicationFeedSource) Error() error { + return f.rows.Err() +} + // startReplication starts replication stream, specified as query and its args. 
func startReplication( t *testing.T, @@ -315,7 +321,9 @@ func TestStreamPartition(t *testing.T) { srcTenant.SQL.Exec(t, ` CREATE DATABASE d; CREATE TABLE d.t1(i int primary key, a string, b string); +CREATE TABLE d.t2(i int primary key, a string, b string); INSERT INTO d.t1 (i) VALUES (42); +INSERT INTO d.t2 (i) VALUES (42); USE d; `) @@ -325,6 +333,31 @@ USE d; const streamPartitionQuery = `SELECT * FROM crdb_internal.stream_partition($1, $2)` t1Descr := desctestutils.TestingGetPublicTableDescriptor(h.SysServer.DB(), srcTenant.Codec, "d", "t1") + t2Descr := desctestutils.TestingGetPublicTableDescriptor(h.SysServer.DB(), srcTenant.Codec, "d", "t2") + + t.Run("stream-table-cursor-error", func(t *testing.T) { + _, feed := startReplication(t, h, makePartitionStreamDecoder, + streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, hlc.Timestamp{}, "t2")) + defer feed.Close(ctx) + + subscribedSpan := h.TableSpan(srcTenant.Codec, "t2") + // Send a ClearRange so that the rows cursor returns an internal error from the rangefeed. + // Use 't2' so that the error is not triggered on spans registered by other rangefeeds, + // which would affect other tests. 
+ _, err := kv.SendWrapped(ctx, h.SysServer.DB().NonTransactionalSender(), &roachpb.ClearRangeRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: subscribedSpan.Key, + EndKey: subscribedSpan.EndKey, + }, + }) + require.Nil(t, err) + + expected := streamingtest.EncodeKV(t, srcTenant.Codec, t2Descr, 42) + feed.ObserveKey(ctx, expected.Key) + feed.ObserveError(ctx, func(err error) bool { + return strings.Contains(err.Error(), "unexpected MVCC history mutation") + }) + }) t.Run("stream-table", func(t *testing.T) { _, feed := startReplication(t, h, makePartitionStreamDecoder, @@ -373,7 +406,7 @@ USE d; t.Run("stream-batches-events", func(t *testing.T) { srcTenant.SQL.Exec(t, ` -CREATE TABLE t2( +CREATE TABLE t3( i INT PRIMARY KEY, a STRING, b STRING, @@ -384,7 +417,7 @@ CREATE TABLE t2( addRows := func(start, n int) { // Insert few more rows into the table. We expect for i := start; i < n; i++ { - srcTenant.SQL.Exec(t, "INSERT INTO t2 (i, a, b) VALUES ($1, $2, $3)", + srcTenant.SQL.Exec(t, "INSERT INTO t3 (i, a, b) VALUES ($1, $2, $3)", i, fmt.Sprintf("i=%d", i), fmt.Sprintf("10-i=%d", 10-i)) } } @@ -615,13 +648,8 @@ USE d; defer feed.Close(ctx) // TODO(casper): Replace with DROP TABLE once drop table uses the MVCC-compatible DelRange - tableSpan := func(table string) roachpb.Span { - desc := desctestutils.TestingGetPublicTableDescriptor( - h.SysServer.DB(), srcTenant.Codec, "d", table) - return desc.PrimaryIndexSpan(srcTenant.Codec) - } - - t1Span, t2Span, t3Span := tableSpan("t1"), tableSpan("t2"), tableSpan("t3") + t1Span, t2Span, t3Span := h.TableSpan(srcTenant.Codec, "t1"), + h.TableSpan(srcTenant.Codec, "t2"), h.TableSpan(srcTenant.Codec, "t3") // Range deleted is outside the subscribed spans require.NoError(t, h.SysServer.DB().DelRangeUsingTombstone(ctx, t2Span.EndKey, t3Span.Key)) // Range is t1s - t2e, emitting 2 events, t1s - t1e and t2s - t2e.