Skip to content

Commit

Permalink
add parts.ArrowCompact
Browse files Browse the repository at this point in the history
This commit adds `parts.ArrowCompact`, which joins multiple arrow parts into a
single new part.
  • Loading branch information
gernest committed Dec 20, 2023
1 parent caf08d8 commit 88bd738
Showing 1 changed file with 209 additions and 0 deletions.
209 changes: 209 additions & 0 deletions parts/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package parts

import (
"bytes"
"sort"

"github.com/apache/arrow/go/v14/arrow"
"github.com/apache/arrow/go/v14/arrow/array"
"github.com/apache/arrow/go/v14/arrow/memory"
"github.com/apache/arrow/go/v14/arrow/util"
"github.com/parquet-go/parquet-go"

"github.com/polarsignals/frostdb/dynparquet"
Expand Down Expand Up @@ -140,3 +144,208 @@ func (p *arrowPart) OverlapsWith(schema *dynparquet.Schema, otherPart Part) (boo

return schema.Cmp(a, d) <= 0 && schema.Cmp(c, b) <= 0, nil
}

// ArrowCompact merges multiple arrow-backed parts into a single part. The
// scratch fields below are reset at the start of every Compact call, so a
// single ArrowCompact value can be reused across compactions (it is not safe
// for concurrent use).
type ArrowCompact struct {
	// mem allocates buffers for the merged record builder.
	mem memory.Allocator
	// schema is the dynparquet schema attached to the resulting part.
	schema *dynparquet.Schema
	// fields is the scratch union of all arrow fields seen across input parts.
	fields []arrow.Field
	// seen tracks field names already added to fields (dedup set).
	seen map[string]struct{}
	// byName maps a field name to its column builder in the output record.
	byName map[string]array.Builder
}

// NewArrowCompact returns an ArrowCompact that uses mem for allocations and
// attaches schema to the parts it produces. The scratch collections are
// pre-created here so Compact can simply clear and reuse them.
func NewArrowCompact(mem memory.Allocator, schema *dynparquet.Schema) *ArrowCompact {
	c := &ArrowCompact{
		mem:    mem,
		schema: schema,
	}
	c.fields = make([]arrow.Field, 0, 64)
	c.seen = make(map[string]struct{})
	c.byName = make(map[string]array.Builder)
	return c
}

func (a *ArrowCompact) Compact(parts []Part, opts ...Option) (newParts []Part, preSize int64, postSize int64, err error) {

Check failure on line 166 in parts/arrow.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)
a.fields = a.fields[:0]
clear(a.seen)
clear(a.byName)
for _, part := range parts {
schema := part.Record().Schema()
for _, f := range schema.Fields() {
if _, ok := a.seen[f.Name]; ok {
continue
}
a.fields = append(a.fields, f)
a.seen[f.Name] = struct{}{}
}
}
// To be consistent we make sure that fields are always sorted
sort.Slice(a.fields, func(i, j int) bool {
return a.fields[i].Name < a.fields[j].Name
})
schema := arrow.NewSchema(a.fields, nil)
r := array.NewRecordBuilder(a.mem, schema)

for i := range a.fields {
a.byName[a.fields[i].Name] = r.Field(i)
}
defer r.Release()
seen := make(map[string]struct{})
for _, part := range parts {
clear(seen)
record := part.Record()
preSize += util.TotalRecordSize(record)
for i := 0; i < int(record.NumCols()); i++ {
name := record.ColumnName(i)
appendColumn(a.byName[name], record.Column(i))
seen[name] = struct{}{}
}
for n, b := range a.byName {
if _, ok := seen[n]; !ok {
// This dynamic column was not found in the current appended part. Fill the
// column with nulls.
appendNulls(b, int(record.NumRows()))
}
}
}
result := r.NewRecord()
postSize = util.TotalRecordSize(result)
newParts = []Part{
NewArrowPart(0, result, uint64(postSize),
a.schema, opts...),
}
return
}

