Skip to content

Commit ce9d29b

Browse files
authored
feat(bigquery/storage/managedwriter): allow overriding proto conversion mapping (#12579)
Currently, using the `bigquery.InferSchema` and `adapt.StorageSchemaToProto2Descriptor` methods to generate proto descriptors for tables that have fields of type `TIMESTAMP` can cause compatibility issues, since the original struct has a `time.Time` field while in the proto descriptor it becomes an `INT64`. This PR adds an option to convert to Google's Timestamp Well Known Type (WKT), which is also accepted by the Storage Write API. I think we can't make it the default, because some customers might already be relying on unmarshalling JSON data with timestamps in the `INT64` format (Unix timestamp) instead of an RFC 3339 formatted timestamp string. We also previously discussed adding options to the `StorageSchemaToProto2Descriptor` method, on the improvement issue related to CDC helpers: https://togithub.com/googleapis/google-cloud-go/issues/10721 Naming is hard, and I'm not sure what the best name for the `WithTimestampWellKnownType` method is. Open to suggestions. Fixes #12569 Supersedes #12578
1 parent 904cad6 commit ce9d29b

File tree

3 files changed

+353
-48
lines changed

3 files changed

+353
-48
lines changed
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package adapt
16+
17+
import (
18+
"strings"
19+
20+
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
21+
"cloud.google.com/go/internal/optional"
22+
"google.golang.org/protobuf/types/descriptorpb"
23+
)
24+
25+
// ProtoConversionOption to customize proto descriptor conversion.
26+
type ProtoConversionOption interface {
27+
applyCustomClientOpt(*customConfig)
28+
}
29+
30+
// type for collecting custom adapt Option values.
31+
type customConfig struct {
32+
protoMappingOverrides protoMappingOverrides
33+
useProto3 bool
34+
}
35+
36+
// ProtoMapping can be used to override protobuf types used when
37+
// converting from a BigQuery Schema to a Protobuf Descriptor.
38+
// See [WithProtoMapping] option.
39+
type ProtoMapping struct {
40+
// FieldPath should be in the `fieldA.subFieldB.anotherSubFieldC`.
41+
FieldPath string
42+
// FieldType is the BigQuery Table field type to be overrided
43+
FieldType storagepb.TableFieldSchema_Type
44+
// TypeName is the full qualified path name for the protobuf type.
45+
// Example: ".google.protobuf.Timestamp", ".google.protobuf.Duration", etc
46+
TypeName string
47+
// Type is the final DescriptorProto Type
48+
Type descriptorpb.FieldDescriptorProto_Type
49+
}
50+
51+
type protoMappingOverrides []ProtoMapping
52+
53+
func (o *protoMappingOverrides) getByField(field *storagepb.TableFieldSchema, path string) *ProtoMapping {
54+
var foundOverride *ProtoMapping
55+
for _, override := range *o {
56+
if override.FieldPath == path { // only early return for specific override by path
57+
return &override
58+
}
59+
if override.FieldType == field.Type {
60+
foundOverride = &override
61+
}
62+
}
63+
return foundOverride
64+
}
65+
66+
type customOption struct {
67+
protoOverride *ProtoMapping
68+
useProto3 optional.Bool
69+
}
70+
71+
// WithProtoMapping allow to set an override on which field descriptor proto type
72+
// is going to be used for the given BigQuery Table field type or field path.
73+
// See https://cloud.google.com/bigquery/docs/supported-data-types#supported_protocol_buffer_data_types for accepted types
74+
// by the BigQuery Storage Write API.
75+
//
76+
// Examples:
77+
//
78+
// // WithTimestampAsTimestamp defines that table fields of type Timestamp, are mapped
79+
// // as Google's WKT timestamppb.Timestamp.
80+
// func WithTimestampAsTimestamp() Option {
81+
// return WithProtoMapping(ProtoMapping{
82+
// FieldType: storagepb.TableFieldSchema_TIMESTAMP,
83+
// TypeName: "google.protobuf.Timestamp",
84+
// Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE,
85+
// })
86+
// }
87+
//
88+
// // WithIntervalAsDuration defines that table fields of type Interval, are mapped
89+
// // as Google's WKT durationpb.Duration
90+
// func WithIntervalAsDuration() Option {
91+
// return WithProtoMapping(ProtoMapping{
92+
// FieldType: storagepb.TableFieldSchema_INTERVAL,
93+
// TypeName: "google.protobuf.Duration",
94+
// Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE,
95+
// })
96+
// }
97+
func WithProtoMapping(protoMapping ProtoMapping) ProtoConversionOption {
98+
if !strings.HasPrefix(protoMapping.TypeName, ".") && protoMapping.TypeName != "" {
99+
protoMapping.TypeName = "." + protoMapping.TypeName
100+
}
101+
return &customOption{protoOverride: &protoMapping}
102+
}
103+
104+
// internal option to set proto 2 syntax option
105+
func withProto2() ProtoConversionOption {
106+
return &customOption{useProto3: false}
107+
}
108+
109+
// internal option to set proto 3 syntax option
110+
func withProto3() ProtoConversionOption {
111+
return &customOption{useProto3: true}
112+
}
113+
114+
func (o *customOption) applyCustomClientOpt(cfg *customConfig) {
115+
if o.protoOverride != nil {
116+
cfg.protoMappingOverrides = append(cfg.protoMappingOverrides, *o.protoOverride)
117+
}
118+
if o.useProto3 != nil {
119+
cfg.useProto3 = optional.ToBool(o.useProto3)
120+
}
121+
}

bigquery/storage/managedwriter/adapt/protoconversion.go

Lines changed: 87 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525
"google.golang.org/protobuf/reflect/protodesc"
2626
"google.golang.org/protobuf/reflect/protoreflect"
2727
"google.golang.org/protobuf/types/descriptorpb"
28+
"google.golang.org/protobuf/types/known/durationpb"
29+
"google.golang.org/protobuf/types/known/timestamppb"
2830
"google.golang.org/protobuf/types/known/wrapperspb"
2931
)
3032

