Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions go/parquet/internal/encoding/levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package encoding
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math/bits"

Expand All @@ -28,7 +29,6 @@ import (
"github.com/apache/arrow/go/v15/parquet"
format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet"
"github.com/apache/arrow/go/v15/parquet/internal/utils"
"golang.org/x/xerrors"
)

// LevelEncoder is for handling the encoding of Definition and Repetition levels
Expand Down Expand Up @@ -194,12 +194,12 @@ func (l *LevelDecoder) SetData(encoding parquet.Encoding, maxLvl int16, nbuffere
switch encoding {
case parquet.Encodings.RLE:
if len(data) < 4 {
return 0, xerrors.New("parquet: received invalid levels (corrupt data page?)")
return 0, errors.New("parquet: received invalid levels (corrupt data page?)")
}

nbytes := int32(binary.LittleEndian.Uint32(data[:4]))
if nbytes < 0 || nbytes > int32(len(data)-4) {
return 0, xerrors.New("parquet: received invalid number of bytes (corrupt data page?)")
return 0, errors.New("parquet: received invalid number of bytes (corrupt data page?)")
}

buf := data[4:]
Expand All @@ -212,12 +212,12 @@ func (l *LevelDecoder) SetData(encoding parquet.Encoding, maxLvl int16, nbuffere
case parquet.Encodings.BitPacked:
nbits, ok := overflow.Mul(nbuffered, l.bitWidth)
if !ok {
return 0, xerrors.New("parquet: number of buffered values too large (corrupt data page?)")
return 0, errors.New("parquet: number of buffered values too large (corrupt data page?)")
}

nbytes := bitutil.BytesForBits(int64(nbits))
if nbytes < 0 || nbytes > int64(len(data)) {
return 0, xerrors.New("parquet: recieved invalid number of bytes (corrupt data page?)")
return 0, errors.New("parquet: received invalid number of bytes (corrupt data page?)")
}
if l.bit == nil {
l.bit = utils.NewBitReader(bytes.NewReader(data))
Expand All @@ -234,7 +234,7 @@ func (l *LevelDecoder) SetData(encoding parquet.Encoding, maxLvl int16, nbuffere
// run length encoding.
func (l *LevelDecoder) SetDataV2(nbytes int32, maxLvl int16, nbuffered int, data []byte) error {
if nbytes < 0 {
return xerrors.New("parquet: invalid page header (corrupt data page?)")
return errors.New("parquet: invalid page header (corrupt data page?)")
}

l.maxLvl = maxLvl
Expand Down
10 changes: 7 additions & 3 deletions go/parquet/pqarrow/encode_arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type ArrowColumnWriter struct {
//
// Using an arrow column writer is a convenience to avoid having to process the arrow array yourself
// and determine the correct definition and repetition levels manually.
func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, col int) (ArrowColumnWriter, error) {
func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, leafColIdx int) (ArrowColumnWriter, error) {
if data.Len() == 0 {
return ArrowColumnWriter{leafCount: calcLeafCount(data.DataType()), rgw: rgw}, nil
}
Expand Down Expand Up @@ -118,7 +118,7 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
// which is the one this instance will start writing for
// colIdx := rgw.CurrentColumn() + 1

schemaField, err := manifest.GetColumnField(col)
schemaField, err := manifest.GetColumnField(leafColIdx)
if err != nil {
return ArrowColumnWriter{}, err
}
Expand Down Expand Up @@ -153,7 +153,11 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
values += chunkWriteSize
}

return ArrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: col}, nil
return ArrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: leafColIdx}, nil
}

// LeafCount returns the number of leaf columns recorded for this writer
// when it was constructed by NewArrowColumnWriter.
//
// Callers that write several arrow columns in sequence can add this value
// to their running leaf column index to obtain the index to pass for the
// next column's writer.
func (acw *ArrowColumnWriter) LeafCount() int {
return acw.leafCount
}

func (acw *ArrowColumnWriter) Write(ctx context.Context) error {
Expand Down
12 changes: 9 additions & 3 deletions go/parquet/pqarrow/encode_arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,12 @@ func TestWriteArrowCols(t *testing.T) {
srgw := writer.AppendRowGroup()
ctx := pqarrow.NewArrowWriteContext(context.TODO(), nil)

colIdx := 0
for i := int64(0); i < tbl.NumCols(); i++ {
acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, int(i))
acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, colIdx)
require.NoError(t, err)
require.NoError(t, acw.Write(ctx))
colIdx = colIdx + acw.LeafCount()
}
require.NoError(t, srgw.Close())
require.NoError(t, writer.Close())
Expand Down Expand Up @@ -249,10 +251,12 @@ func TestWriteArrowInt96(t *testing.T) {
srgw := writer.AppendRowGroup()
ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props)

colIdx := 0
for i := int64(0); i < tbl.NumCols(); i++ {
acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, int(i))
acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, colIdx)
require.NoError(t, err)
require.NoError(t, acw.Write(ctx))
colIdx += acw.LeafCount()
}
require.NoError(t, srgw.Close())
require.NoError(t, writer.Close())
Expand Down Expand Up @@ -306,11 +310,13 @@ func writeTableToBuffer(t *testing.T, mem memory.Allocator, tbl arrow.Table, row
for offset < tbl.NumRows() {
sz := utils.Min(rowGroupSize, tbl.NumRows()-offset)
srgw := writer.AppendRowGroup()
colIdx := 0
for i := 0; i < int(tbl.NumCols()); i++ {
col := tbl.Column(i)
acw, err := pqarrow.NewArrowColumnWriter(col.Data(), offset, sz, manifest, srgw, i)
acw, err := pqarrow.NewArrowColumnWriter(col.Data(), offset, sz, manifest, srgw, colIdx)
require.NoError(t, err)
require.NoError(t, acw.Write(ctx))
colIdx = colIdx + acw.LeafCount()
}
srgw.Close()
offset += sz
Expand Down
4 changes: 2 additions & 2 deletions go/parquet/pqarrow/path_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (n *listNode) fillForLast(rng, childRng *elemRange, ctx *pathWriteCtx) iter
fillRepLevels(int(childRng.size()), n.repLevel, ctx)
// once we've reached this point the following preconditions should hold:
// 1. there are no more repeated path nodes to deal with
// 2. all elements in |range| reperesent contiguous elements in the child
// 2. all elements in |range| represent contiguous elements in the child
// array (null values would have shortened the range to ensure all
// remaining list elements are present, though they may be empty)
// 3. no element of range spans a parent list (intermediate list nodes
Expand All @@ -225,7 +225,7 @@ func (n *listNode) fillForLast(rng, childRng *elemRange, ctx *pathWriteCtx) iter

// this is the start of a new list. we can be sure that it only applies to the
// previous list (and doesn't jump to the start of any list further up in nesting
// due to the contraints mentioned earlier)
// due to the constraints mentioned earlier)
ctx.AppendRepLevel(n.prevRepLevel)
ctx.AppendRepLevels(int(sizeCheck.size())-1, n.repLevel)
childRng.end = sizeCheck.end
Expand Down