// appends array a to array builder b .It is named appendColumn because it is
// used to build record columns.
//
// supports
// - all base type
// - nullable base types
// - repeated arrays of all base types(nullable and non nullable)
// appendColumn appends the contents of array a to the record column builder
// b. It is named appendColumn because it is used to build record columns.
//
// Supported combinations:
//   - all base types (int64, float64, bool, string)
//   - nullable base types
//   - dictionary-encoded strings
//   - repeated (list) arrays of all of the above, nullable or not
//
// Builder types outside this set are silently ignored — TODO(review): confirm
// that is intentional rather than returning an error.
func appendColumn(b array.Builder, a arrow.Array) {
	switch e := b.(type) {
	case *array.Int64Builder:
		appendArray[int64](e, a.(*array.Int64))
	case *array.Float64Builder:
		appendArray[float64](e, a.(*array.Float64))
	case *array.BooleanBuilder:
		appendArray[bool](e, a.(*array.Boolean))
	case *array.StringBuilder:
		appendArray[[]byte](e, &wrapString{String: a.(*array.String)})
	case *array.BinaryDictionaryBuilder:
		// Dictionary-encoded strings: decode each index through the source
		// dictionary and re-append, letting the builder rebuild its own
		// dictionary.
		e.Reserve(a.Len())
		d := a.(*array.Dictionary)
		v := d.Dictionary().(*array.String)
		for i := 0; i < a.Len(); i++ {
			if d.IsNull(i) {
				e.AppendNull()
				continue
			}
			_ = e.AppendString(v.Value(d.GetValueIndex(i)))
		}
	case *array.ListBuilder:
		switch value := e.ValueBuilder().(type) {
		case *array.Int64Builder:
			appendList[int64](e, value, a.(*array.List), func(a arrow.Array) baseArray[int64] {
				return a.(*array.Int64)
			})
		case *array.Float64Builder:
			appendList[float64](e, value, a.(*array.List), func(a arrow.Array) baseArray[float64] {
				return a.(*array.Float64)
			})
		case *array.BooleanBuilder:
			appendList[bool](e, value, a.(*array.List), func(a arrow.Array) baseArray[bool] {
				return a.(*array.Boolean)
			})
		case *array.StringBuilder:
			appendList[[]byte](e, value, a.(*array.List), func(a arrow.Array) baseArray[[]byte] {
				return &wrapString{
					String: a.(*array.String),
				}
			})
		case *array.BinaryDictionaryBuilder:
			ls := a.(*array.List)
			values := ls.ListValues()
			for i := 0; i < ls.Len(); i++ {
				if ls.IsNull(i) {
					e.AppendNull()
					continue
				}
				// BUG FIX: a new list entry must be started before its
				// values are appended (compare appendList). Without
				// Append(true) the list offsets never advance, so every
				// decoded value would collapse into the wrong element.
				e.Append(true)
				start, end := ls.ValueOffsets(i)
				va := array.NewSlice(values, start, end).(*array.Dictionary)
				de := va.Dictionary().(*array.String)
				value.Reserve(va.Len())
				for j := 0; j < va.Len(); j++ {
					if va.IsNull(j) {
						value.AppendNull()
						continue
					}
					_ = value.AppendString(de.Value(va.GetValueIndex(j)))
				}
				va.Release()
			}
		}
	}
}

// arrayBuilder is the minimal builder surface needed by appendArray and
// appendList: reserve capacity, then append values or nulls. Arrow's
// concrete builders (Int64Builder, Float64Builder, ...) satisfy it.
type arrayBuilder[T any] interface {
	Reserve(int)
	AppendNull()
	UnsafeAppend(T)
}

// baseArray is the minimal read-side surface needed to copy an arrow array:
// length, per-index null check, per-index value access, and Release to drop
// the reference when done.
type baseArray[T any] interface {
	Len() int
	IsNull(int) bool
	Value(int) T
	Release()
}

// wrapString adapts *array.String to baseArray[[]byte] by converting each
// string value to a byte slice, so string columns can reuse the generic
// append helpers alongside binary ones.
type wrapString struct {
	*array.String
}

// Value returns element i as a byte slice (copies the string's bytes).
func (a *wrapString) Value(i int) []byte {
	return []byte(a.String.Value(i))
}

// appendList copies every entry of the list array ls into the list builder b.
// Null lists are preserved as nulls; for non-null lists a new element is
// started and each child value is appended through valueBuild. The callback a
// adapts an arrow.Array slice of child values to the generic baseArray view.
func appendList[T any](
	b *array.ListBuilder,
	valueBuild arrayBuilder[T],
	ls *array.List,
	a func(arrow.Array) baseArray[T],
) {
	children := ls.ListValues()
	total := ls.Len()
	for row := 0; row < total; row++ {
		if ls.IsNull(row) {
			b.AppendNull()
			continue
		}
		// Begin a new (valid) list element, then copy its child values.
		b.Append(true)
		from, to := ls.ValueOffsets(row)
		view := a(array.NewSlice(children, from, to))
		count := view.Len()
		valueBuild.Reserve(count)
		for k := 0; k < count; k++ {
			valueBuild.UnsafeAppend(view.Value(k))
		}
		// Drop the temporary slice's reference.
		view.Release()
	}
}

// appendArray bulk-copies the contents of a into builder b, preserving
// nulls. Capacity is reserved up front so UnsafeAppend is safe to use.
func appendArray[T any](b arrayBuilder[T], a baseArray[T]) {
	n := a.Len()
	b.Reserve(n)
	for idx := 0; idx < n; idx++ {
		if a.IsNull(idx) {
			b.AppendNull()
		} else {
			b.UnsafeAppend(a.Value(idx))
		}
	}
}

// appendNulls appends n null entries to b. It is used to pad a column that
// is absent from one of the parts being compacted.
//
// BUG FIX: the previous implementation called UnsafeAppendBoolToBitmap(true)
// n times, which marks the entries as VALID (zero-valued), not null — and for
// variable-width builders it never advanced the offsets buffer at all.
// Builder.AppendNulls handles both the validity bitmap and per-type buffers
// correctly.
func appendNulls(b array.Builder, n int) {
	b.AppendNulls(n)
}

0 comments on commit 88bd738

Please sign in to comment.