coldata: optimize serialization of Bytes
This commit changes how we convert `coldata.Bytes` to and from the arrow
format by introducing our own "arrow-like" (but still arrow-compatible)
format. We call this "arrow-like" because we're abusing the arrow format to
get the best speed, possibly at the cost of increased allocations (when the
Bytes vector has been modified in-place many times via `Set`s at arbitrary
positions with values of different lengths).

In particular, the arrow format represents bytes values via two slices - the
flat `[]byte` buffer and the offsets, where `len(offsets) = n + 1` (with `n`
being the number of elements). The i-th element is then
`buffer[offsets[i]:offsets[i+1]]`. However, we squash `[]element` and the
buffer for non-inlined values into that flat byte slice, so we only need two
positions in `offsets` to indicate the boundary between the two as well as the
total data length. As a result, we have the following representation (which
defeats the spirit of the arrow format but doesn't cause any issues anywhere):
```
      buffer = [<b.elements as []byte><b.buffer>]
     offsets = [0, 0, ..., 0, len(<b.elements as []byte>), len(<b.elements as []byte>) + len(b.buffer)]
```
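
For reference, a standard arrow consumer slices out individual values from
such a pair as sketched below (the function name is hypothetical, not code
from this commit):
```
// ithValue returns the i-th element under the standard arrow
// representation, where len(offsets) == n+1.
func ithValue(buffer []byte, offsets []int32, i int) []byte {
	return buffer[offsets[i]:offsets[i+1]]
}
```
Under the "arrow-like" layout only the last two offsets carry information, so
per-element slicing like this is no longer meaningful; the receiving end
instead reinterprets the two regions, which is what `Deserialize` does.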

This speeds up the conversion by 2-10x; the full benchmark results are
[here](https://gist.github.com/yuzefovich/2474e806663ed5ba8cf31ef8a426962c).

Epic: None

Release note: None
yuzefovich committed Jan 3, 2023
1 parent 107e78c commit b353d18
Showing 4 changed files with 130 additions and 49 deletions.
118 changes: 91 additions & 27 deletions pkg/col/coldata/bytes.go
@@ -13,6 +13,7 @@ package coldata
import (
"encoding/binary"
"fmt"
"reflect"
"strings"
"unsafe"

@@ -541,42 +542,105 @@ func (b *Bytes) String() string {
return builder.String()
}

// BytesFromArrowSerializationFormat takes an Arrow byte slice and accompanying
// offsets and populates b.
func BytesFromArrowSerializationFormat(b *Bytes, data []byte, offsets []int32) {
numElements := len(offsets) - 1
// TODO(yuzefovich): we can come up with better strategy here. For example,
// we could estimate the average size of values and possibly not inline all
// of them, or we could at least ensure that b.buffer is large enough to
// hold some guess on the non-inlined total footprint.
if cap(b.elements) < numElements {
b.elements = make([]element, numElements)
b.buffer = b.buffer[:0]
} else {
b.Reset()
b.elements = b.elements[:numElements]
}
for i := 0; i < numElements; i++ {
b.Set(i, data[offsets[i]:offsets[i+1]])
}
// elementsAsBytes unsafely casts b.elements[:n] to []byte.
func (b *Bytes) elementsAsBytes(n int) []byte {
var bytes []byte
elementsHeader := (*reflect.SliceHeader)(unsafe.Pointer(&b.elements))
bytesHeader := (*reflect.SliceHeader)(unsafe.Pointer(&bytes))
bytesHeader.Data = elementsHeader.Data
bytesHeader.Len = int(ElementSize) * n
bytesHeader.Cap = int(ElementSize) * n
return bytes
}
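
Since Go 1.17 the same reinterpretation can be written with unsafe.Slice
instead of reflect.SliceHeader - a sketch under that assumption (the name
elementsAsBytesAlt is hypothetical and not part of this commit):

// elementsAsBytesAlt is a hypothetical unsafe.Slice-based equivalent of
// elementsAsBytes; illustration only, not in this diff.
func (b *Bytes) elementsAsBytesAlt(n int) []byte {
	if n == 0 {
		return nil
	}
	return unsafe.Slice((*byte)(unsafe.Pointer(&b.elements[0])), int(ElementSize)*n)
}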

// ToArrowSerializationFormat returns a bytes slice and offsets that are
// Arrow-compatible. n is the number of elements to serialize. The results will
// be appended to the passed-in slices.
func (b *Bytes) ToArrowSerializationFormat(
n int, dataScratch []byte, offsetsScratch []int32,
) ([]byte, []int32) {
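// zeroInt32Slice is a preallocated all-zero slice that lets Serialize append
// up to BatchSize() zero offsets in bulk rather than one at a time.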
var zeroInt32Slice []int32

func init() {
zeroInt32Slice = make([]int32, BatchSize())
}

// Serialize converts b into the "arrow-like" (which is arrow-compatible)
// format.
//
// We call this "arrow-like" because we're abusing the arrow format to get the
// best speed, possibly at the cost of increased allocations (when the Bytes
// vector has been modified in-place many times via Sets at arbitrary positions
// with values of different lengths).
//
// In particular, the arrow format represents bytes values via two slices - the
// flat []byte buffer and the offsets where len(offsets) = n + 1 (where n is the
// number of elements). The i-th element is then buffer[offsets[i]:offsets[i+1]].
// However, we squash b.elements (which is []element) and b.buffer to be stored
// in that flat byte slice, and we only need two positions in offsets to
// indicate the boundary between the two as well as the total data length. As a
// result, we have the following representation (which defeats the spirit of the
// arrow format but doesn't cause any issues anywhere):
//
// buffer = [<b.elements as []byte><b.buffer>]
// offsets = [0, 0, ..., 0, len(<b.elements as []byte>), len(<b.elements as []byte>) + len(b.buffer)]
//
// Note: it is assumed that n is not larger than BatchSize().
func (b *Bytes) Serialize(n int, dataScratch []byte, offsetsScratch []int32) ([]byte, []int32) {
if buildutil.CrdbTestBuild {
if n > BatchSize() {
colexecerror.InternalError(errors.AssertionFailedf(
"too many bytes elements to serialize: %d vs BatchSize() of %d", n, BatchSize(),
))
}
}
data := dataScratch[:0]
offsets := offsetsScratch[:0]
offsets = append(offsets, 0)
for _, e := range b.elements[:n] {
data = append(data, e.get(b)...)

// Handle the cases of 0 and 1 elements separately since in those cases we
// cannot store the two offsets that we need.
if n == 0 {
offsets = append(offsets, 0)
return data, offsets
} else if n == 1 {
data = append(data, b.Get(0)...)
offsets = append(offsets, 0)
offsets = append(offsets, int32(len(data)))
return data, offsets
}

// Copy over b.elements treated as []byte as well as b.buffer into data.
bytes := b.elementsAsBytes(n)
if cap(data) < len(bytes)+len(b.buffer) {
data = make([]byte, 0, len(bytes)+len(b.buffer))
}
data = append(data, bytes...)
data = append(data, b.buffer...)

// Now populate the offsets slice which conforms to the arrow format and has
// the correct length.
offsets = append(offsets, zeroInt32Slice[:n-1]...)
offsets = append(offsets, int32(len(bytes)))
offsets = append(offsets, int32(len(data)))
return data, offsets
}

// Deserialize updates b according to the "arrow-like" format that was produced
// by Serialize.
func (b *Bytes) Deserialize(data []byte, offsets []int32) {
n := len(offsets) - 1
if cap(b.elements) < n {
b.elements = make([]element, n)
} else {
b.elements = b.elements[:n]
}
b.buffer = b.buffer[:0]
if n == 0 {
return
} else if n == 1 {
b.elements[0] = element{}
b.Set(0, data)
return
}
bytes := b.elementsAsBytes(n)
copy(bytes, data)
b.buffer = append(b.buffer, data[len(bytes):]...)
}
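
Together the two methods form a round trip - a minimal usage sketch (src and
dst are hypothetical *Bytes vectors; the real call sites are in
arrowbatchconverter.go below):

// Serialize n elements of src into scratch slices, then rebuild dst from
// the resulting arrow-compatible (data, offsets) pair.
var data []byte
var offsets []int32
data, offsets = src.Serialize(src.Len(), data, offsets)
dst.Deserialize(data, offsets)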

// ProportionalSize calls the method of the same name on bytes-like vectors,
// panicking if not bytes-like.
func ProportionalSize(v Vec, length int64) int64 {
50 changes: 33 additions & 17 deletions pkg/col/coldata/bytes_test.go
@@ -510,12 +510,12 @@ func TestProportionalSize(t *testing.T) {

const letters = "abcdefghijklmnopqrstuvwxyz"

func TestToArrowSerializationFormat(t *testing.T) {
func TestArrowConversion(t *testing.T) {
defer leaktest.AfterTest(t)()

rng, _ := randutil.NewTestRand()
nullChance := 0.2
maxStringLength := 10
maxStringLength := BytesMaxInlineLength * 2
numElements := 1 + rng.Intn(BatchSize())

b := NewBytes(numElements)
@@ -527,18 +527,23 @@ func TestToArrowSerializationFormat(t *testing.T) {
b.Set(i, element)
}

startIdx := rng.Intn(numElements)
endIdx := startIdx + rng.Intn(numElements-startIdx)
if endIdx == startIdx {
endIdx++
source := b
if rng.Float64() < 0.5 {
// Sometimes use a window into Bytes to increase test coverage.
startIdx := rng.Intn(numElements)
endIdx := startIdx + rng.Intn(numElements-startIdx)
if endIdx == startIdx {
endIdx++
}
source = b.Window(startIdx, endIdx)
}
wind := b.Window(startIdx, endIdx)
n := source.Len()

var data []byte
var offsets []int32
data, offsets = wind.ToArrowSerializationFormat(wind.Len(), data, offsets)
data, offsets = source.Serialize(n, data, offsets)

require.Equal(t, wind.Len(), len(offsets)-1)
require.Equal(t, n, len(offsets)-1)
require.Equal(t, int32(0), offsets[0])
require.Equal(t, len(data), int(offsets[len(offsets)-1]))

@@ -547,15 +552,26 @@ func TestToArrowSerializationFormat(t *testing.T) {
require.GreaterOrEqualf(t, offsets[i], offsets[i-1], "unexpectedly found decreasing offsets: %v", offsets)
}

converted := NewBytes(n)
if rng.Float64() < 0.5 {
// Make a copy sometimes to simulate data and offsets being sent across
// the wire.
data = append([]byte(nil), data...)
offsets = append([]int32(nil), offsets...)
}
converted.Deserialize(data, offsets)
// Verify that the data contains the correct values.
for i := 0; i < len(offsets)-1; i++ {
element := data[offsets[i]:offsets[i+1]]
if len(element) == 0 {
// Bytes.Get returns a nil []byte value for NULL values whereas the
// slicing above will make it a zero-length []byte value. Override
// the latter to the former.
element = nil
for i := 0; i < n; i++ {
expected, actual := source.Get(i), converted.Get(i)
// When values are NULL or zero-length, they can have different
// representations ([]byte(nil) and []byte{}), so we convert both to the
// former.
if len(expected) == 0 {
expected = nil
}
if len(actual) == 0 {
actual = nil
}
require.Equal(t, wind.Get(i), element)
require.Equal(t, expected, actual)
}
}
8 changes: 4 additions & 4 deletions pkg/col/colserde/arrowbatchconverter.go
@@ -184,11 +184,11 @@ func (c *ArrowBatchConverter) BatchToArrow(batch coldata.Batch) ([]array.Data, e
}
switch f {
case types.BytesFamily:
values, offsets = vec.Bytes().ToArrowSerializationFormat(n, values, offsets)
values, offsets = vec.Bytes().Serialize(n, values, offsets)
unsafeCastOffsetsArray(offsets, &offsetsBytes)

case types.JsonFamily:
values, offsets = vec.JSON().Bytes.ToArrowSerializationFormat(n, values, offsets)
values, offsets = vec.JSON().Bytes.Serialize(n, values, offsets)
unsafeCastOffsetsArray(offsets, &offsetsBytes)

case types.DecimalFamily:
@@ -352,11 +352,11 @@ func (c *ArrowBatchConverter) ArrowToBatch(
switch typeconv.TypeFamilyToCanonicalTypeFamily(typ.Family()) {
case types.BytesFamily:
valueBytes, offsets := getValueBytesAndOffsets(d, vec.Nulls(), batchLength)
coldata.BytesFromArrowSerializationFormat(vec.Bytes(), valueBytes, offsets)
vec.Bytes().Deserialize(valueBytes, offsets)

case types.JsonFamily:
valueBytes, offsets := getValueBytesAndOffsets(d, vec.Nulls(), batchLength)
coldata.BytesFromArrowSerializationFormat(&vec.JSON().Bytes, valueBytes, offsets)
vec.JSON().Bytes.Deserialize(valueBytes, offsets)

case types.DecimalFamily:
// TODO(yuzefovich): this serialization is quite inefficient - improve
3 changes: 2 additions & 1 deletion pkg/sql/execinfra/version.go
@@ -77,7 +77,8 @@ const MinAcceptedVersion execinfrapb.DistSQLVersion = 71
Please add new entries at the top.
- Version: 71 (MinAcceptedVersion: 71)
- On-wire representation of booleans in the Arrow format has changed.
- On-wire representation of booleans and bytes-like values in the Arrow format
has changed.
- Version: 70 (MinAcceptedVersion: 70)
- HashGroupJoinerSpec has been introduced.
