From cf0f7ece21bc46fd4c9a9c3665f6fd4f91f54a0a Mon Sep 17 00:00:00 2001 From: Alfonso Subiotto Marques Date: Wed, 8 Jul 2020 18:41:25 +0200 Subject: [PATCH] colrpc: propagate error immediately in Inbox The Inbox would previously buffer any metadata received from the remote side, including errors. This could cause issues for special errors that are swallowed during draining but not execution, because all errors would only be returned during draining. Release note: None (bug not present in release) --- pkg/sql/colflow/colrpc/colrpc_test.go | 47 ++++++++++++++++++++++++--- pkg/sql/colflow/colrpc/inbox.go | 6 ++++ pkg/sql/colflow/colrpc/outbox_test.go | 17 ++++++---- 3 files changed, 58 insertions(+), 12 deletions(-) diff --git a/pkg/sql/colflow/colrpc/colrpc_test.go b/pkg/sql/colflow/colrpc/colrpc_test.go index ecb240851514..bf31a7ddc7ac 100644 --- a/pkg/sql/colflow/colrpc/colrpc_test.go +++ b/pkg/sql/colflow/colrpc/colrpc_test.go @@ -396,9 +396,17 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) { // and the Outbox generating random batches. numNextsBeforeDrain := rng.Intn(10) + expectedError := errors.New("someError") + testCases := []struct { name string numBatches int + // overrideExpectedMetadata, if set, will override the expected metadata + // the test harness uses. + overrideExpectedMetadata []execinfrapb.ProducerMetadata + // verifyExpectedMetadata, if set, will override the equality check the + // metadata test harness uses. + verifyExpectedMetadata func([]execinfrapb.ProducerMetadata) bool // test is the body of the test to be run. Metadata should be returned to // be verified. test func(context.Context, *Inbox) []execinfrapb.ProducerMetadata @@ -434,6 +442,29 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) { return inbox.DrainMeta(ctx) }, }, + { + // ErrorPropagationDuringExecution is a scenario in which the outbox + // returns an error after the last batch. 
+ name: "ErrorPropagationDuringExecution", + numBatches: 4, + overrideExpectedMetadata: []execinfrapb.ProducerMetadata{{Err: expectedError}}, + verifyExpectedMetadata: func(meta []execinfrapb.ProducerMetadata) bool { + return len(meta) == 1 && errors.Is(meta[0].Err, expectedError) + }, + test: func(ctx context.Context, inbox *Inbox) []execinfrapb.ProducerMetadata { + for { + var b coldata.Batch + if err := colexecerror.CatchVectorizedRuntimeError(func() { + b = inbox.Next(ctx) + }); err != nil { + return []execinfrapb.ProducerMetadata{{Err: err}} + } + if b.Length() == 0 { + return nil + } + } + }, + }, } for _, tc := range testCases { @@ -457,14 +488,16 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) { ) ) - const expectedMeta = "someError" - outboxMemAcc := testMemMonitor.MakeBoundAccount() defer outboxMemAcc.Close(ctx) + expectedMetadata := []execinfrapb.ProducerMetadata{{RowNum: &execinfrapb.RemoteProducerMetadata_RowNum{LastMsg: true}}} + if tc.overrideExpectedMetadata != nil { + expectedMetadata = tc.overrideExpectedMetadata + } outbox, err := NewOutbox(colmem.NewAllocator(ctx, &outboxMemAcc, coldata.StandardColumnFactory), input, typs, []execinfrapb.MetadataSource{ execinfrapb.CallbackMetadataSource{ DrainMetaCb: func(context.Context) []execinfrapb.ProducerMetadata { - return []execinfrapb.ProducerMetadata{{Err: errors.New(expectedMeta)}} + return expectedMetadata }, }, }, nil /* toClose */) @@ -499,8 +532,12 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) { require.True(t, atomic.LoadUint32(&canceled) == 0) // Verify that we received the expected metadata. 
- require.True(t, len(meta) == 1) - require.True(t, testutils.IsError(meta[0].Err, expectedMeta), meta[0].Err) + if tc.verifyExpectedMetadata != nil { + require.True(t, tc.verifyExpectedMetadata(meta), "unexpected meta: %v", meta) + } else { + require.True(t, len(meta) == len(expectedMetadata)) + require.Equal(t, expectedMetadata, meta, "unexpected meta: %v", meta) + } }) } } diff --git a/pkg/sql/colflow/colrpc/inbox.go b/pkg/sql/colflow/colrpc/inbox.go index e38275fc1617..299eeb8097be 100644 --- a/pkg/sql/colflow/colrpc/inbox.go +++ b/pkg/sql/colflow/colrpc/inbox.go @@ -315,6 +315,12 @@ func (i *Inbox) Next(ctx context.Context) coldata.Batch { if !ok { continue } + if meta.Err != nil { + // If an error was encountered, it needs to be propagated immediately. + // All other metadata will simply be buffered and returned in + // DrainMeta. + colexecerror.ExpectedError(meta.Err) + } i.stateMu.bufferedMeta = append(i.stateMu.bufferedMeta, meta) } // Continue until we get the next batch or EOF. diff --git a/pkg/sql/colflow/colrpc/outbox_test.go b/pkg/sql/colflow/colrpc/outbox_test.go index c1517d6141bc..7fb2f6d3e108 100644 --- a/pkg/sql/colflow/colrpc/outbox_test.go +++ b/pkg/sql/colflow/colrpc/outbox_test.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colmem" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -61,16 +62,18 @@ func TestOutboxCatchesPanics(t *testing.T) { streamHandlerErrCh := handleStream(ctx, inbox, rpcLayer.server, func() { close(rpcLayer.server.csChan) }) - // The outbox will be sending the panic as metadata eagerly. This Next call - // is valid, but should return a zero-length batch, indicating that the caller - // should call DrainMeta. 
- require.True(t, inbox.Next(ctx).Length() == 0) + // The outbox will be sending the panic as metadata eagerly. This Next call will + // propagate the panic. + err = colexecerror.CatchVectorizedRuntimeError(func() { + inbox.Next(ctx).Length() + }) + require.Error(t, err) - // Expect the panic as an error in DrainMeta. + // Expect no metadata. meta := inbox.DrainMeta(ctx) + require.True(t, len(meta) == 0) - require.True(t, len(meta) == 1) - require.True(t, testutils.IsError(meta[0].Err, "runtime error: index out of range"), meta[0]) + require.True(t, testutils.IsError(err, "runtime error: index out of range"), err) require.NoError(t, <-streamHandlerErrCh) wg.Wait()