Skip to content

Commit

Permalink
feat(bigquery/storage/managedwriter): expose connection multiplexing …
Browse files Browse the repository at this point in the history
…as experimental (#7673)

This PR exposes the necessary options to control the new experimental multiplexing features within the managedwriter package, and attempts to document them sufficiently for correct consumption.

Towards: https://togithub.com/googleapis/google-cloud-go/issues/7103
  • Loading branch information
shollyman authored Apr 7, 2023
1 parent e660af8 commit 3b8bfb4
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 30 deletions.
35 changes: 35 additions & 0 deletions bigquery/storage/managedwriter/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,5 +208,40 @@ With write retries enabled, failed writes will be automatically attempted a fini
In support of the retry changes, the AppendResult returned as part of an append call now includes
TotalAttempts(), which returns the number of times that specific append was enqueued to the service.
Values larger than 1 are indicative of a specific append being enqueued multiple times.
# Connection Sharing (Multiplexing)
Note: This feature is EXPERIMENTAL and subject to change.
The BigQuery Write API enforces a limit on the number of concurrent open connections, documented
here: https://cloud.google.com/bigquery/quotas#write-api-limits
Users can now choose to enable connection sharing (multiplexing) when using ManagedStream writers
that use default streams. The intent of this feature is to simplify connection management for users
who wish to write to many tables, at a cardinality beyond the open connection quota. Please note that
explicit streams (Committed, Buffered, and Pending) cannot leverage the connection sharing feature.
Multiplexing features are controlled by the package-specific custom ClientOption options exposed within
this package. Additionally, some of the connection-related WriterOptions that can be specified when
constructing ManagedStream writers are ignored for writers that leverage the shared multiplex connections.
At a high level, multiplexing uses some heuristics based on the flow control of the shared connections
to infer whether the pool should add additional connections up to a user-specific limit per region,
and attempts to balance traffic from writers to those connections.
To enable multiplexing for writes to default streams, simply instantiate the client with the desired options:
ctx := context.Background()
client, err := managedwriter.NewClient(ctx, projectID,
WithMultiplexing,
WithMultiplexPoolLimit(3),
)
if err != nil {
// TODO: Handle error.
}
Special Consideration: Users who would like to utilize many connections associated with a single Client
may benefit from setting the WithGRPCConnectionPool ClientOption, documented here:
https://pkg.go.dev/google.golang.org/api/option#WithGRPCConnectionPool
*/
package managedwriter
5 changes: 4 additions & 1 deletion bigquery/storage/managedwriter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,10 @@ func testProtoNormalization(ctx context.Context, t *testing.T, mwClient *Client,
}

func TestIntegration_MultiplexWrites(t *testing.T) {
mwClient, bqClient := getTestClients(context.Background(), t, enableMultiplex(true, 5))
mwClient, bqClient := getTestClients(context.Background(), t,
WithMultiplexing(),
WithMultiplexPoolLimit(2),
)
defer mwClient.Close()
defer bqClient.Close()

Expand Down
99 changes: 80 additions & 19 deletions bigquery/storage/managedwriter/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ func newWriterClientConfig(opts ...option.ClientOption) *writerClientConfig {
wOpt.ApplyWriterOpt(conf)
}
}

// Normalize the config to ensure we're dealing with sane values.
if conf.useMultiplex {
if conf.maxMultiplexPoolSize < 1 {
conf.maxMultiplexPoolSize = 1
}
}
if conf.defaultInflightBytes < 0 {
conf.defaultInflightBytes = 0
}
if conf.defaultInflightRequests < 0 {
conf.defaultInflightRequests = 0
}
return conf
}

Expand All @@ -48,31 +61,55 @@ type writerClientOption interface {
ApplyWriterOpt(*writerClientConfig)
}

