Skip to content

Commit

Permalink
fix(pubsublite): mitigate gRPC stream connection issues (#5382)
Browse files Browse the repository at this point in the history
Mitigates hanging streams by detecting idle streams and reconnecting after a timeout (default: 10 minutes for partition assignment streams, 2 minutes for all others). If the user has specified a lower timeout in settings, that lower value is used instead for publish, subscribe and commit streams.

The `streamIdleTimer` is restarted when the client receives a response on the stream. The stream is reinitialized when the timeout expires. For publish and commit streams, the timeout will still expire even if there is no user activity.
  • Loading branch information
tmdiep authored Feb 8, 2022
1 parent 4b8b4ab commit 8763ef3
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 50 deletions.
3 changes: 2 additions & 1 deletion pubsublite/internal/wire/assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"reflect"
"sort"
"time"

"github.com/google/uuid"
"google.golang.org/grpc"
Expand Down Expand Up @@ -113,7 +114,7 @@ func newAssigner(ctx context.Context, assignmentClient *vkit.PartitionAssignment
receiveAssignment: receiver,
metadata: newPubsubMetadata(),
}
a.stream = newRetryableStream(ctx, a, settings.Timeout, reflect.TypeOf(pb.PartitionAssignment{}))
a.stream = newRetryableStream(ctx, a, settings.Timeout, 10*time.Minute, reflect.TypeOf(pb.PartitionAssignment{}))
a.metadata.AddClientInfo(settings.Framework)
return a, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pubsublite/internal/wire/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func newCommitter(ctx context.Context, cursor *vkit.CursorClient, settings Recei
acks: acks,
cursorTracker: newCommitCursorTracker(acks),
}
c.stream = newRetryableStream(ctx, c, settings.Timeout, reflect.TypeOf(pb.StreamingCommitCursorResponse{}))
c.stream = newRetryableStream(ctx, c, settings.Timeout, streamIdleTimeout(settings.Timeout), reflect.TypeOf(pb.StreamingCommitCursorResponse{}))
c.metadata.AddClientInfo(settings.Framework)

backgroundTask := c.commitOffsetToStream
Expand Down
2 changes: 1 addition & 1 deletion pubsublite/internal/wire/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (f *singlePartitionPublisherFactory) New(partition int) *singlePartitionPub
metadata: newPubsubMetadata(),
}
pp.batcher = newPublishMessageBatcher(&f.settings, partition, pp.onNewBatch)
pp.stream = newRetryableStream(f.ctx, pp, f.settings.Timeout, reflect.TypeOf(pb.PublishResponse{}))
pp.stream = newRetryableStream(f.ctx, pp, f.settings.Timeout, streamIdleTimeout(f.settings.Timeout), reflect.TypeOf(pb.PublishResponse{}))
pp.metadata.AddTopicRoutingMetadata(pp.topic)
pp.metadata.AddClientInfo(f.settings.Framework)
return pp
Expand Down
104 changes: 89 additions & 15 deletions pubsublite/internal/wire/request_timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,47 @@ import (
"time"
)

type requestTimerStatus int
// minDuration reports the smaller of the two given durations. When both are
// equal, that shared value is returned.
func minDuration(a, b time.Duration) time.Duration {
	if b <= a {
		return b
	}
	return a
}

type timerStatus int

const (
requestTimerNew requestTimerStatus = iota
requestTimerStopped
requestTimerTriggered
timerActive timerStatus = iota
timerStopped
timerTriggered
)

// requestTimer bounds the duration of a request and executes `onTimeout` if
// the timer is triggered.
// requestTimer is a one-shot timer used to bound the duration of a request. It
// executes `onTimeout` if the timeout expires.
type requestTimer struct {
onTimeout func()
timeoutErr error
timer *time.Timer
mu sync.Mutex
status requestTimerStatus
status timerStatus
}

