Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsublite): enforce minimum publisher and subscriber timeout of 2 minutes #7746

Merged
merged 6 commits into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions pubsublite/internal/wire/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ const (
// publish request (containing a batch of messages) in bytes. Must be lower
// than the gRPC limit of 4 MiB.
MaxPublishRequestBytes int = 3.5 * 1024 * 1024

// MinTimeout is the minimum timeout value that can be set for publisher and
// subscriber settings.
MinTimeout = 2 * time.Minute
)

// FrameworkType is the user-facing API for Cloud Pub/Sub Lite.
Expand All @@ -51,7 +55,7 @@ type PublishSettings struct {
ByteThreshold int

// The maximum time that the client will attempt to establish a publish stream
// connection to the server. Must be > 0.
// connection to the server. Must be >= 2 minutes.
//
// The timeout is exceeded, the publisher will terminate with the last error
// that occurred while trying to reconnect. Note that if the timeout duration
Expand Down Expand Up @@ -98,8 +102,8 @@ func validatePublishSettings(settings PublishSettings) error {
if settings.DelayThreshold <= 0 {
return errors.New("pubsublite: invalid publish settings. DelayThreshold duration must be > 0")
}
if settings.Timeout <= 0 {
return errors.New("pubsublite: invalid publish settings. Timeout duration must be > 0")
if settings.Timeout < MinTimeout {
return errors.New("pubsublite: invalid publish settings. Timeout duration must be >= 2 minutes")
codyoss marked this conversation as resolved.
Show resolved Hide resolved
}
if settings.CountThreshold <= 0 {
return errors.New("pubsublite: invalid publish settings. CountThreshold must be > 0")
Expand Down Expand Up @@ -131,7 +135,7 @@ type ReceiveSettings struct {
MaxOutstandingBytes int

// The maximum time that the client will attempt to establish a subscribe
// stream connection to the server. Must be > 0.
// stream connection to the server. Must be >= 2 minutes.
//
// The timeout is exceeded, the subscriber will terminate with the last error
// that occurred while trying to reconnect.
Expand Down Expand Up @@ -161,8 +165,8 @@ func validateReceiveSettings(settings ReceiveSettings) error {
if settings.MaxOutstandingBytes <= 0 {
return errors.New("pubsublite: invalid receive settings. MaxOutstandingBytes must be > 0")
}
if settings.Timeout <= 0 {
return errors.New("pubsublite: invalid receive settings. Timeout duration must be > 0")
if settings.Timeout < MinTimeout {
return errors.New("pubsublite: invalid receive settings. Timeout duration must be >= 2 minutes")
}
if len(settings.Partitions) > 0 {
var void struct{}
Expand Down
31 changes: 29 additions & 2 deletions pubsublite/internal/wire/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ func TestValidatePublishSettings(t *testing.T) {
},
wantErr: false,
},
{
desc: "valid: min",
mutateSettings: func(settings *PublishSettings) {
settings.CountThreshold = 1
settings.ByteThreshold = 1
settings.DelayThreshold = time.Nanosecond
settings.Timeout = 2 * time.Minute
settings.BufferedByteLimit = 1
},
wantErr: false,
},
{
desc: "invalid: zero CountThreshold",
mutateSettings: func(settings *PublishSettings) {
Expand Down Expand Up @@ -78,9 +89,9 @@ func TestValidatePublishSettings(t *testing.T) {
wantErr: true,
},
{
desc: "invalid: zero Timeout",
desc: "invalid: low Timeout",
mutateSettings: func(settings *PublishSettings) {
settings.Timeout = time.Duration(0)
settings.Timeout = 2*time.Minute - time.Nanosecond
},
wantErr: true,
},
Expand Down Expand Up @@ -127,6 +138,15 @@ func TestValidateReceiveSettings(t *testing.T) {
},
wantErr: false,
},
{
desc: "valid: min",
mutateSettings: func(settings *ReceiveSettings) {
settings.MaxOutstandingMessages = 11
settings.MaxOutstandingBytes = 1
settings.Timeout = 2 * time.Minute
},
wantErr: false,
},
{
desc: "invalid: zero MaxOutstandingMessages",
mutateSettings: func(settings *ReceiveSettings) {
Expand All @@ -141,6 +161,13 @@ func TestValidateReceiveSettings(t *testing.T) {
},
wantErr: true,
},
{
desc: "invalid: low Timeout",
mutateSettings: func(settings *ReceiveSettings) {
settings.Timeout = 2*time.Minute - time.Nanosecond
},
wantErr: true,
},
{
desc: "invalid: negative partition",
mutateSettings: func(settings *ReceiveSettings) {
Expand Down
36 changes: 24 additions & 12 deletions pubsublite/pscompat/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package pscompat

import (
"log"
"time"

"cloud.google.com/go/internal/optional"
Expand Down Expand Up @@ -66,7 +67,8 @@ type PublishSettings struct {

// The maximum time that the client will attempt to open a publish stream
// to the server. If Timeout is 0, it will be treated as
// DefaultPublishSettings.Timeout. Otherwise must be > 0.
// DefaultPublishSettings.Timeout, otherwise will be clamped to a minimum of
// 2 minutes.
//
// If your application has a low tolerance to backend unavailability, set
// Timeout to a lower duration to detect and handle. When the timeout is
Expand All @@ -75,10 +77,9 @@ type PublishSettings struct {
// backends. Note that if the timeout duration is long, ErrOverflow may occur
// first.
//
// It is not recommended to set Timeout below 2 minutes. If no failover
// operations need to be performed by the application, it is recommended to
// just use the default timeout value to avoid the PublisherClient terminating
// during short periods of backend unavailability.
// If no failover operations need to be performed by the application, it is
// recommended to just use the default timeout value to avoid the
// PublisherClient terminating during short periods of backend unavailability.
Timeout time.Duration

// The maximum number of bytes that the publisher will keep in memory before
Expand Down Expand Up @@ -147,7 +148,12 @@ func (s *PublishSettings) toWireSettings() wire.PublishSettings {
wireSettings.ByteThreshold = s.ByteThreshold
}
if s.Timeout != 0 {
wireSettings.Timeout = s.Timeout
if s.Timeout >= wire.MinTimeout {
wireSettings.Timeout = s.Timeout
} else {
log.Println("WARNING: Using minimum PublishSettings.Timeout of 2 minutes. A lower value will cause an error in the future.")
wireSettings.Timeout = wire.MinTimeout
}
}
if s.BufferedByteLimit != 0 {
wireSettings.BufferedByteLimit = s.BufferedByteLimit
Expand Down Expand Up @@ -224,18 +230,19 @@ type ReceiveSettings struct {

// The maximum time that the client will attempt to open a subscribe stream
// to the server. If Timeout is 0, it will be treated as
// DefaultReceiveSettings.Timeout. Otherwise must be > 0.
// DefaultReceiveSettings.Timeout, otherwise will be clamped to a minimum of
// 2 minutes.
//
// If your application has a low tolerance to backend unavailability, set
// Timeout to a lower duration to detect and handle. When the timeout is
// exceeded, the SubscriberClient will terminate with ErrBackendUnavailable
// and details of the last error that occurred while trying to reconnect to
// backends.
//
// It is not recommended to set Timeout below 2 minutes. If no failover
// operations need to be performed by the application, it is recommended to
// just use the default timeout value to avoid the SubscriberClient
// terminating during short periods of backend unavailability.
// If no failover operations need to be performed by the application, it is
// recommended to just use the default timeout value to avoid the
// SubscriberClient terminating during short periods of backend
// unavailability.
Timeout time.Duration

// The topic partition numbers (zero-indexed) to receive messages from.
Expand Down Expand Up @@ -280,7 +287,12 @@ func (s *ReceiveSettings) toWireSettings() wire.ReceiveSettings {
wireSettings.MaxOutstandingBytes = s.MaxOutstandingBytes
}
if s.Timeout != 0 {
wireSettings.Timeout = s.Timeout
if s.Timeout >= wire.MinTimeout {
wireSettings.Timeout = s.Timeout
} else {
log.Println("WARNING: Using minimum ReceiveSettings.Timeout of 2 minutes. A lower value will cause an error in the future.")
wireSettings.Timeout = wire.MinTimeout
}
}
return wireSettings
}
12 changes: 6 additions & 6 deletions pubsublite/pscompat/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ func TestPublishSettingsToWireSettings(t *testing.T) {
DelayThreshold: 2,
CountThreshold: 3,
ByteThreshold: 4,
Timeout: 5,
Timeout: wire.MinTimeout,
BufferedByteLimit: 6,
EnableIdempotence: true,
},
wantSettings: wire.PublishSettings{
DelayThreshold: 2,
CountThreshold: 3,
ByteThreshold: 4,
Timeout: 5,
Timeout: wire.MinTimeout,
BufferedByteLimit: 6,
EnableIdempotence: true,
ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod,
Expand All @@ -80,7 +80,7 @@ func TestPublishSettingsToWireSettings(t *testing.T) {
DelayThreshold: -2,
CountThreshold: -3,
ByteThreshold: -4,
Timeout: -5,
Timeout: wire.MinTimeout,
BufferedByteLimit: -6,
EnableIdempotence: false,
ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod,
Expand Down Expand Up @@ -122,13 +122,13 @@ func TestReceiveSettingsToWireSettings(t *testing.T) {
settings: ReceiveSettings{
MaxOutstandingMessages: 2,
MaxOutstandingBytes: 3,
Timeout: 4,
Timeout: wire.MinTimeout,
Partitions: []int{5, 6},
},
wantSettings: wire.ReceiveSettings{
MaxOutstandingMessages: 2,
MaxOutstandingBytes: 3,
Timeout: 4,
Timeout: wire.MinTimeout,
Partitions: []int{5, 6},
Framework: wire.FrameworkCloudPubSubShim,
},
Expand All @@ -144,7 +144,7 @@ func TestReceiveSettingsToWireSettings(t *testing.T) {
wantSettings: wire.ReceiveSettings{
MaxOutstandingMessages: -2,
MaxOutstandingBytes: -3,
Timeout: -4,
Timeout: wire.MinTimeout,
Partitions: []int{-5, -6},
Framework: wire.FrameworkCloudPubSubShim,
},
Expand Down