@@ -114,6 +116,21 @@ var bqTypeToWrapperMap = map[storagepb.TableFieldSchema_Type]string{
114116
// filename used by well known types proto
115117
var wellKnownTypesWrapperName = "google/protobuf/wrappers.proto"
116118

119+
// filename used by timestamp and duration proto
120+
var extraWellKnownTypesPerTypeName = map[string]struct {
121+
name string
122+
fileDescriptor *descriptorpb.FileDescriptorProto
123+
}{
124+
".google.protobuf.Timestamp": {
125+
name: "google/protobuf/timestamp.proto",
126+
fileDescriptor: protodesc.ToFileDescriptorProto(timestamppb.File_google_protobuf_timestamp_proto),
127+
},
128+
".google.protobuf.Duration": {
129+
name: "google/protobuf/duration.proto",
130+
fileDescriptor: protodesc.ToFileDescriptorProto(durationpb.File_google_protobuf_duration_proto),
131+
},
132+
}
133+
117134
var rangeTypesPrefix = "rangemessage_range_"
118135

119136
// dependencyCache is used to reduce the number of unique messages we generate by caching based on the tableschema.
@@ -180,7 +197,7 @@ func (dm *dependencyCache) add(schema *storagepb.TableSchema, descriptor protore
180197
return nil
181198
}
182199

183-
func (dm *dependencyCache) addRangeByElementType(typ storagepb.TableFieldSchema_Type, useProto3 bool) (protoreflect.MessageDescriptor, error) {
200+
func (dm *dependencyCache) addRangeByElementType(typ storagepb.TableFieldSchema_Type, cfg *customConfig) (protoreflect.MessageDescriptor, error) {
184201
if md, present := dm.rangeTypes[typ]; present {
185202
// already added, do nothing.
186203
return md, nil
@@ -212,7 +229,7 @@ func (dm *dependencyCache) addRangeByElementType(typ storagepb.TableFieldSchema_
212229
// we put the range types outside the hierarchical namespace as they're effectively BQ-specific well-known types.
213230
msgTypeName := fmt.Sprintf("%s%s", rangeTypesPrefix, strings.ToLower(typ.String()))
214231
// use a new dependency cache, as we don't want to taint the main one due to matching schema
215-
md, err := storageSchemaToDescriptorInternal(ts, msgTypeName, newDependencyCache(), useProto3)
232+
md, err := storageSchemaToDescriptorInternal(ts, msgTypeName, newDependencyCache(), cfg)
216233
if err != nil {
217234
return nil, fmt.Errorf("failed to generate range descriptor %q: %v", msgTypeName, err)
218235
}
@@ -230,22 +247,34 @@ func (dm *dependencyCache) getRange(typ storagepb.TableFieldSchema_Type) protore
230247

231248
// StorageSchemaToProto2Descriptor builds a protoreflect.Descriptor for a given table schema using proto2 syntax.
232249
func StorageSchemaToProto2Descriptor(inSchema *storagepb.TableSchema, scope string) (protoreflect.Descriptor, error) {
233-
dc := newDependencyCache()
234-
// TODO: b/193064992 tracks support for wrapper types. In the interim, disable wrapper usage.
235-
return storageSchemaToDescriptorInternal(inSchema, scope, dc, false)
250+
return StorageSchemaToProtoDescriptorWithOptions(inSchema, scope, withProto2())
236251
}
237252

238253
// StorageSchemaToProto3Descriptor builds a protoreflect.Descriptor for a given table schema using proto3 syntax.
239254
//
240255
// NOTE: Currently the write API doesn't yet support proto3 behaviors (default value, wrapper types, etc), but this is provided for
241256
// completeness.
242257
func StorageSchemaToProto3Descriptor(inSchema *storagepb.TableSchema, scope string) (protoreflect.Descriptor, error) {
258+
return StorageSchemaToProtoDescriptorWithOptions(inSchema, scope, withProto3())
259+
}
260+
261+
// StorageSchemaToProtoDescriptorWithOptions builds a protoreflect.Descriptor for a given table schema with
262+
// extra configuration options. Uses proto2 syntax by default.
263+
func StorageSchemaToProtoDescriptorWithOptions(inSchema *storagepb.TableSchema, scope string, opts ...ProtoConversionOption) (protoreflect.Descriptor, error) {
243264
dc := newDependencyCache()
244-
return storageSchemaToDescriptorInternal(inSchema, scope, dc, true)
265+
cfg := &customConfig{
266+
useProto3: false,
267+
protoMappingOverrides: protoMappingOverrides{},
268+
}
269+
for _, opt := range opts {
270+
opt.applyCustomClientOpt(cfg)
271+
}
272+
// TODO: b/193064992 tracks support for wrapper types. In the interim, disable wrapper usage.
273+
return storageSchemaToDescriptorInternal(inSchema, scope, dc, cfg)
245274
}
246275

247276
// Internal implementation of the conversion code.
248-
func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope string, cache *dependencyCache, useProto3 bool) (protoreflect.MessageDescriptor, error) {
277+
func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope string, cache *dependencyCache, cfg *customConfig) (protoreflect.MessageDescriptor, error) {
249278
if inSchema == nil {
250279
return nil, newConversionError(scope, fmt.Errorf("no input schema was provided"))
251280
}
@@ -277,14 +306,14 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
277306
deps = append(deps, foundDesc.ParentFile())
278307
}
279308
// Construct field descriptor for the message.
280-
fdp := tableFieldSchemaToFieldDescriptorProto(f, fNumber, string(foundDesc.FullName()), useProto3)
309+
fdp := tableFieldSchemaToFieldDescriptorProto(f, fNumber, string(foundDesc.FullName()), cfg)
281310
fields = append(fields, fdp)
282311
} else {
283312
// Wrap the current struct's fields in a TableSchema outer message, and then build the submessage.
284313
ts := &storagepb.TableSchema{
285314
Fields: f.GetFields(),
286315
}
287-
desc, err := storageSchemaToDescriptorInternal(ts, currentScope, cache, useProto3)
316+
desc, err := storageSchemaToDescriptorInternal(ts, currentScope, cache, cfg)
288317
if err != nil {
289318
return nil, newConversionError(currentScope, fmt.Errorf("couldn't convert message: %w", err))
290319
}
@@ -295,7 +324,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
295324
if err != nil {
296325
return nil, newConversionError(currentScope, fmt.Errorf("failed to add descriptor to dependency cache: %w", err))
297326
}
298-
fdp := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
327+
fdp := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, cfg)
299328
fields = append(fields, fdp)
300329
}
301330
} else {
@@ -305,7 +334,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
305334
if ret == nil {
306335
return nil, fmt.Errorf("field %q is a RANGE, but doesn't include RangeElementType info", f.GetName())
307336
}
308-
foundDesc, err := cache.addRangeByElementType(ret.GetType(), useProto3)
337+
foundDesc, err := cache.addRangeByElementType(ret.GetType(), cfg)
309338
if err != nil {
310339
return nil, err
311340
}
@@ -323,7 +352,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
323352
}
324353
}
325354
}
326-
fd := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
355+
fd := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, cfg)
327356
fields = append(fields, fd)
328357
}
329358
}
@@ -335,6 +364,11 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
335364

