Skip to content

Commit

Permalink
distsqlrun: add tests for cockroachdb#13989
Browse files Browse the repository at this point in the history
We didn't have any tests exercising a stream failing to connect within
the registry's timeout.
  • Loading branch information
andreimatei committed Mar 14, 2017
1 parent ca4de0e commit 395b3f6
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 28 deletions.
3 changes: 3 additions & 0 deletions pkg/sql/distsqlrun/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ func (rb *RowBuffer) PushRow(row sqlbase.EncDatumRow) bool {

// Close is part of the RowReceiver interface.
func (rb *RowBuffer) Close(err error) {
if rb.Closed {
panic("RowBuffer already closed")
}
rb.Err = err
rb.Closed = true
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (f *Flow) Start(ctx context.Context, doneFn func()) {
// set up the WaitGroup counter before.
f.waitGroup.Add(len(f.inboundStreams) + len(f.outboxes) + len(f.processors))

f.flowRegistry.RegisterFlow(ctx, f.id, f, f.inboundStreams)
f.flowRegistry.RegisterFlow(ctx, f.id, f, f.inboundStreams, flowStreamDefaultTimeout)
if log.V(1) {
log.Infof(ctx, "registered flow %s", f.id.Short())
}
Expand Down
19 changes: 11 additions & 8 deletions pkg/sql/distsqlrun/flow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
"github.com/pkg/errors"
)

// flowStreamTimeout is the amount of time incoming streams wait for a flow to
// flowStreamDefaultTimeout is the amount of time incoming streams wait for a flow to
// be set up before erroring out.
const flowStreamTimeout time.Duration = 10 * time.Second
const flowStreamDefaultTimeout time.Duration = 10 * time.Second

// inboundStreamInfo represents the endpoint where a data stream from another
// node connects to a flow. The external node initiates this process through a
Expand Down Expand Up @@ -125,10 +125,13 @@ func (fr *flowRegistry) releaseEntryLocked(id FlowID) {
// flow from the registry.
//
// inboundStreams are all the remote streams that will be connected into this
// flow. If any of them is not connected within a timeout, errors are
// propagated.
// flow. If any of them is not connected within timeout, errors are propagated.
func (fr *flowRegistry) RegisterFlow(
ctx context.Context, id FlowID, f *Flow, inboundStreams map[StreamID]*inboundStreamInfo,
ctx context.Context,
id FlowID,
f *Flow,
inboundStreams map[StreamID]*inboundStreamInfo,
timeout time.Duration,
) {
fr.Lock()
defer fr.Unlock()
Expand All @@ -147,7 +150,7 @@ func (fr *flowRegistry) RegisterFlow(

if len(inboundStreams) > 0 {
// Set up a function to time out inbound streams after a while.
entry.streamTimer = time.AfterFunc(flowStreamTimeout, func() {
entry.streamTimer = time.AfterFunc(timeout, func() {
fr.Lock()
defer fr.Unlock()
numTimedOut := 0
Expand All @@ -171,7 +174,7 @@ func (fr *flowRegistry) RegisterFlow(
"flow id:%s : %d inbound streams timed out after %s; propagated error throughout flow",
id,
numTimedOut,
flowStreamTimeout,
timeout,
)
}
})
Expand Down Expand Up @@ -239,7 +242,7 @@ func (fr *flowRegistry) ConnectInboundStream(
) (*Flow, *inboundStreamInfo, error) {
fr.Lock()
defer fr.Unlock()
entry := fr.waitForFlowLocked(flowID, flowStreamTimeout)
entry := fr.waitForFlowLocked(flowID, flowStreamDefaultTimeout)
if entry == nil {
return nil, nil, errors.Errorf("flow %s not found", flowID)
}
Expand Down
86 changes: 67 additions & 19 deletions pkg/sql/distsqlrun/flow_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,27 @@ import (

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/pkg/errors"
)

// lookupFlow returns the registered flow with the given ID. If no such flow is
// registered, waits until it gets registered - up to the given timeout. If the
// timeout elapses, returns nil.
func lookupFlow(fr *flowRegistry, id FlowID, timeout time.Duration) *Flow {
// timeout elapses and the flow is not registered, the bool return value will be
// false.
//
// A copy of the flowRegistry's flowEntry is returned so that the entry's fields
// can be accessed without locking.
func lookupFlow(fr *flowRegistry, id FlowID, timeout time.Duration) (flowEntry, bool) {
fr.Lock()
defer fr.Unlock()
entry := fr.waitForFlowLocked(id, timeout)
if entry == nil {
return nil
return flowEntry{}, false
}
return entry.flow
return *entry, true
}

func TestFlowRegistry(t *testing.T) {
Expand All @@ -62,35 +68,35 @@ func TestFlowRegistry(t *testing.T) {

// -- Lookup, register, lookup, unregister, lookup. --

if f := lookupFlow(reg, id1, 0); f != nil {
if _, ok := lookupFlow(reg, id1, 0); ok {
t.Error("looked up unregistered flow")
}

ctx := context.Background()
reg.RegisterFlow(ctx, id1, f1, nil /* inboundStreams */)
reg.RegisterFlow(ctx, id1, f1, nil /* inboundStreams */, flowStreamDefaultTimeout)

if f := lookupFlow(reg, id1, 0); f != f1 {
if fe, _ := lookupFlow(reg, id1, 0); fe.flow != f1 {
t.Error("couldn't lookup previously registered flow")
}

reg.UnregisterFlow(id1)

if f := lookupFlow(reg, id1, 0); f != nil {
if _, ok := lookupFlow(reg, id1, 0); ok {
t.Error("looked up unregistered flow")
}

// -- Lookup with timeout, register in the meantime. --

go func() {
time.Sleep(jiffy)
reg.RegisterFlow(ctx, id1, f1, nil /* inboundStreams */)
reg.RegisterFlow(ctx, id1, f1, nil /* inboundStreams */, flowStreamDefaultTimeout)
}()

if f := lookupFlow(reg, id1, 10*jiffy); f != f1 {
if fe, _ := lookupFlow(reg, id1, 10*jiffy); fe.flow != f1 {
t.Error("couldn't lookup registered flow (with wait)")
}

if f := lookupFlow(reg, id1, 0); f != f1 {
if fe, _ := lookupFlow(reg, id1, 0); fe.flow != f1 {
t.Error("couldn't lookup registered flow")
}

Expand All @@ -100,21 +106,21 @@ func TestFlowRegistry(t *testing.T) {
wg.Add(2)

go func() {
if f := lookupFlow(reg, id2, 10*jiffy); f != f2 {
if fe, _ := lookupFlow(reg, id2, 10*jiffy); fe.flow != f2 {
t.Error("couldn't lookup registered flow (with wait)")
}
wg.Done()
}()

go func() {
if f := lookupFlow(reg, id2, 10*jiffy); f != f2 {
if fe, _ := lookupFlow(reg, id2, 10*jiffy); fe.flow != f2 {
t.Error("couldn't lookup registered flow (with wait)")
}
wg.Done()
}()

time.Sleep(jiffy)
reg.RegisterFlow(ctx, id2, f2, nil /* inboundStreams */)
reg.RegisterFlow(ctx, id2, f2, nil /* inboundStreams */, flowStreamDefaultTimeout)
wg.Wait()

// -- Multiple lookups, with the first one failing. --
Expand All @@ -125,32 +131,74 @@ func TestFlowRegistry(t *testing.T) {
wg1.Add(1)
wg2.Add(1)
go func() {
if f := lookupFlow(reg, id3, jiffy); f != nil {
if _, ok := lookupFlow(reg, id3, jiffy); ok {
t.Error("expected lookup to fail")
}
wg1.Done()
}()

go func() {
if f := lookupFlow(reg, id3, 10*jiffy); f != f3 {
if fe, _ := lookupFlow(reg, id3, 10*jiffy); fe.flow != f3 {
t.Error("couldn't lookup registered flow (with wait)")
}
wg2.Done()
}()

wg1.Wait()
reg.RegisterFlow(ctx, id3, f3, nil /* inboundStreams */)
reg.RegisterFlow(ctx, id3, f3, nil /* inboundStreams */, flowStreamDefaultTimeout)
wg2.Wait()

// -- Lookup with huge timeout, register in the meantime. --

go func() {
time.Sleep(jiffy)
reg.RegisterFlow(ctx, id4, f4, nil /* inboundStreams */)
reg.RegisterFlow(ctx, id4, f4, nil /* inboundStreams */, flowStreamDefaultTimeout)
}()

// This should return in a jiffy.
if f := lookupFlow(reg, id4, time.Hour); f != f4 {
if fe, _ := lookupFlow(reg, id4, time.Hour); fe.flow != f4 {
t.Error("couldn't lookup registered flow (with wait)")
}
}

// Test that, if inbound streams are not connected within the timeout, errors
// are propagated to their consumers future attempts to connect them fail.
func TestStreamConnectionTimeout(t *testing.T) {
defer leaktest.AfterTest(t)()
reg := makeFlowRegistry()

jiffy := time.Nanosecond

// Register a flow with a very low timeout. After it times out, we'll attempt
// to connect a stream, but it'll be too late.
id1 := FlowID{uuid.MakeV4()}
f1 := &Flow{}
streamID1 := StreamID(1)
consumer := &RowBuffer{}
wg := &sync.WaitGroup{}
wg.Add(1)
inboundStreams := map[StreamID]*inboundStreamInfo{
streamID1: {receiver: consumer, waitGroup: wg},
}
reg.RegisterFlow(context.TODO(), id1, f1, inboundStreams, jiffy)

testutils.SucceedsSoon(t, func() error {
fe, ok := lookupFlow(reg, id1, jiffy)
if !ok {
t.Fatalf("couldn't find flow entry")
}
if !fe.inboundStreams[streamID1].timedOut {
return errors.Errorf("not timed out yet")
}
return nil
})

if !consumer.Closed {
t.Fatalf("expected consumer to have been closed when the flow timed out")
}

if _, _, err := reg.ConnectInboundStream(id1, streamID1); !testutils.IsError(
err, "came too late") {
t.Fatalf("expected %q, got: %v", "came too late", err)
}
}

0 comments on commit 395b3f6

Please sign in to comment.