func newRequestTimer(duration time.Duration, onTimeout func(), timeoutErr error) *requestTimer {
func newRequestTimer(timeout time.Duration, onTimeout func(), timeoutErr error) *requestTimer {
rt := &requestTimer{
onTimeout: onTimeout,
timeoutErr: timeoutErr,
status: requestTimerNew,
status: timerActive,
}
rt.timer = time.AfterFunc(duration, rt.onTriggered)
rt.timer = time.AfterFunc(timeout, rt.onTriggered)
return rt
}

func (rt *requestTimer) onTriggered() {
rt.mu.Lock()
defer rt.mu.Unlock()
if rt.status == requestTimerNew {
rt.status = requestTimerTriggered
if rt.status == timerActive {
rt.status = timerTriggered
rt.onTimeout()
}
}
Expand All @@ -60,8 +68,8 @@ func (rt *requestTimer) onTriggered() {
func (rt *requestTimer) Stop() {
rt.mu.Lock()
defer rt.mu.Unlock()
if rt.status == requestTimerNew {
rt.status = requestTimerStopped
if rt.status == timerActive {
rt.status = timerStopped
rt.timer.Stop()
}
}
Expand All @@ -71,8 +79,74 @@ func (rt *requestTimer) Stop() {
func (rt *requestTimer) ResolveError(originalErr error) error {
rt.mu.Lock()
defer rt.mu.Unlock()
if rt.status == requestTimerTriggered {
if rt.status == timerTriggered {
return rt.timeoutErr
}
return originalErr
}

// streamIdleTimer is an approximate timer used to detect idle streams. Expiry
// is detected by a periodic poll rather than by an exact timer, so `onTimeout`
// may be called up to one poll interval, i.e.
// min(timeout / pollDivisor, maxPollInterval), after `timeout` has expired.
type streamIdleTimer struct {
// timeout is the idle duration after which the timer is considered expired.
timeout time.Duration
// onTimeout is invoked from onPoll when expiry is detected.
onTimeout func()
// task periodically runs onPoll.
task *periodicTask
// mu guards status and startTime.
mu sync.Mutex
status timerStatus
// startTime is the time of the most recent Restart.
startTime time.Time
}

const (
// pollDivisor divides the idle timeout to obtain the interval at which
// newStreamIdleTimer polls for expiry.
pollDivisor = 4
// maxPollInterval is the upper bound applied to that poll interval.
maxPollInterval = time.Minute
)

// newStreamIdleTimer creates a timer that is initially stopped. The poll task
// is started immediately, but expiry is only evaluated once Restart has been
// called. The poll interval is timeout/pollDivisor, capped at maxPollInterval.
func newStreamIdleTimer(timeout time.Duration, onTimeout func()) *streamIdleTimer {
	pollInterval := minDuration(timeout/pollDivisor, maxPollInterval)

	st := new(streamIdleTimer)
	st.timeout = timeout
	st.onTimeout = onTimeout
	st.status = timerStopped
	st.task = newPeriodicTask(pollInterval, st.onPoll)
	st.task.Start()
	return st
}

// Restart marks the timer as active and measures the idle period from now.
// Should be called when there is stream activity.
func (st *streamIdleTimer) Restart() {
	st.mu.Lock()
	defer st.mu.Unlock()
	st.status, st.startTime = timerActive, time.Now()
}

// Stop the timer to prevent it from expiring. Only the status is changed: the
// timer can be reactivated with Restart, and the poll task keeps running until
// Shutdown is called.
func (st *streamIdleTimer) Stop() {
st.mu.Lock()
defer st.mu.Unlock()
st.status = timerStopped
}

// Shutdown should be called when the timer is no longer used. It stops the
// timer and then stops the periodic poll task.
func (st *streamIdleTimer) Shutdown() {
st.Stop()
st.task.Stop()
}

// onPoll is run by the periodic task. If the timer is active and more than
// `timeout` has elapsed since the last Restart, it marks the timer as
// triggered and calls `onTimeout`.
func (st *streamIdleTimer) onPoll() {
	st.mu.Lock()
	// Note: time.Since() uses monotonic clock readings.
	expired := st.status == timerActive && time.Since(st.startTime) > st.timeout
	if expired {
		st.status = timerTriggered
	}
	st.mu.Unlock()

	// The callback runs after the mutex is released, so it may call Restart
	// or Stop (which acquire the same mutex) without deadlocking.
	if expired {
		st.onTimeout()
	}
}
82 changes: 82 additions & 0 deletions pubsublite/internal/wire/request_timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,43 @@ package wire

import (
"errors"
"fmt"
"testing"
"time"

"cloud.google.com/go/pubsublite/internal/test"
)

// TestMinDuration checks minDuration for equal inputs, for a smaller second
// argument, and for a smaller first argument.
func TestMinDuration(t *testing.T) {
	type testCase struct {
		a, b time.Duration
		want time.Duration
	}
	cases := []testCase{
		{a: 10 * time.Millisecond, b: 10 * time.Millisecond, want: 10 * time.Millisecond},
		{a: 10 * time.Millisecond, b: 9 * time.Millisecond, want: 9 * time.Millisecond},
		{a: 5 * time.Millisecond, b: 5 * time.Second, want: 5 * time.Millisecond},
	}
	for _, tc := range cases {
		name := fmt.Sprintf("%s %s", tc.a, tc.b)
		t.Run(name, func(t *testing.T) {
			got := minDuration(tc.a, tc.b)
			if got != tc.want {
				t.Errorf("minDuration(%v, %v): got %v, want %v", tc.a, tc.b, got, tc.want)
			}
		})
	}
}

func TestRequestTimerStop(t *testing.T) {
const timeout = 5 * time.Millisecond
onTimeout := func() {
Expand Down Expand Up @@ -59,3 +90,54 @@ func TestRequestTimerExpires(t *testing.T) {
t.Errorf("ResolveError() got err: %v, want err: %v", gotErr, timeoutErr)
}
}

// TestStreamIdleTimerExpires verifies that onTimeout is invoked after the timer
// has been activated with Restart and then left idle.
func TestStreamIdleTimerExpires(t *testing.T) {
const timeout = 5 * time.Millisecond
expired := test.NewCondition("timer expired")

st := newStreamIdleTimer(timeout, expired.SetDone)
defer st.Shutdown()
// The timer is created stopped; Restart activates it.
st.Restart()
expired.WaitUntilDone(t, serviceTestWaitTimeout)
}

// TestStreamIdleTimerRestart verifies that calling Restart before the timeout
// elapses postpones expiry, and that the timer still expires once it is left
// idle afterwards.
//
// NOTE(review): the checks rely on wall-clock sleeps with only a 5ms margin
// (timeout - delta); this may be flaky on heavily loaded machines.
func TestStreamIdleTimerRestart(t *testing.T) {
const timeout = 20 * time.Millisecond
// delta is shorter than timeout, so the timer must not have fired one delta
// after a Restart.
const delta = 15 * time.Millisecond
expired := test.NewCondition("timer expired")

st := newStreamIdleTimer(timeout, expired.SetDone)
defer st.Shutdown()
st.Restart()
time.Sleep(delta)
expired.VerifyNotDone(t)
// Restarting resets the idle period, so another delta must also pass without
// expiry even though more than timeout has elapsed since the first Restart.
st.Restart()
time.Sleep(delta)
expired.VerifyNotDone(t)
// With no further Restart, the timer is expected to expire.
expired.WaitUntilDone(t, serviceTestWaitTimeout)
}

// TestStreamIdleTimerStop verifies that onTimeout is not called when the timer
// is stopped before the timeout elapses.
func TestStreamIdleTimerStop(t *testing.T) {
const timeout = 5 * time.Millisecond
onTimeout := func() {
t.Error("onTimeout should not be called")
}

st := newStreamIdleTimer(timeout, onTimeout)
defer st.Shutdown()
st.Restart()
st.Stop()
// Wait past the timeout to give an erroneous expiry a chance to fire.
time.Sleep(2 * timeout)
}

// TestStreamIdleTimerShutdown verifies that onTimeout is not called when the
// timer is shut down before the timeout elapses.
func TestStreamIdleTimerShutdown(t *testing.T) {
const timeout = 5 * time.Millisecond
onTimeout := func() {
t.Error("onTimeout should not be called")
}

st := newStreamIdleTimer(timeout, onTimeout)
st.Restart()
st.Shutdown()
// Wait past the timeout to give an erroneous expiry a chance to fire.
time.Sleep(2 * timeout)
}
Loading

0 comments on commit 8763ef3

Please sign in to comment.