// enableMultiplex enables multiplex behavior in the client.
// maxSize indicates the maximum number of shared multiplex connections
// in a given location/region
// WithMultiplexing is an EXPERIMENTAL option that controls connection sharing
// when instantiating the Client. Only writes to default streams can leverage the
// multiplex pool. Internally, the client maintains a pool of connections per BigQuery
// destination region, and will grow the pool to it's maximum allowed size if there's
// sufficient traffic on the shared connection(s).
//
// TODO: export this as part of the multiplex feature launch.
func enableMultiplex(enable bool, maxSize int) option.ClientOption {
return &enableMultiplexSetting{useMultiplex: enable, maxSize: maxSize}
// This ClientOption is EXPERIMENTAL and subject to change.
func WithMultiplexing() option.ClientOption {
return &enableMultiplexSetting{useMultiplex: true}
}

type enableMultiplexSetting struct {
internaloption.EmbeddableAdapter
useMultiplex bool
maxSize int
}

func (s *enableMultiplexSetting) ApplyWriterOpt(c *writerClientConfig) {
c.useMultiplex = s.useMultiplex
}

// WithMultiplexPoolLimit is an EXPERIMENTAL option that sets the maximum
// shared multiplex pool size when instantiating the Client. If multiplexing
// is not enabled, this setting is ignored. By default, the limit is a single
// shared connection. This limit is applied per destination region.
//
// This ClientOption is EXPERIMENTAL and subject to change.
func WithMultiplexPoolLimit(maxSize int) option.ClientOption {
return &maxMultiplexPoolSizeSetting{maxSize: maxSize}
}

type maxMultiplexPoolSizeSetting struct {
internaloption.EmbeddableAdapter
maxSize int
}

func (s *maxMultiplexPoolSizeSetting) ApplyWriterOpt(c *writerClientConfig) {
c.maxMultiplexPoolSize = s.maxSize
}

// defaultMaxInflightRequests sets the default flow controller limit for requests for
// all AppendRows connections created by this client.
// WithDefaultInflightRequests is an EXPERIMENTAL ClientOption for controlling
// the default limit of how many individual AppendRows write requests can
// be in flight on a connection at a time. This limit is enforced on all connections
// created by the instantiated Client.
//
// TODO: export this as part of the multiplex feature launch.
func defaultMaxInflightRequests(n int) option.ClientOption {
// Note: the WithMaxInflightRequests WriterOption can still be used to control
// the behavior for individual ManagedStream writers when not using multiplexing.
//
// This ClientOption is EXPERIMENTAL and subject to change.
func WithDefaultInflightRequests(n int) option.ClientOption {
return &defaultInflightRequestsSetting{maxRequests: n}
}

Expand All @@ -85,11 +122,16 @@ func (s *defaultInflightRequestsSetting) ApplyWriterOpt(c *writerClientConfig) {
c.defaultInflightRequests = s.maxRequests
}

// defaultMaxInflightBytes sets the default flow controller limit for bytes for
// all AppendRows connections created by this client.
// WithDefaultInflightBytes is an EXPERIMENTAL ClientOption for controlling
// the default byte limit for how many individual AppendRows write requests can
// be in flight on a connection at a time. This limit is enforced on all connections
// created by the instantiated Client.
//
// Note: the WithMaxInflightBytes WriterOption can still be used to control
// the behavior for individual ManagedStream writers when not using multiplexing.
//
// TODO: export this as part of the multiplex feature launch.
func defaultMaxInflightBytes(n int) option.ClientOption {
// This ClientOption is EXPERIMENTAL and subject to change.
func WithDefaultInflightBytes(n int) option.ClientOption {
return &defaultInflightBytesSetting{maxBytes: n}
}

Expand All @@ -102,11 +144,18 @@ func (s *defaultInflightBytesSetting) ApplyWriterOpt(c *writerClientConfig) {
c.defaultInflightBytes = s.maxBytes
}

// defaultAppendRowsCallOptions sets a gax.CallOption passed when opening
// the AppendRows bidi connection.
// WithDefaultAppendRowsCallOption is an EXPERIMENTAL ClientOption for controlling
// the gax.CallOptions passed when opening the underlying AppendRows bidi
// stream connections used by this library to communicate with the BigQuery
// Storage service. This option is propagated to all
// connections created by the instantiated Client.
//
// TODO: export this as part of the multiplex feature launch.
func defaultAppendRowsCallOption(o gax.CallOption) option.ClientOption {
// Note: the WithAppendRowsCallOption WriterOption can still be used to control
// the behavior for individual ManagedStream writers that don't participate
// in multiplexing.
//
// This ClientOption is EXPERIMENTAL and subject to change.
func WithDefaultAppendRowsCallOption(o gax.CallOption) option.ClientOption {
return &defaultAppendRowsCallOptionSetting{opt: o}
}

Expand Down Expand Up @@ -152,13 +201,21 @@ func WithDestinationTable(destTable string) WriterOption {
}

// WithMaxInflightRequests bounds the inflight appends on the write connection.
//
// Note: See the WithDefaultInflightRequests ClientOption for setting a default
// when instantiating a client, rather than setting this limit per-writer.
// This WriterOption is ignored for ManagedStreams that participate in multiplexing.
func WithMaxInflightRequests(n int) WriterOption {
return func(ms *ManagedStream) {
ms.streamSettings.MaxInflightRequests = n
}
}

// WithMaxInflightBytes bounds the inflight append request bytes on the write connection.
//
// Note: See the WithDefaultInflightBytes ClientOption for setting a default
// when instantiating a client, rather than setting this limit per-writer.
// This WriterOption is ignored for ManagedStreams that participate in multiplexing.
func WithMaxInflightBytes(n int) WriterOption {
return func(ms *ManagedStream) {
ms.streamSettings.MaxInflightBytes = n
Expand Down Expand Up @@ -191,6 +248,10 @@ func WithDataOrigin(dataOrigin string) WriterOption {

// WithAppendRowsCallOption is used to supply additional call options to the ManagedStream when
// it opens the underlying append stream.
//
// Note: See the DefaultAppendRowsCallOption ClientOption for setting defaults
// when instantiating a client, rather than setting this limit per-writer. This WriterOption
// is ignored for ManagedStream writers that participate in multiplexing.
func WithAppendRowsCallOption(o gax.CallOption) WriterOption {
return func(ms *ManagedStream) {
ms.streamSettings.appendCallOptions = append(ms.streamSettings.appendCallOptions, o)
Expand Down
45 changes: 35 additions & 10 deletions bigquery/storage/managedwriter/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,28 @@ func TestCustomClientOptions(t *testing.T) {
want: &writerClientConfig{},
},
{
desc: "multiplex",
desc: "multiplex enable",
options: []option.ClientOption{
enableMultiplex(true, 4),
WithMultiplexing(),
},
want: &writerClientConfig{
useMultiplex: true,
maxMultiplexPoolSize: 4,
maxMultiplexPoolSize: 1,
},
},
{
desc: "multiplex max",
options: []option.ClientOption{
WithMultiplexPoolLimit(99),
},
want: &writerClientConfig{
maxMultiplexPoolSize: 99,
},
},
{
desc: "default requests",
options: []option.ClientOption{
defaultMaxInflightRequests(42),
WithDefaultInflightRequests(42),
},
want: &writerClientConfig{
defaultInflightRequests: 42,
Expand All @@ -57,7 +66,7 @@ func TestCustomClientOptions(t *testing.T) {
{
desc: "default bytes",
options: []option.ClientOption{
defaultMaxInflightBytes(123),
WithDefaultInflightBytes(123),
},
want: &writerClientConfig{
defaultInflightBytes: 123,
Expand All @@ -66,21 +75,37 @@ func TestCustomClientOptions(t *testing.T) {
{
desc: "default call options",
options: []option.ClientOption{
defaultAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))),
WithDefaultAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))),
},
want: &writerClientConfig{
defaultAppendRowsCallOptions: []gax.CallOption{
gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)),
},
},
},
{
desc: "unusual values",
options: []option.ClientOption{
WithMultiplexing(),
WithMultiplexPoolLimit(-8),
WithDefaultInflightBytes(-1),
WithDefaultInflightRequests(-99),
},
want: &writerClientConfig{
useMultiplex: true,
maxMultiplexPoolSize: 1,
defaultInflightRequests: 0,
defaultInflightBytes: 0,
},
},
{
desc: "multiple options",
options: []option.ClientOption{
enableMultiplex(true, 10),
defaultMaxInflightRequests(99),
defaultMaxInflightBytes(12345),
defaultAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))),
WithMultiplexing(),
WithMultiplexPoolLimit(10),
WithDefaultInflightRequests(99),
WithDefaultInflightBytes(12345),
WithDefaultAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))),
},
want: &writerClientConfig{
useMultiplex: true,
Expand Down

0 comments on commit 3b8bfb4

Please sign in to comment.