336365
// Use the local dependencies to generate a list of filenames.
337366
depNames := []string{wellKnownTypesWrapperName}
367+
for _, override := range cfg.protoMappingOverrides {
368+
if dep, found := extraWellKnownTypesPerTypeName[override.TypeName]; found {
369+
depNames = append(depNames, dep.name)
370+
}
371+
}
338372
for _, d := range deps {
339373
depNames = append(depNames, d.ParentFile().Path())
340374
}
@@ -346,7 +380,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
346380
Syntax: proto.String("proto3"),
347381
Dependency: depNames,
348382
}
349-
if !useProto3 {
383+
if !cfg.useProto3 {
350384
fdp.Syntax = proto.String("proto2")
351385
}
352386

@@ -357,6 +391,11 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
357391
fdp,
358392
protodesc.ToFileDescriptorProto(wrapperspb.File_google_protobuf_wrappers_proto),
359393
}
394+
for _, override := range cfg.protoMappingOverrides {
395+
if dep, found := extraWellKnownTypesPerTypeName[override.TypeName]; found {
396+
fdpList = append(fdpList, dep.fileDescriptor)
397+
}
398+
}
360399
fdpList = append(fdpList, cache.getFileDescriptorProtos()...)
361400

362401
fds := &descriptorpb.FileDescriptorSet{
@@ -402,7 +441,7 @@ func messageDependsOnFile(msg protoreflect.MessageDescriptor, file protoreflect.
402441
// For proto2, we propagate the mode->label annotation as expected.
403442
//
404443
// Messages are always nullable, and repeated fields are as well.
405-
func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, idx int32, scope string, useProto3 bool) *descriptorpb.FieldDescriptorProto {
444+
func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, idx int32, scope string, cfg *customConfig) *descriptorpb.FieldDescriptorProto {
406445
name := field.GetName()
407446
var fdp *descriptorpb.FieldDescriptorProto
408447

@@ -411,46 +450,35 @@ func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, i
411450
Name: proto.String(name),
412451
Number: proto.Int32(idx),
413452
TypeName: proto.String(scope),
414-
Label: convertModeToLabel(field.GetMode(), useProto3),
453+
Label: convertModeToLabel(field.GetMode(), cfg.useProto3),
415454
}
416455
} else if field.GetType() == storagepb.TableFieldSchema_RANGE {
417456
fdp = &descriptorpb.FieldDescriptorProto{
418457
Name: proto.String(name),
419458
Number: proto.Int32(idx),
420459
TypeName: proto.String(fmt.Sprintf("%s%s", rangeTypesPrefix, strings.ToLower(field.GetRangeElementType().GetType().String()))),
421-
Label: convertModeToLabel(field.GetMode(), useProto3),
460+
Label: convertModeToLabel(field.GetMode(), cfg.useProto3),
422461
}
423462
} else {
424-
// For (REQUIRED||REPEATED) fields for proto3, or all cases for proto2, we can use the expected scalar types.
425-
if field.GetMode() != storagepb.TableFieldSchema_NULLABLE || !useProto3 {
426-
outType := bqTypeToFieldTypeMap[field.GetType()]
427-
fdp = &descriptorpb.FieldDescriptorProto{
428-
Name: proto.String(name),
429-
Number: proto.Int32(idx),
430-
Type: outType.Enum(),
431-
Label: convertModeToLabel(field.GetMode(), useProto3),
432-
}
463+
typeName, outType, label := resolveType(scope, field, cfg)
464+
fdp = &descriptorpb.FieldDescriptorProto{
465+
Name: proto.String(name),
466+
Number: proto.Int32(idx),
467+
Type: outType.Enum(),
468+
TypeName: typeName,
469+
Label: label,
470+
}
433471

434-
// Special case: proto2 repeated fields may benefit from using packed annotation.
435-
if field.GetMode() == storagepb.TableFieldSchema_REPEATED && !useProto3 {
436-
for _, v := range packedTypes {
437-
if outType == v {
438-
fdp.Options = &descriptorpb.FieldOptions{
439-
Packed: proto.Bool(true),
440-
}
441-
break
472+
// Special case: proto2 repeated fields may benefit from using packed annotation.
473+
if field.GetMode() == storagepb.TableFieldSchema_REPEATED && !cfg.useProto3 {
474+
for _, v := range packedTypes {
475+
if outType == v {
476+
fdp.Options = &descriptorpb.FieldOptions{
477+
Packed: proto.Bool(true),
442478
}
479+
break
443480
}
444481
}
445-
} else {
446-
// For NULLABLE proto3 fields, use a wrapper type.
447-
fdp = &descriptorpb.FieldDescriptorProto{
448-
Name: proto.String(name),
449-
Number: proto.Int32(idx),
450-
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
451-
TypeName: proto.String(bqTypeToWrapperMap[field.GetType()]),
452-
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
453-
}
454482
}
455483
}
456484
if nameRequiresAnnotation(name) {
@@ -468,6 +496,23 @@ func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, i
468496
return fdp
469497
}
470498

499+
func resolveType(scope string, field *storagepb.TableFieldSchema, cfg *customConfig) (*string, descriptorpb.FieldDescriptorProto_Type, *descriptorpb.FieldDescriptorProto_Label) {
500+
path := strings.TrimPrefix(strings.ReplaceAll(scope, "__", "."), "root.")
501+
if override := cfg.protoMappingOverrides.getByField(field, path); override != nil {
502+
var typeName *string
503+
if override.TypeName != "" {
504+
typeName = proto.String(override.TypeName)
505+
}
506+
return typeName, override.Type, convertModeToLabel(field.GetMode(), cfg.useProto3)
507+
}
508+
// For (REQUIRED||REPEATED) fields for proto3, or all cases for proto2, we can use the expected scalar types.
509+
if field.GetMode() != storagepb.TableFieldSchema_NULLABLE || !cfg.useProto3 {
510+
return nil, bqTypeToFieldTypeMap[field.GetType()], convertModeToLabel(field.GetMode(), cfg.useProto3)
511+
}
512+
// For NULLABLE proto3 fields, use a wrapper type.
513+
return proto.String(bqTypeToWrapperMap[field.GetType()]), descriptorpb.FieldDescriptorProto_TYPE_MESSAGE, descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
514+
}
515+
471516
// nameRequiresAnnotation determines whether a field name requires unicode-annotation.
472517
func nameRequiresAnnotation(in string) bool {
473518
return !protoreflect.Name(in).IsValid()

0 commit comments

Comments
 (0)