Skip to content

Commit f684e16

Browse files
authored
fix(bigquery/storage/managedwriter): fix option propagation (#7669)
1 parent cf06802 commit f684e16

File tree

5 files changed

+82
-61
lines changed

5 files changed

+82
-61
lines changed

β€Žbigquery/storage/managedwriter/client.go

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, stre
218218
}
219219

220220
// No existing pool available, create one for the location and add to shared pools.
221-
pool, err := c.createPool(ctx, loc, nil, streamFunc)
221+
pool, err := c.createPool(ctx, loc, streamFunc)
222222
if err != nil {
223223
return nil, err
224224
}
@@ -227,7 +227,7 @@ func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, stre
227227
}
228228

229229
// createPool builds a connectionPool.
230-
func (c *Client) createPool(ctx context.Context, location string, settings *streamSettings, streamFunc streamClientFunc) (*connectionPool, error) {
230+
func (c *Client) createPool(ctx context.Context, location string, streamFunc streamClientFunc) (*connectionPool, error) {
231231
cCtx, cancel := context.WithCancel(ctx)
232232

233233
if c.cfg == nil {
@@ -238,29 +238,15 @@ func (c *Client) createPool(ctx context.Context, location string, settings *stre
238238
// add location header to the retained pool context.
239239
cCtx = metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_location=%s", location))
240240
}
241-
fcRequests := c.cfg.defaultInflightRequests
242-
fcBytes := c.cfg.defaultInflightBytes
243-
arOpts := c.cfg.defaultAppendRowsCallOptions
244-
if settings != nil {
245-
if settings.MaxInflightRequests > 0 {
246-
fcRequests = settings.MaxInflightRequests
247-
}
248-
if settings.MaxInflightBytes > 0 {
249-
fcBytes = settings.MaxInflightBytes
250-
}
251-
for _, o := range settings.appendCallOptions {
252-
arOpts = append(arOpts, o)
253-
}
254-
}
255241

256242
pool := &connectionPool{
257243
id: newUUID(poolIDPrefix),
258244
location: location,
259245
ctx: cCtx,
260246
cancel: cancel,
261247
open: createOpenF(ctx, streamFunc),
262-
callOptions: arOpts,
263-
baseFlowController: newFlowController(fcRequests, fcBytes),
248+
callOptions: c.cfg.defaultAppendRowsCallOptions,
249+
baseFlowController: newFlowController(c.cfg.defaultInflightRequests, c.cfg.defaultInflightBytes),
264250
}
265251
router := newSharedRouter(c.cfg.useMultiplex, c.cfg.maxMultiplexPoolSize)
266252
if err := pool.activateRouter(router); err != nil {

β€Žbigquery/storage/managedwriter/client_test.go

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func TestCreatePool_Location(t *testing.T) {
5858
c := &Client{
5959
cfg: &writerClientConfig{},
6060
}
61-
pool, err := c.createPool(context.Background(), "foo", nil, nil)
61+
pool, err := c.createPool(context.Background(), "foo", nil)
6262
if err != nil {
6363
t.Fatalf("createPool: %v", err)
6464
}
@@ -86,18 +86,14 @@ func TestCreatePool_Location(t *testing.T) {
8686
// of global configuration and per-writer configuration.
8787
func TestCreatePool(t *testing.T) {
8888
testCases := []struct {
89-
desc string
90-
cfg *writerClientConfig
91-
settings *streamSettings
92-
wantMaxBytes int
93-
wantMaxRequests int
94-
wantCallOptions int
95-
wantErr bool
89+
desc string
90+
cfg *writerClientConfig
91+
settings *streamSettings
92+
wantMaxBytes int
93+
wantMaxRequests int
94+
wantCallOptions int
95+
wantPoolCallOptions int
9696
}{
97-
{
98-
desc: "no config",
99-
wantErr: true,
100-
},
10197
{
10298
desc: "cfg, no settings",
10399
cfg: &writerClientConfig{
@@ -130,9 +126,9 @@ func TestCreatePool(t *testing.T) {
130126
MaxInflightRequests: 99,
131127
MaxInflightBytes: 1024,
132128
},
133-
wantMaxBytes: 1024,
134-
wantMaxRequests: 99,
135-
wantCallOptions: 1,
129+
wantMaxBytes: 1024,
130+
wantMaxRequests: 99,
131+
wantPoolCallOptions: 1,
136132
},
137133
{
138134
desc: "merge defaults and settings",
@@ -145,36 +141,47 @@ func TestCreatePool(t *testing.T) {
145141
MaxInflightBytes: 1024,
146142
appendCallOptions: []gax.CallOption{gax.WithPath("foo")},
147143
},
148-
wantMaxBytes: 1024,
149-
wantMaxRequests: 123,
150-
wantCallOptions: 2,
144+
wantMaxBytes: 1024,
145+
wantMaxRequests: 123,
146+
wantCallOptions: 1,
147+
wantPoolCallOptions: 1,
151148
},
152149
}
153150

154151
for _, tc := range testCases {
155152
c := &Client{
156153
cfg: tc.cfg,
157154
}
158-
got, err := c.createPool(context.Background(), "", tc.settings, nil)
155+
pool, err := c.createPool(context.Background(), "", nil)
159156
if err != nil {
160-
if !tc.wantErr {
161-
t.Errorf("case %q: createPool errored unexpectedly: %v", tc.desc, err)
162-
}
157+
t.Errorf("case %q: createPool errored unexpectedly: %v", tc.desc, err)
163158
continue
164159
}
165-
if err == nil && tc.wantErr {
166-
t.Errorf("case %q: expected createPool to error but it did not", tc.desc)
167-
continue
160+
writer := &ManagedStream{
161+
id: "foo",
162+
streamSettings: tc.settings,
163+
}
164+
if err = pool.addWriter(writer); err != nil {
165+
t.Errorf("case %q: addWriter: %v", tc.desc, err)
166+
}
167+
pw := newPendingWrite(context.Background(), writer, nil, nil, "", "")
168+
gotConn, err := pool.selectConn(pw)
169+
if err != nil {
170+
t.Errorf("case %q: selectConn: %v", tc.desc, err)
168171
}
172+
169173
// too many go-cmp overrides needed to quickly diff here, look at the interesting fields explicitly.
170-
if gotVal := got.baseFlowController.maxInsertBytes; gotVal != tc.wantMaxBytes {
174+
if gotVal := gotConn.fc.maxInsertBytes; gotVal != tc.wantMaxBytes {
171175
t.Errorf("case %q: flowController maxInsertBytes mismatch, got %d want %d", tc.desc, gotVal, tc.wantMaxBytes)
172176
}
173-
if gotVal := got.baseFlowController.maxInsertCount; gotVal != tc.wantMaxRequests {
177+
if gotVal := gotConn.fc.maxInsertCount; gotVal != tc.wantMaxRequests {
174178
t.Errorf("case %q: flowController maxInsertCount mismatch, got %d want %d", tc.desc, gotVal, tc.wantMaxRequests)
175179
}
176-
if gotVal := len(got.callOptions); gotVal != tc.wantCallOptions {
180+
if gotVal := len(gotConn.callOptions); gotVal != tc.wantCallOptions {
177181
t.Errorf("case %q: calloption count mismatch, got %d want %d", tc.desc, gotVal, tc.wantCallOptions)
178182
}
183+
if gotVal := len(pool.callOptions); gotVal != tc.wantPoolCallOptions {
184+
t.Errorf("case %q: POOL calloption count mismatch, got %d want %d", tc.desc, gotVal, tc.wantPoolCallOptions)
185+
}
179186
}
180187
}

β€Žbigquery/storage/managedwriter/connection.go

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ type connectionPool struct {
5656
// connection. Opening the connection is a stateless operation.
5757
open func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)
5858

59-
// We specify one set of calloptions for the pool.
60-
// All connections in the pool open with the same call options.
59+
// We specify default calloptions for the pool.
60+
// Explicit connections may have their own calloptions as well.
6161
callOptions []gax.CallOption
6262

6363
router poolRouter // poolManager makes the decisions about connections and routing.
@@ -119,6 +119,16 @@ func (pool *connectionPool) removeWriter(writer *ManagedStream) error {
119119
return detachErr
120120
}
121121

122+
func (cp *connectionPool) mergeCallOptions(co *connection) []gax.CallOption {
123+
if co == nil {
124+
return cp.callOptions
125+
}
126+
var mergedOpts []gax.CallOption
127+
mergedOpts = append(mergedOpts, cp.callOptions...)
128+
mergedOpts = append(mergedOpts, co.callOptions...)
129+
return mergedOpts
130+
}
131+
122132
// openWithRetry establishes a new bidi stream and channel pair. It is used by connection objects
123133
// when (re)opening the network connection to the backend.
124134
//
@@ -127,7 +137,7 @@ func (cp *connectionPool) openWithRetry(co *connection) (storagepb.BigQueryWrite
127137
r := &unaryRetryer{}
128138
for {
129139
recordStat(cp.ctx, AppendClientOpenCount, 1)
130-
arc, err := cp.open(cp.callOptions...)
140+
arc, err := cp.open(cp.mergeCallOptions(co)...)
131141
if err != nil {
132142
bo, shouldRetry := r.Retry(err)
133143
if shouldRetry {
@@ -172,9 +182,10 @@ type connection struct {
172182
id string
173183
pool *connectionPool // each connection retains a reference to its owning pool.
174184

175-
fc *flowController // each connection has it's own flow controller.
176-
ctx context.Context // retained context for maintaining the connection, derived from the owning pool.
177-
cancel context.CancelFunc
185+
fc *flowController // each connection has it's own flow controller.
186+
callOptions []gax.CallOption // custom calloptions for this connection.
187+
ctx context.Context // retained context for maintaining the connection, derived from the owning pool.
188+
cancel context.CancelFunc
178189

179190
retry *statelessRetryer
180191
optimizer sendOptimizer
@@ -197,16 +208,32 @@ const (
197208
verboseConnectionMode connectionMode = "VERBOSE"
198209
)
199210

200-
func newConnection(pool *connectionPool, mode connectionMode) *connection {
211+
func newConnection(pool *connectionPool, mode connectionMode, settings *streamSettings) *connection {
201212
if pool == nil {
202213
return nil
203214
}
204215
// create and retain a cancellable context.
205216
connCtx, cancel := context.WithCancel(pool.ctx)
206-
fc := newFlowController(0, 0)
207-
if pool != nil {
208-
fc = copyFlowController(pool.baseFlowController)
217+
218+
// Resolve local overrides for flow control and call options
219+
fcRequests := 0
220+
fcBytes := 0
221+
var opts []gax.CallOption
222+
223+
if pool.baseFlowController != nil {
224+
fcRequests = pool.baseFlowController.maxInsertCount
225+
fcBytes = pool.baseFlowController.maxInsertBytes
226+
}
227+
if settings != nil {
228+
if settings.MaxInflightRequests > 0 {
229+
fcRequests = settings.MaxInflightRequests
230+
}
231+
if settings.MaxInflightBytes > 0 {
232+
fcBytes = settings.MaxInflightBytes
233+
}
234+
opts = settings.appendCallOptions
209235
}
236+
fc := newFlowController(fcRequests, fcBytes)
210237
countLimit, byteLimit := computeLoadThresholds(fc)
211238

212239
return &connection{
@@ -218,6 +245,7 @@ func newConnection(pool *connectionPool, mode connectionMode) *connection {
218245
optimizer: optimizer(mode),
219246
loadBytesThreshold: byteLimit,
220247
loadCountThreshold: countLimit,
248+
callOptions: opts,
221249
}
222250
}
223251

β€Žbigquery/storage/managedwriter/connection_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func TestConnectionPool_OpenCallOptionPropagation(t *testing.T) {
172172
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(99)),
173173
},
174174
}
175-
conn := newConnection(pool, "")
175+
conn := newConnection(pool, "", nil)
176176
pool.openWithRetry(conn)
177177
}
178178

β€Žbigquery/storage/managedwriter/routers.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (rtr *simpleRouter) writerAttach(writer *ManagedStream) error {
8181
defer rtr.mu.Unlock()
8282
rtr.writers[writer.id] = struct{}{}
8383
if rtr.conn == nil {
84-
rtr.conn = newConnection(rtr.pool, rtr.mode)
84+
rtr.conn = newConnection(rtr.pool, rtr.mode, nil)
8585
}
8686
return nil
8787
}
@@ -206,7 +206,7 @@ func (sr *sharedRouter) writerAttach(writer *ManagedStream) error {
206206
if pair := sr.exclusiveConns[writer.id]; pair != nil {
207207
return fmt.Errorf("writer %q already attached", writer.id)
208208
}
209-
sr.exclusiveConns[writer.id] = newConnection(sr.pool, simplexConnectionMode)
209+
sr.exclusiveConns[writer.id] = newConnection(sr.pool, simplexConnectionMode, writer.streamSettings)
210210
return nil
211211
}
212212

@@ -242,9 +242,9 @@ func (sr *sharedRouter) orderAndGrowMultiConns() {
242242
return sr.multiConns[i].curLoad() < sr.multiConns[j].curLoad()
243243
})
244244
if len(sr.multiConns) == 0 {
245-
sr.multiConns = []*connection{newConnection(sr.pool, multiplexConnectionMode)}
245+
sr.multiConns = []*connection{newConnection(sr.pool, multiplexConnectionMode, nil)}
246246
} else if sr.multiConns[0].isLoaded() && len(sr.multiConns) < sr.maxConns {
247-
sr.multiConns = append([]*connection{newConnection(sr.pool, multiplexConnectionMode)}, sr.multiConns...)
247+
sr.multiConns = append([]*connection{newConnection(sr.pool, multiplexConnectionMode, nil)}, sr.multiConns...)
248248
}
249249
}
250250

0 commit comments

Comments
 (0)