diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index 6c57889e4022..f4fa0858f0ac 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -218,7 +218,7 @@ func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, stre } // No existing pool available, create one for the location and add to shared pools. - pool, err := c.createPool(ctx, loc, nil, streamFunc) + pool, err := c.createPool(ctx, loc, streamFunc) if err != nil { return nil, err } @@ -227,7 +227,7 @@ func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, stre } // createPool builds a connectionPool. -func (c *Client) createPool(ctx context.Context, location string, settings *streamSettings, streamFunc streamClientFunc) (*connectionPool, error) { +func (c *Client) createPool(ctx context.Context, location string, streamFunc streamClientFunc) (*connectionPool, error) { cCtx, cancel := context.WithCancel(ctx) if c.cfg == nil { @@ -238,20 +238,6 @@ func (c *Client) createPool(ctx context.Context, location string, settings *stre // add location header to the retained pool context. cCtx = metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_location=%s", location)) } - fcRequests := c.cfg.defaultInflightRequests - fcBytes := c.cfg.defaultInflightBytes - arOpts := c.cfg.defaultAppendRowsCallOptions - if settings != nil { - if settings.MaxInflightRequests > 0 { - fcRequests = settings.MaxInflightRequests - } - if settings.MaxInflightBytes > 0 { - fcBytes = settings.MaxInflightBytes - } - for _, o := range settings.appendCallOptions { - arOpts = append(arOpts, o) - } - } pool := &connectionPool{ id: newUUID(poolIDPrefix), @@ -259,8 +245,8 @@ func (c *Client) createPool(ctx context.Context, location string, settings *stre ctx: cCtx, cancel: cancel, open: createOpenF(ctx, streamFunc), - callOptions: arOpts, - baseFlowController: newFlowController(fcRequests, fcBytes), + callOptions: c.cfg.defaultAppendRowsCallOptions, + baseFlowController: newFlowController(c.cfg.defaultInflightRequests, c.cfg.defaultInflightBytes), } router := newSharedRouter(c.cfg.useMultiplex, c.cfg.maxMultiplexPoolSize) if err := pool.activateRouter(router); err != nil { diff --git a/bigquery/storage/managedwriter/client_test.go b/bigquery/storage/managedwriter/client_test.go index 393860919fd2..242a8b70a680 100644 --- a/bigquery/storage/managedwriter/client_test.go +++ b/bigquery/storage/managedwriter/client_test.go @@ -58,7 +58,7 @@ func TestCreatePool_Location(t *testing.T) { c := &Client{ cfg: &writerClientConfig{}, } - pool, err := c.createPool(context.Background(), "foo", nil, nil) + pool, err := c.createPool(context.Background(), "foo", nil) if err != nil { t.Fatalf("createPool: %v", err) } @@ -86,18 +86,14 @@ func TestCreatePool_Location(t *testing.T) { // of global configuration and per-writer configuration. func TestCreatePool(t *testing.T) { testCases := []struct { - desc string - cfg *writerClientConfig - settings *streamSettings - wantMaxBytes int - wantMaxRequests int - wantCallOptions int - wantErr bool + desc string + cfg *writerClientConfig + settings *streamSettings + wantMaxBytes int + wantMaxRequests int + wantCallOptions int + wantPoolCallOptions int }{ - { - desc: "no config", - wantErr: true, - }, { desc: "cfg, no settings", cfg: &writerClientConfig{ @@ -130,9 +126,9 @@ func TestCreatePool(t *testing.T) { MaxInflightRequests: 99, MaxInflightBytes: 1024, }, - wantMaxBytes: 1024, - wantMaxRequests: 99, - wantCallOptions: 1, + wantMaxBytes: 1024, + wantMaxRequests: 99, + wantPoolCallOptions: 1, }, { desc: "merge defaults and settings", @@ -145,9 +141,10 @@ func TestCreatePool(t *testing.T) { MaxInflightBytes: 1024, appendCallOptions: []gax.CallOption{gax.WithPath("foo")}, }, - wantMaxBytes: 1024, - wantMaxRequests: 123, - wantCallOptions: 2, + wantMaxBytes: 1024, + wantMaxRequests: 123, + wantCallOptions: 1, + wantPoolCallOptions: 1, }, } @@ -155,26 +152,36 @@ func TestCreatePool(t *testing.T) { c := &Client{ cfg: tc.cfg, } - got, err := c.createPool(context.Background(), "", tc.settings, nil) + pool, err := c.createPool(context.Background(), "", nil) if err != nil { - if !tc.wantErr { - t.Errorf("case %q: createPool errored unexpectedly: %v", tc.desc, err) - } + t.Errorf("case %q: createPool errored unexpectedly: %v", tc.desc, err) continue } - if err == nil && tc.wantErr { - t.Errorf("case %q: expected createPool to error but it did not", tc.desc) - continue + writer := &ManagedStream{ + id: "foo", + streamSettings: tc.settings, + } + if err = pool.addWriter(writer); err != nil { + t.Errorf("case %q: addWriter: %v", tc.desc, err) + } + pw := newPendingWrite(context.Background(), writer, nil, nil, "", "") + gotConn, err := pool.selectConn(pw) + if err != nil { + t.Errorf("case %q: selectConn: %v", tc.desc, err) } + // too many go-cmp overrides needed to quickly diff here, look at the interesting fields explicitly. - if gotVal := got.baseFlowController.maxInsertBytes; gotVal != tc.wantMaxBytes { + if gotVal := gotConn.fc.maxInsertBytes; gotVal != tc.wantMaxBytes { t.Errorf("case %q: flowController maxInsertBytes mismatch, got %d want %d", tc.desc, gotVal, tc.wantMaxBytes) } - if gotVal := got.baseFlowController.maxInsertCount; gotVal != tc.wantMaxRequests { + if gotVal := gotConn.fc.maxInsertCount; gotVal != tc.wantMaxRequests { t.Errorf("case %q: flowController maxInsertCount mismatch, got %d want %d", tc.desc, gotVal, tc.wantMaxRequests) } - if gotVal := len(got.callOptions); gotVal != tc.wantCallOptions { + if gotVal := len(gotConn.callOptions); gotVal != tc.wantCallOptions { t.Errorf("case %q: calloption count mismatch, got %d want %d", tc.desc, gotVal, tc.wantCallOptions) } + if gotVal := len(pool.callOptions); gotVal != tc.wantPoolCallOptions { + t.Errorf("case %q: POOL calloption count mismatch, got %d want %d", tc.desc, gotVal, tc.wantPoolCallOptions) + } } } diff --git a/bigquery/storage/managedwriter/connection.go b/bigquery/storage/managedwriter/connection.go index c592aa6ca2db..a51215361370 100644 --- a/bigquery/storage/managedwriter/connection.go +++ b/bigquery/storage/managedwriter/connection.go @@ -56,8 +56,8 @@ type connectionPool struct { // connection. Opening the connection is a stateless operation. open func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) - // We specify one set of calloptions for the pool. - // All connections in the pool open with the same call options. + // We specify default calloptions for the pool. + // Explicit connections may have their own calloptions as well. callOptions []gax.CallOption router poolRouter // poolManager makes the decisions about connections and routing. @@ -119,6 +119,16 @@ func (pool *connectionPool) removeWriter(writer *ManagedStream) error { return detachErr } +func (cp *connectionPool) mergeCallOptions(co *connection) []gax.CallOption { + if co == nil { + return cp.callOptions + } + var mergedOpts []gax.CallOption + mergedOpts = append(mergedOpts, cp.callOptions...) + mergedOpts = append(mergedOpts, co.callOptions...) + return mergedOpts +} + // openWithRetry establishes a new bidi stream and channel pair. It is used by connection objects // when (re)opening the network connection to the backend. // @@ -127,7 +137,7 @@ func (cp *connectionPool) openWithRetry(co *connection) (storagepb.BigQueryWrite r := &unaryRetryer{} for { recordStat(cp.ctx, AppendClientOpenCount, 1) - arc, err := cp.open(cp.callOptions...) + arc, err := cp.open(cp.mergeCallOptions(co)...) if err != nil { bo, shouldRetry := r.Retry(err) if shouldRetry { @@ -172,9 +182,10 @@ type connection struct { id string pool *connectionPool // each connection retains a reference to its owning pool. - fc *flowController // each connection has it's own flow controller. - ctx context.Context // retained context for maintaining the connection, derived from the owning pool. - cancel context.CancelFunc + fc *flowController // each connection has it's own flow controller. + callOptions []gax.CallOption // custom calloptions for this connection. + ctx context.Context // retained context for maintaining the connection, derived from the owning pool. + cancel context.CancelFunc retry *statelessRetryer optimizer sendOptimizer @@ -197,16 +208,32 @@ const ( verboseConnectionMode connectionMode = "VERBOSE" ) -func newConnection(pool *connectionPool, mode connectionMode) *connection { +func newConnection(pool *connectionPool, mode connectionMode, settings *streamSettings) *connection { if pool == nil { return nil } // create and retain a cancellable context. connCtx, cancel := context.WithCancel(pool.ctx) - fc := newFlowController(0, 0) - if pool != nil { - fc = copyFlowController(pool.baseFlowController) + + // Resolve local overrides for flow control and call options + fcRequests := 0 + fcBytes := 0 + var opts []gax.CallOption + + if pool.baseFlowController != nil { + fcRequests = pool.baseFlowController.maxInsertCount + fcBytes = pool.baseFlowController.maxInsertBytes + } + if settings != nil { + if settings.MaxInflightRequests > 0 { + fcRequests = settings.MaxInflightRequests + } + if settings.MaxInflightBytes > 0 { + fcBytes = settings.MaxInflightBytes + } + opts = settings.appendCallOptions } + fc := newFlowController(fcRequests, fcBytes) countLimit, byteLimit := computeLoadThresholds(fc) return &connection{ @@ -218,6 +245,7 @@ func newConnection(pool *connectionPool, mode connectionMode) *connection { optimizer: optimizer(mode), loadBytesThreshold: byteLimit, loadCountThreshold: countLimit, + callOptions: opts, } } diff --git a/bigquery/storage/managedwriter/connection_test.go b/bigquery/storage/managedwriter/connection_test.go index 5c3770c6ae26..fcc6cbc472f1 100644 --- a/bigquery/storage/managedwriter/connection_test.go +++ b/bigquery/storage/managedwriter/connection_test.go @@ -172,7 +172,7 @@ func TestConnectionPool_OpenCallOptionPropagation(t *testing.T) { gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(99)), }, } - conn := newConnection(pool, "") + conn := newConnection(pool, "", nil) pool.openWithRetry(conn) } diff --git a/bigquery/storage/managedwriter/routers.go b/bigquery/storage/managedwriter/routers.go index 69dad8b251d8..9980e8112733 100644 --- a/bigquery/storage/managedwriter/routers.go +++ b/bigquery/storage/managedwriter/routers.go @@ -81,7 +81,7 @@ func (rtr *simpleRouter) writerAttach(writer *ManagedStream) error { defer rtr.mu.Unlock() rtr.writers[writer.id] = struct{}{} if rtr.conn == nil { - rtr.conn = newConnection(rtr.pool, rtr.mode) + rtr.conn = newConnection(rtr.pool, rtr.mode, nil) } return nil } @@ -206,7 +206,7 @@ func (sr *sharedRouter) writerAttach(writer *ManagedStream) error { if pair := sr.exclusiveConns[writer.id]; pair != nil { return fmt.Errorf("writer %q already attached", writer.id) } - sr.exclusiveConns[writer.id] = newConnection(sr.pool, simplexConnectionMode) + sr.exclusiveConns[writer.id] = newConnection(sr.pool, simplexConnectionMode, writer.streamSettings) return nil } @@ -242,9 +242,9 @@ func (sr *sharedRouter) orderAndGrowMultiConns() { return sr.multiConns[i].curLoad() < sr.multiConns[j].curLoad() }) if len(sr.multiConns) == 0 { - sr.multiConns = []*connection{newConnection(sr.pool, multiplexConnectionMode)} + sr.multiConns = []*connection{newConnection(sr.pool, multiplexConnectionMode, nil)} } else if sr.multiConns[0].isLoaded() && len(sr.multiConns) < sr.maxConns { - sr.multiConns = append([]*connection{newConnection(sr.pool, multiplexConnectionMode)}, sr.multiConns...) + sr.multiConns = append([]*connection{newConnection(sr.pool, multiplexConnectionMode, nil)}, sr.multiConns...) } }