Skip to content

Commit 7d085b4

Browse files
authored
feat(bigquery/storage/managedwriter): decouple connections and writers (#7314)
1 parent 8df979e commit 7d085b4

14 files changed

+1495
-1065
lines changed

β€Žbigquery/storage/managedwriter/appendresult.go

Lines changed: 61 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,11 @@ package managedwriter
1616

1717
import (
1818
"context"
19-
"fmt"
2019

2120
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
2221
"github.com/googleapis/gax-go/v2/apierror"
2322
grpcstatus "google.golang.org/grpc/status"
2423
"google.golang.org/protobuf/proto"
25-
"google.golang.org/protobuf/types/descriptorpb"
2624
)
2725

2826
// NoStreamOffset is a sentinel value for signalling we're not tracking
@@ -31,9 +29,6 @@ const NoStreamOffset int64 = -1
3129

3230
// AppendResult tracks the status of a batch of data rows.
3331
type AppendResult struct {
34-
// rowData contains the serialized row data.
35-
rowData [][]byte
36-
3732
ready chan struct{}
3833

3934
// if the append failed without a response, this will retain a reference to the error.
@@ -46,10 +41,9 @@ type AppendResult struct {
4641
totalAttempts int
4742
}
4843

49-
func newAppendResult(data [][]byte) *AppendResult {
44+
func newAppendResult() *AppendResult {
5045
return &AppendResult{
51-
ready: make(chan struct{}),
52-
rowData: data,
46+
ready: make(chan struct{}),
5347
}
5448
}
5549

@@ -138,7 +132,7 @@ func (ar *AppendResult) offset(ctx context.Context) int64 {
138132
func (ar *AppendResult) UpdatedSchema(ctx context.Context) (*storagepb.TableSchema, error) {
139133
select {
140134
case <-ctx.Done():
141-
return nil, fmt.Errorf("context done")
135+
return nil, ctx.Err()
142136
case <-ar.Ready():
143137
if ar.response != nil {
144138
if schema := ar.response.GetUpdatedSchema(); schema != nil {
@@ -155,7 +149,7 @@ func (ar *AppendResult) UpdatedSchema(ctx context.Context) (*storagepb.TableSche
155149
func (ar *AppendResult) TotalAttempts(ctx context.Context) (int, error) {
156150
select {
157151
case <-ctx.Done():
158-
return 0, fmt.Errorf("context done")
152+
return 0, ctx.Err()
159153
case <-ar.Ready():
160154
return ar.totalAttempts, nil
161155
}
@@ -168,12 +162,18 @@ type pendingWrite struct {
168162
// used is to inform routing decisions.
169163
writer *ManagedStream
170164

171-
request *storagepb.AppendRowsRequest
172-
// for schema evolution cases, accept a new schema
173-
newSchema *descriptorpb.DescriptorProto
174-
result *AppendResult
165+
// We store the request as it's simplex-optimized form, as statistically that's the most
166+
// likely outcome when processing requests and it allows us to be efficient on send.
167+
// We retain the additional information to build the complete request in the related fields.
168+
req *storagepb.AppendRowsRequest
169+
descVersion *descriptorVersion // schema at time of creation
170+
traceID string
171+
writeStreamID string
172+
173+
// Reference to the AppendResult which is exposed to the user.
174+
result *AppendResult
175175

176-
// this is used by the flow controller.
176+
// Flow control is based on the unoptimized request size.
177177
reqSize int
178178

179179
// retains the original request context, primarily for checking against
@@ -188,43 +188,66 @@ type pendingWrite struct {
188188
// to the pending results for later consumption. The provided context is
189189
// embedded in the pending write, as the write may be retried and we want
190190
// to respect the original context for expiry/cancellation etc.
191-
func newPendingWrite(ctx context.Context, appends [][]byte) *pendingWrite {
191+
func newPendingWrite(ctx context.Context, src *ManagedStream, req *storagepb.AppendRowsRequest, curDescVersion *descriptorVersion, writeStreamID, traceID string) *pendingWrite {
192192
pw := &pendingWrite{
193-
request: &storagepb.AppendRowsRequest{
194-
Rows: &storagepb.AppendRowsRequest_ProtoRows{
195-
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
196-
Rows: &storagepb.ProtoRows{
197-
SerializedRows: appends,
198-
},
199-
},
200-
},
201-
},
202-
result: newAppendResult(appends),
193+
writer: src,
194+
result: newAppendResult(),
203195
reqCtx: ctx,
196+
197+
req: req,
198+
descVersion: curDescVersion,
199+
writeStreamID: writeStreamID,
200+
traceID: traceID,
201+
}
202+
// Compute the approx size for flow control purposes.
203+
pw.reqSize = proto.Size(pw.req) + len(writeStreamID) + len(traceID)
204+
if pw.descVersion != nil {
205+
pw.reqSize += proto.Size(pw.descVersion.descriptorProto)
204206
}
205-
// We compute the size now for flow controller purposes, though
206-
// the actual request size may be slightly larger (e.g. the first
207-
// request in a new stream bears schema and stream id).
208-
pw.reqSize = proto.Size(pw.request)
209207
return pw
210208
}
211209

212210
// markDone propagates finalization of an append request to the associated
213211
// AppendResult.
214-
func (pw *pendingWrite) markDone(resp *storagepb.AppendRowsResponse, err error, fc *flowController) {
212+
func (pw *pendingWrite) markDone(resp *storagepb.AppendRowsResponse, err error) {
213+
// First, propagate necessary state from the pendingWrite to the final result.
215214
if resp != nil {
216215
pw.result.response = resp
217216
}
218217
pw.result.err = err
219-
// Record the final attempts in the result for the user.
220218
pw.result.totalAttempts = pw.attemptCount
221219

220+
// Close the result's ready channel.
222221
close(pw.result.ready)
223-
// Clear the reference to the request.
224-
pw.request = nil
225-
// if there's a flow controller, signal release. The only time this should be nil is when
226-
// encountering issues with flow control during enqueuing the initial request.
227-
if fc != nil {
228-
fc.release(pw.reqSize)
222+
// Cleanup references remaining on the write explicitly.
223+
pw.req = nil
224+
pw.descVersion = nil
225+
pw.writer = nil
226+
pw.reqCtx = nil
227+
}
228+
229+
func (pw *pendingWrite) constructFullRequest(addTrace bool) *storagepb.AppendRowsRequest {
230+
req := &storagepb.AppendRowsRequest{}
231+
if pw.req != nil {
232+
req = proto.Clone(pw.req).(*storagepb.AppendRowsRequest)
233+
}
234+
if addTrace {
235+
req.TraceId = buildTraceID(&streamSettings{TraceID: pw.traceID})
236+
}
237+
req.WriteStream = pw.writeStreamID
238+
if pw.descVersion != nil {
239+
ps := &storagepb.ProtoSchema{
240+
ProtoDescriptor: pw.descVersion.descriptorProto,
241+
}
242+
if pr := req.GetProtoRows(); pr != nil {
243+
pr.WriterSchema = ps
244+
} else {
245+
req.Rows = &storagepb.AppendRowsRequest_ProtoRows{
246+
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
247+
WriterSchema: ps,
248+
},
249+
}
250+
}
229251
}
252+
return req
230253
}

β€Žbigquery/storage/managedwriter/appendresult_test.go

Lines changed: 117 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
package managedwriter
1616

1717
import (
18-
"bytes"
1918
"context"
2019
"fmt"
2120
"testing"
@@ -24,42 +23,36 @@ import (
2423
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
2524
"github.com/google/go-cmp/cmp"
2625
"google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
26+
"google.golang.org/protobuf/proto"
2727
"google.golang.org/protobuf/testing/protocmp"
28+
"google.golang.org/protobuf/types/descriptorpb"
2829
"google.golang.org/protobuf/types/known/wrapperspb"
2930
)
3031

31-
func TestAppendResult(t *testing.T) {
32-
33-
wantRowBytes := [][]byte{[]byte("rowdata")}
34-
35-
gotAR := newAppendResult(wantRowBytes)
36-
if len(gotAR.rowData) != len(wantRowBytes) {
37-
t.Fatalf("length mismatch, got %d want %d elements", len(gotAR.rowData), len(wantRowBytes))
38-
}
39-
for i := 0; i < len(gotAR.rowData); i++ {
40-
if !bytes.Equal(gotAR.rowData[i], wantRowBytes[i]) {
41-
t.Errorf("mismatch in row data %d, got %q want %q", i, gotAR.rowData, wantRowBytes)
42-
}
43-
}
44-
}
45-
4632
func TestPendingWrite(t *testing.T) {
4733
ctx := context.Background()
48-
wantRowData := [][]byte{
49-
[]byte("row1"),
50-
[]byte("row2"),
51-
[]byte("row3"),
34+
wantReq := &storagepb.AppendRowsRequest{
35+
Rows: &storagepb.AppendRowsRequest_ProtoRows{
36+
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
37+
Rows: &storagepb.ProtoRows{
38+
SerializedRows: [][]byte{
39+
[]byte("row1"),
40+
[]byte("row2"),
41+
[]byte("row3"),
42+
},
43+
},
44+
},
45+
},
5246
}
5347

5448
// verify no offset behavior
55-
pending := newPendingWrite(ctx, wantRowData)
56-
if pending.request.GetOffset() != nil {
57-
t.Errorf("request should have no offset, but is present: %q", pending.request.GetOffset().GetValue())
49+
pending := newPendingWrite(ctx, nil, wantReq, nil, "", "")
50+
if pending.req.GetOffset() != nil {
51+
t.Errorf("request should have no offset, but is present: %q", pending.req.GetOffset().GetValue())
5852
}
5953

60-
gotRowCount := len(pending.request.GetProtoRows().GetRows().GetSerializedRows())
61-
if gotRowCount != len(wantRowData) {
62-
t.Errorf("pendingWrite request mismatch, got %d rows, want %d rows", gotRowCount, len(wantRowData))
54+
if diff := cmp.Diff(pending.req, wantReq, protocmp.Transform()); diff != "" {
55+
t.Errorf("request mismatch: -got, +want:\n%s", diff)
6356
}
6457

6558
// Verify request is not acknowledged.
@@ -71,37 +64,28 @@ func TestPendingWrite(t *testing.T) {
7164
}
7265

7366
// Mark completed, verify result.
74-
pending.markDone(&storage.AppendRowsResponse{}, nil, nil)
67+
pending.markDone(&storage.AppendRowsResponse{}, nil)
7568
if gotOff := pending.result.offset(ctx); gotOff != NoStreamOffset {
7669
t.Errorf("mismatch on completed AppendResult without offset: got %d want %d", gotOff, NoStreamOffset)
7770
}
7871
if pending.result.err != nil {
7972
t.Errorf("mismatch in error on AppendResult, got %v want nil", pending.result.err)
8073
}
81-
gotData := pending.result.rowData
82-
if len(gotData) != len(wantRowData) {
83-
t.Errorf("length mismatch on appendresult, got %d, want %d", len(gotData), len(wantRowData))
84-
}
85-
for i := 0; i < len(gotData); i++ {
86-
if !bytes.Equal(gotData[i], wantRowData[i]) {
87-
t.Errorf("row %d mismatch in data: got %q want %q", i, gotData[i], wantRowData[i])
88-
}
89-
}
9074

9175
// Create new write to verify error result.
92-
pending = newPendingWrite(ctx, wantRowData)
76+
pending = newPendingWrite(ctx, nil, wantReq, nil, "", "")
9377

9478
// Manually invoke option to apply offset to request.
9579
// This would normally be appied as part of the AppendRows() method on the managed stream.
9680
wantOffset := int64(101)
9781
f := WithOffset(wantOffset)
9882
f(pending)
9983

100-
if pending.request.GetOffset() == nil {
84+
if pending.req.GetOffset() == nil {
10185
t.Errorf("expected offset, got none")
10286
}
103-
if pending.request.GetOffset().GetValue() != wantOffset {
104-
t.Errorf("offset mismatch, got %d wanted %d", pending.request.GetOffset().GetValue(), wantOffset)
87+
if pending.req.GetOffset().GetValue() != wantOffset {
88+
t.Errorf("offset mismatch, got %d wanted %d", pending.req.GetOffset().GetValue(), wantOffset)
10589
}
10690

10791
// Verify completion behavior with an error.
@@ -116,19 +100,10 @@ func TestPendingWrite(t *testing.T) {
116100
},
117101
},
118102
}
119-
pending.markDone(testResp, wantErr, nil)
103+
pending.markDone(testResp, wantErr)
120104

121-
if pending.request != nil {
122-
t.Errorf("expected request to be cleared, is present: %#v", pending.request)
123-
}
124-
gotData = pending.result.rowData
125-
if len(gotData) != len(wantRowData) {
126-
t.Errorf("length mismatch in data: got %d, want %d", len(gotData), len(wantRowData))
127-
}
128-
for i := 0; i < len(gotData); i++ {
129-
if !bytes.Equal(gotData[i], wantRowData[i]) {
130-
t.Errorf("row %d mismatch in data: got %q want %q", i, gotData[i], wantRowData[i])
131-
}
105+
if pending.req != nil {
106+
t.Errorf("expected request to be cleared, is present: %#v", pending.req)
132107
}
133108

134109
select {
@@ -153,3 +128,93 @@ func TestPendingWrite(t *testing.T) {
153128
}
154129
}
155130
}
131+
132+
func TestPendingWrite_ConstructFullRequest(t *testing.T) {
133+
134+
testDP := &descriptorpb.DescriptorProto{Name: proto.String("foo")}
135+
testDV := newDescriptorVersion(testDP)
136+
testEmptyTraceID := buildTraceID(&streamSettings{})
137+
138+
for _, tc := range []struct {
139+
desc string
140+
pw *pendingWrite
141+
addTrace bool
142+
want *storagepb.AppendRowsRequest
143+
}{
144+
{
145+
desc: "nil request",
146+
pw: &pendingWrite{
147+
descVersion: testDV,
148+
},
149+
want: &storagepb.AppendRowsRequest{
150+
Rows: &storagepb.AppendRowsRequest_ProtoRows{
151+
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
152+
WriterSchema: &storagepb.ProtoSchema{
153+
ProtoDescriptor: testDP,
154+
},
155+
},
156+
},
157+
},
158+
},
159+
{
160+
desc: "empty req w/trace",
161+
pw: &pendingWrite{
162+
req: &storagepb.AppendRowsRequest{},
163+
descVersion: testDV,
164+
},
165+
addTrace: true,
166+
want: &storagepb.AppendRowsRequest{
167+
Rows: &storagepb.AppendRowsRequest_ProtoRows{
168+
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
169+
WriterSchema: &storagepb.ProtoSchema{
170+
ProtoDescriptor: testDP,
171+
},
172+
},
173+
},
174+
TraceId: testEmptyTraceID,
175+
},
176+
},
177+
{
178+
desc: "basic req",
179+
pw: &pendingWrite{
180+
req: &storagepb.AppendRowsRequest{},
181+
descVersion: testDV,
182+
},
183+
want: &storagepb.AppendRowsRequest{
184+
Rows: &storagepb.AppendRowsRequest_ProtoRows{
185+
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
186+
WriterSchema: &storagepb.ProtoSchema{
187+
ProtoDescriptor: testDP,
188+
},
189+
},
190+
},
191+
},
192+
},
193+
{
194+
desc: "everything w/trace",
195+
pw: &pendingWrite{
196+
req: &storagepb.AppendRowsRequest{},
197+
descVersion: testDV,
198+
traceID: "foo",
199+
writeStreamID: "streamid",
200+
},
201+
addTrace: true,
202+
want: &storagepb.AppendRowsRequest{
203+
WriteStream: "streamid",
204+
Rows: &storagepb.AppendRowsRequest_ProtoRows{
205+
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
206+
WriterSchema: &storagepb.ProtoSchema{
207+
ProtoDescriptor: testDP,
208+
},
209+
},
210+
},
211+
TraceId: buildTraceID(&streamSettings{TraceID: "foo"}),
212+
},
213+
},
214+
} {
215+
got := tc.pw.constructFullRequest(tc.addTrace)
216+
if diff := cmp.Diff(got, tc.want, protocmp.Transform()); diff != "" {
217+
t.Errorf("%s diff: %s", tc.desc, diff)
218+
}
219+
}
220+
}

0 commit comments

Comments
 (0)