Refactor encoding APIs (part 1) (parquet-go#172)
* refactor encoding APIs

* add methods to all encodings

* remove CanEncode method

* port writer to new encoding APIs

* remove parquet.Page.WriteTo + fix tests

* remove encoding.Encoder (the new API shape is sketched below, after this list)

* refactor page reading to use the new encoding APIs

* allow RLE encoding on boolean data pages

* don't panic on mismatching offsets and number of values in byte array pages

* fix all tests

* remove filePages2

* remove ColumnReader

* remove encoding.Decoder

* remove encoding.ByteArray

* Go 1.17 fixes

* add missing functions on Go 1.17

* add missing functions on Go 1.17 (2)

* fix column index on Go 1.17

* Update encoding/notsupported.go

Co-authored-by: Kevin Burke <[email protected]>

* Update encoding/notsupported.go

Co-authored-by: Kevin Burke <[email protected]>

* PR feedback

* Update writer.go

Co-authored-by: Kevin Burke <[email protected]>

* Update writer.go

Co-authored-by: Kevin Burke <[email protected]>

* fix typos

* Update writer.go

Co-authored-by: Kevin Burke <[email protected]>

* Update column.go

Co-authored-by: Kevin Burke <[email protected]>

* fix bug when using delta binary encoding on sequences of values using more than 32 bits per item

* Refactor encoding fuzz tests (parquet-go#174)

* add fuzzing for delta encoding

* add fuzzing for RLE encoding

* add fuzzing for byte-stream-split encoding

* remove: encoding/fuzz_test.go

* PR feedback: improve readability of encoding/fuzz.encode function signature

Co-authored-by: Kevin Burke <[email protected]>
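
The headline change in the list above is the move from the stateful encoding.Encoder interface to stateless, buffer-oriented encodings. A minimal sketch of the new API shape, using only the EncodeInt32 signature and the encoding.NotSupported embedding visible in the bloom.go diff below; the littleEndianInt32 type, its byte layout, and the buffer-ownership convention are illustrative assumptions, not part of this commit:

package example

import (
	"encoding/binary"

	"github.com/segmentio/parquet-go/encoding"
)

// littleEndianInt32 is a hypothetical encoding that writes int32 values as
// little-endian bytes. Embedding encoding.NotSupported supplies "not supported"
// implementations for every method the type does not override.
type littleEndianInt32 struct{ encoding.NotSupported }

func (littleEndianInt32) EncodeInt32(dst []byte, src []int32) ([]byte, error) {
	dst = dst[:0] // this sketch assumes the encoder may reuse dst; see the encoding package for the actual contract
	var b [4]byte
	for _, v := range src {
		binary.LittleEndian.PutUint32(b[:], uint32(v))
		dst = append(dst, b[:]...)
	}
	return dst, nil
}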
Achille and kevinburkesegment authored May 11, 2022
1 parent 35af418 commit 4f4d804
Showing 92 changed files with 4,059 additions and 6,694 deletions.
181 changes: 94 additions & 87 deletions bloom.go
@@ -4,8 +4,10 @@ import (
 	"io"
 
 	"github.com/segmentio/parquet-go/bloom"
+	"github.com/segmentio/parquet-go/bloom/xxhash"
 	"github.com/segmentio/parquet-go/deprecated"
 	"github.com/segmentio/parquet-go/encoding"
+	"github.com/segmentio/parquet-go/encoding/plain"
 	"github.com/segmentio/parquet-go/format"
 	"github.com/segmentio/parquet-go/internal/bits"
 )
@@ -80,9 +82,13 @@ type BloomFilterColumn interface {
 	// filter.
 	Hash() bloom.Hash
 
-	// NewFilter constructs a new bloom filter configured to hold the given
-	// number of values and bits of filter per value.
-	NewFilter(numValues int64, bitsPerValue uint) bloom.MutableFilter
+	// Returns an encoding which can be used to write columns of values to the
+	// filter.
+	Encoding() encoding.Encoding
+
+	// Returns the size of the filter needed to encode values in the filter,
+	// assuming each value will be encoded with the given number of bits.
+	Size(numValues int64, bitsPerValue uint) int
 }
 
 // SplitBlockFilter constructs a split block bloom filter object for the column
@@ -91,10 +97,11 @@ func SplitBlockFilter(path ...string) BloomFilterColumn { return splitBlockFilte
 
 type splitBlockFilter []string
 
-func (f splitBlockFilter) Path() []string { return f }
-func (f splitBlockFilter) Hash() bloom.Hash { return bloom.XXH64{} }
-func (f splitBlockFilter) NewFilter(numValues int64, bitsPerValue uint) bloom.MutableFilter {
-	return make(bloom.SplitBlockFilter, bloom.NumSplitBlocksOf(numValues, bitsPerValue))
+func (f splitBlockFilter) Path() []string { return f }
+func (f splitBlockFilter) Hash() bloom.Hash { return bloom.XXH64{} }
+func (f splitBlockFilter) Encoding() encoding.Encoding { return splitBlockEncoding{} }
+func (f splitBlockFilter) Size(numValues int64, bitsPerValue uint) int {
+	return bloom.BlockSize * bloom.NumSplitBlocksOf(numValues, bitsPerValue)
 }
 
 // Creates a header from the given bloom filter.
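
The two new BloomFilterColumn methods shown above are meant to be used together: Size reports how many bytes of filter storage the column needs, and Encoding returns the encoder that hashes values into that storage. A minimal sketch using only names from this file; the column name, value type, and the 10 bits-per-value budget are illustrative:

package example

import "github.com/segmentio/parquet-go"

func buildBloomFilter(values []int64) ([]byte, error) {
	col := parquet.SplitBlockFilter("id")
	// Size rounds up to whole split blocks, so len(data) is a multiple of bloom.BlockSize.
	data := make([]byte, col.Size(int64(len(values)), 10))
	// The returned encoding hashes the values directly into the data buffer.
	return col.Encoding().EncodeInt64(data, values)
}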
@@ -124,128 +131,128 @@ func searchBloomFilterColumn(filters []BloomFilterColumn, path columnPath) Bloom
 	return nil
 }
 
-// bloomFilterEncoder is an adapter type which implements the encoding.Encoder
-// interface on top of a bloom filter.
-type bloomFilterEncoder struct {
-	filter bloom.MutableFilter
-	hash bloom.Hash
-	keys [128]uint64
-}

-func newBloomFilterEncoder(filter bloom.MutableFilter, hash bloom.Hash) *bloomFilterEncoder {
-	return &bloomFilterEncoder{filter: filter, hash: hash}
-}
+const (
+	// Size of the stack buffer used to perform bulk operations on bloom filters.
+	//
+	// This value was determined as being a good default empirically,
+	// 128 x uint64 makes a 1KiB buffer which amortizes the cost of calling
+	// methods of bloom filters while not causing too much stack growth either.
+	filterEncodeBufferSize = 128
+)

-func (e *bloomFilterEncoder) Bytes() []byte {
-	return e.filter.Bytes()
+type splitBlockEncoding struct {
+	encoding.NotSupported
 }

-func (e *bloomFilterEncoder) Reset(io.Writer) {
-	e.filter.Reset()
+func (splitBlockEncoding) EncodeBoolean(dst []byte, src []bool) ([]byte, error) {
+	splitBlockEncodeUint8(bloom.MakeSplitBlockFilter(dst), bits.BoolToBytes(src))
+	return dst, nil
 }

-func (e *bloomFilterEncoder) SetBitWidth(int) {
+func (splitBlockEncoding) EncodeInt32(dst []byte, src []int32) ([]byte, error) {
+	splitBlockEncodeUint32(bloom.MakeSplitBlockFilter(dst), bits.Int32ToUint32(src))
+	return dst, nil
 }

-func (e *bloomFilterEncoder) EncodeBoolean(data []bool) error {
-	return e.insert8(bits.BoolToBytes(data))
+func (splitBlockEncoding) EncodeInt64(dst []byte, src []int64) ([]byte, error) {
+	splitBlockEncodeUint64(bloom.MakeSplitBlockFilter(dst), bits.Int64ToUint64(src))
+	return dst, nil
 }

-func (e *bloomFilterEncoder) EncodeInt8(data []int8) error {
-	return e.insert8(bits.Int8ToBytes(data))
+func (e splitBlockEncoding) EncodeInt96(dst []byte, src []deprecated.Int96) ([]byte, error) {
+	splitBlockEncodeFixedLenByteArray(bloom.MakeSplitBlockFilter(dst), deprecated.Int96ToBytes(src), 12)
+	return dst, nil
 }

-func (e *bloomFilterEncoder) EncodeInt16(data []int16) error {
-	return e.insert16(bits.Int16ToUint16(data))
+func (splitBlockEncoding) EncodeFloat(dst []byte, src []float32) ([]byte, error) {
+	splitBlockEncodeUint32(bloom.MakeSplitBlockFilter(dst), bits.Float32ToUint32(src))
+	return dst, nil
 }

-func (e *bloomFilterEncoder) EncodeInt32(data []int32) error {
-	return e.insert32(bits.Int32ToUint32(data))
+func (splitBlockEncoding) EncodeDouble(dst []byte, src []float64) ([]byte, error) {
+	splitBlockEncodeUint64(bloom.MakeSplitBlockFilter(dst), bits.Float64ToUint64(src))
+	return dst, nil
 }

-func (e *bloomFilterEncoder) EncodeInt64(data []int64) error {
-	return e.insert64(bits.Int64ToUint64(data))
-}
+func (splitBlockEncoding) EncodeByteArray(dst, src []byte) ([]byte, error) {
+	filter := bloom.MakeSplitBlockFilter(dst)
+	buffer := make([]uint64, 0, filterEncodeBufferSize)

-func (e *bloomFilterEncoder) EncodeInt96(data []deprecated.Int96) error {
-	return e.EncodeFixedLenByteArray(12, deprecated.Int96ToBytes(data))
-}
+	err := plain.RangeByteArrays(src, func(value []byte) error {
+		if len(buffer) == cap(buffer) {
+			filter.InsertBulk(buffer)
+			buffer = buffer[:0]
+		}
+		buffer = append(buffer, xxhash.Sum64(value))
+		return nil
+	})

-func (e *bloomFilterEncoder) EncodeFloat(data []float32) error {
-	return e.insert32(bits.Float32ToUint32(data))
+	filter.InsertBulk(buffer)
+	return dst, err
 }

-func (e *bloomFilterEncoder) EncodeDouble(data []float64) error {
-	return e.insert64(bits.Float64ToUint64(data))
+func (splitBlockEncoding) EncodeFixedLenByteArray(dst, src []byte, size int) ([]byte, error) {
+	filter := bloom.MakeSplitBlockFilter(dst)
+	if size == 16 {
+		splitBlockEncodeUint128(filter, bits.BytesToUint128(src))
+	} else {
+		splitBlockEncodeFixedLenByteArray(filter, src, size)
+	}
+	return dst, nil
 }

-func (e *bloomFilterEncoder) EncodeByteArray(data encoding.ByteArrayList) error {
-	data.Range(func(v []byte) bool { e.insert(v); return true })
-	return nil
-}
+func splitBlockEncodeFixedLenByteArray(filter bloom.SplitBlockFilter, data []byte, size int) {
+	buffer := make([]uint64, 0, filterEncodeBufferSize)

-func (e *bloomFilterEncoder) EncodeFixedLenByteArray(size int, data []byte) error {
-	if size == 16 {
-		return e.insert128(bits.BytesToUint128(data))
-	}
 	for i, j := 0, size; j <= len(data); {
-		e.insert(data[i:j])
+		if len(buffer) == cap(buffer) {
+			filter.InsertBulk(buffer)
+			buffer = buffer[:0]
+		}
+		buffer = append(buffer, xxhash.Sum64(data[i:j]))
 		i += size
 		j += size
 	}
-	return nil
-}

-func (e *bloomFilterEncoder) insert(value []byte) {
-	e.filter.Insert(e.hash.Sum64(value))
+	filter.InsertBulk(buffer)
 }

-func (e *bloomFilterEncoder) insert8(data []uint8) error {
-	k := e.keys[:]
-	for i := 0; i < len(data); {
-		n := e.hash.MultiSum64Uint8(k, data[i:])
-		e.filter.InsertBulk(k[:n:n])
-		i += n
-	}
-	return nil
-}
+func splitBlockEncodeUint8(filter bloom.SplitBlockFilter, values []uint8) {
+	buffer := make([]uint64, filterEncodeBufferSize)

-func (e *bloomFilterEncoder) insert16(data []uint16) error {
-	k := e.keys[:]
-	for i := 0; i < len(data); {
-		n := e.hash.MultiSum64Uint16(k, data[i:])
-		e.filter.InsertBulk(k[:n:n])
+	for i := 0; i < len(values); {
+		n := xxhash.MultiSum64Uint8(buffer, values[i:])
+		filter.InsertBulk(buffer[:n])
 		i += n
 	}
-	return nil
 }

-func (e *bloomFilterEncoder) insert32(data []uint32) error {
-	k := e.keys[:]
-	for i := 0; i < len(data); {
-		n := e.hash.MultiSum64Uint32(k, data[i:])
-		e.filter.InsertBulk(k[:n:n])
+func splitBlockEncodeUint32(filter bloom.SplitBlockFilter, values []uint32) {
+	buffer := make([]uint64, filterEncodeBufferSize)

+	for i := 0; i < len(values); {
+		n := xxhash.MultiSum64Uint32(buffer, values[i:])
+		filter.InsertBulk(buffer[:n])
 		i += n
 	}
-	return nil
 }

-func (e *bloomFilterEncoder) insert64(data []uint64) error {
-	k := e.keys[:]
-	for i := 0; i < len(data); {
-		n := e.hash.MultiSum64Uint64(k, data[i:])
-		e.filter.InsertBulk(k[:n:n])
+func splitBlockEncodeUint64(filter bloom.SplitBlockFilter, values []uint64) {
+	buffer := make([]uint64, filterEncodeBufferSize)

+	for i := 0; i < len(values); {
+		n := xxhash.MultiSum64Uint64(buffer, values[i:])
+		filter.InsertBulk(buffer[:n])
 		i += n
 	}
-	return nil
 }

-func (e *bloomFilterEncoder) insert128(data [][16]byte) error {
-	k := e.keys[:]
-	for i := 0; i < len(data); {
-		n := e.hash.MultiSum64Uint128(k, data[i:])
-		e.filter.InsertBulk(k[:n:n])
+func splitBlockEncodeUint128(filter bloom.SplitBlockFilter, values [][16]byte) {
+	buffer := make([]uint64, filterEncodeBufferSize)

+	for i := 0; i < len(values); {
+		n := xxhash.MultiSum64Uint128(buffer, values[i:])
+		filter.InsertBulk(buffer[:n])
 		i += n
 	}
-	return nil
 }
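
EncodeByteArray above expects src in the PLAIN byte-array layout that plain.RangeByteArrays iterates over. A short sketch building such a buffer and feeding it through the same BloomFilterColumn API as in the earlier example; it assumes the standard PLAIN framing (a 4-byte little-endian length prefix before each value), and the column name is illustrative:

package example

import (
	"encoding/binary"

	"github.com/segmentio/parquet-go"
)

func bloomFilterOfStrings(values []string) ([]byte, error) {
	// Frame each value as a PLAIN byte array: 4-byte little-endian length, then the bytes.
	var plainBuf []byte
	var lenbuf [4]byte
	for _, v := range values {
		binary.LittleEndian.PutUint32(lenbuf[:], uint32(len(v)))
		plainBuf = append(plainBuf, lenbuf[:]...)
		plainBuf = append(plainBuf, v...)
	}

	col := parquet.SplitBlockFilter("name")
	data := make([]byte, col.Size(int64(len(values)), 10))
	return col.Encoding().EncodeByteArray(data, plainBuf)
}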
8 changes: 8 additions & 0 deletions bloom/filter.go
@@ -30,6 +30,14 @@ type MutableFilter interface {
 // to a storage medium.
 type SplitBlockFilter []Block
 
+// MakeSplitBlockFilter constructs a SplitBlockFilter value from the data byte
+// slice.
+func MakeSplitBlockFilter(data []byte) SplitBlockFilter {
+	p := *(*unsafe.Pointer)(unsafe.Pointer(&data))
+	n := len(data) / BlockSize
+	return unsafe.Slice((*Block)(p), n)
+}
+
 // NumSplitBlocksOf returns the number of blocks in a filter intended to hold
 // the given number of values and bits of filter per value.
 //
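
MakeSplitBlockFilter reinterprets the byte slice in place rather than copying it, which is what lets the encodings in bloom.go treat their dst buffer as the filter itself. A short sketch, assuming the Insert and Check methods that SplitBlockFilter exposes through the MutableFilter interface; the key string is illustrative:

package example

import (
	"github.com/segmentio/parquet-go/bloom"
	"github.com/segmentio/parquet-go/bloom/xxhash"
)

func roundTrip() bool {
	// 16 blocks of storage, viewed in place as a split block bloom filter.
	buf := make([]byte, 16*bloom.BlockSize)
	f := bloom.MakeSplitBlockFilter(buf)
	f.Insert(xxhash.Sum64([]byte("hello")))
	return f.Check(xxhash.Sum64([]byte("hello"))) // true: bloom filters have no false negatives
}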
