Skip to content

Commit 31040e8

Browse files
authored
fix(bigquery): RowIterator.Schema not filled when using Storage Read API (#7671)
1 parent 0135b60 commit 31040e8

File tree

2 files changed

+79
-1
lines changed

2 files changed

+79
-1
lines changed

β€Žbigquery/storage_integration_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"time"
2323

2424
"cloud.google.com/go/internal/testutil"
25+
"github.com/google/go-cmp/cmp"
2526
"google.golang.org/api/iterator"
2627
)
2728

@@ -233,10 +234,22 @@ func TestIntegration_StorageReadQueryOrdering(t *testing.T) {
233234
t.Fatal(err)
234235
}
235236

237+
var firstValue S
238+
err = it.Next(&firstValue)
239+
if err != nil {
240+
t.Fatal(err)
241+
}
242+
243+
if cmp.Equal(firstValue, S{}) {
244+
t.Fatalf("user defined struct was not filled with data")
245+
}
246+
236247
total, err := countIteratorRows(it)
237248
if err != nil {
238249
t.Fatal(err)
239250
}
251+
total++ // as we read the first value separately
252+
240253
bqSession := it.arrowIterator.session.bqSession
241254
if len(bqSession.Streams) == 0 {
242255
t.Fatalf("%s: expected to use at least one stream but found %d", tc.name, len(bqSession.Streams))
@@ -263,6 +276,56 @@ func TestIntegration_StorageReadQueryOrdering(t *testing.T) {
263276
}
264277
}
265278

279+
func TestIntegration_StorageReadQueryStruct(t *testing.T) {
280+
if client == nil {
281+
t.Skip("Integration tests skipped")
282+
}
283+
ctx := context.Background()
284+
table := "`bigquery-public-data.samples.wikipedia`"
285+
sql := fmt.Sprintf(`SELECT id, title, timestamp, comment FROM %s LIMIT 1000`, table)
286+
q := storageOptimizedClient.Query(sql)
287+
q.forceStorageAPI = true
288+
q.DisableQueryCache = true
289+
it, err := q.Read(ctx)
290+
if err != nil {
291+
t.Fatal(err)
292+
}
293+
if !it.IsAccelerated() {
294+
t.Fatal("expected query to use Storage API")
295+
}
296+
297+
type S struct {
298+
ID int64
299+
Title string
300+
Timestamp int64
301+
Comment NullString
302+
}
303+
304+
total := uint64(0)
305+
for {
306+
var dst S
307+
err := it.Next(&dst)
308+
if err == iterator.Done {
309+
break
310+
}
311+
if err != nil {
312+
t.Fatalf("failed to fetch via storage API: %v", err)
313+
}
314+
if cmp.Equal(dst, S{}) {
315+
t.Fatalf("user defined struct was not filled with data")
316+
}
317+
total++
318+
}
319+
320+
bqSession := it.arrowIterator.session.bqSession
321+
if len(bqSession.Streams) == 0 {
322+
t.Fatalf("should use more than one stream but found %d", len(bqSession.Streams))
323+
}
324+
if total != it.TotalRows {
325+
t.Fatalf("should have read %d rows, but read %d", it.TotalRows, total)
326+
}
327+
}
328+
266329
func TestIntegration_StorageReadQueryMorePages(t *testing.T) {
267330
if client == nil {
268331
t.Skip("Integration tests skipped")
@@ -287,10 +350,22 @@ func TestIntegration_StorageReadQueryMorePages(t *testing.T) {
287350
Forks NullInt64
288351
}
289352

353+
var firstValue S
354+
err = it.Next(&firstValue)
355+
if err != nil {
356+
t.Fatal(err)
357+
}
358+
359+
if cmp.Equal(firstValue, S{}) {
360+
t.Fatalf("user defined struct was not filled with data")
361+
}
362+
290363
total, err := countIteratorRows(it)
291364
if err != nil {
292365
t.Fatal(err)
293366
}
367+
total++ // as we read the first value separately
368+
294369
bqSession := it.arrowIterator.session.bqSession
295370
if len(bqSession.Streams) == 0 {
296371
t.Fatalf("should use more than one stream but found %d", len(bqSession.Streams))

β€Žbigquery/storage_iterator.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ func newStorageRowIteratorFromTable(ctx context.Context, table *Table, ordered b
6161
return nil, err
6262
}
6363
it.arrowIterator.schema = md.Schema
64+
it.Schema = md.Schema
6465
return it, nil
6566
}
6667

@@ -163,7 +164,9 @@ func nextFuncForStorageIterator(it *RowIterator) func() error {
163164
if err != nil {
164165
return err
165166
}
166-
167+
if it.Schema == nil {
168+
it.Schema = it.arrowIterator.schema
169+
}
167170
rows, err := arrowIt.decoder.decodeArrowRecords(record)
168171
if err != nil {
169172
return err

0 commit comments

Comments
 (0)