diff --git a/parts/arrow.go b/parts/arrow.go index 16ff96a910..2d14554a5f 100644 --- a/parts/arrow.go +++ b/parts/arrow.go @@ -2,8 +2,12 @@ package parts import ( "bytes" + "sort" "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/memory" + "github.com/apache/arrow/go/v14/arrow/util" "github.com/parquet-go/parquet-go" "github.com/polarsignals/frostdb/dynparquet" @@ -140,3 +144,208 @@ func (p *arrowPart) OverlapsWith(schema *dynparquet.Schema, otherPart Part) (boo return schema.Cmp(a, d) <= 0 && schema.Cmp(c, b) <= 0, nil } + +type ArrowCompact struct { + mem memory.Allocator + schema *dynparquet.Schema + fields []arrow.Field + seen map[string]struct{} + byName map[string]array.Builder +} + +func NewArrowCompact(mem memory.Allocator, schema *dynparquet.Schema) *ArrowCompact { + return &ArrowCompact{ + mem: mem, + schema: schema, + fields: make([]arrow.Field, 0, 64), + seen: make(map[string]struct{}), + byName: make(map[string]array.Builder), + } +} + +func (a *ArrowCompact) Compact(parts []Part, opts ...Option) (newParts []Part, preSize int64, postSize int64, err error) { + a.fields = a.fields[:0] + clear(a.seen) + clear(a.byName) + for _, part := range parts { + schema := part.Record().Schema() + for _, f := range schema.Fields() { + if _, ok := a.seen[f.Name]; ok { + continue + } + a.fields = append(a.fields, f) + a.seen[f.Name] = struct{}{} + } + } + // To be consistent we make sure that fields are always sorted + sort.Slice(a.fields, func(i, j int) bool { + return a.fields[i].Name < a.fields[j].Name + }) + schema := arrow.NewSchema(a.fields, nil) + r := array.NewRecordBuilder(a.mem, schema) + + for i := range a.fields { + a.byName[a.fields[i].Name] = r.Field(i) + } + defer r.Release() + seen := make(map[string]struct{}) + for _, part := range parts { + clear(seen) + record := part.Record() + preSize += util.TotalRecordSize(record) + for i := 0; i < int(record.NumCols()); i++ { + name := record.ColumnName(i) + appendColumn(a.byName[name], record.Column(i)) + seen[name] = struct{}{} + } + for n, b := range a.byName { + if _, ok := seen[n]; !ok { + // This dynamic column was not found in the current appended part. Fill the + // column with nulls. + appendNulls(b, int(record.NumRows())) + } + } + } + result := r.NewRecord() + postSize = util.TotalRecordSize(result) + newParts = []Part{ + NewArrowPart(0, result, uint64(postSize), + a.schema, opts...), + } + return +} + +// appends array a to array builder b .It is named appendColumn because it is +// used to build record columns. +// +// supports +// - all base type +// - nullable base types +// - repeated arrays of all base types(nullable and non nullable) +func appendColumn(b array.Builder, a arrow.Array) { + switch e := b.(type) { + case *array.Int64Builder: + appendArray[int64](e, a.(*array.Int64)) + case *array.Float64Builder: + appendArray[float64](e, a.(*array.Float64)) + case *array.BooleanBuilder: + appendArray[bool](e, a.(*array.Boolean)) + case *array.StringBuilder: + appendArray[[]byte](e, &wrapString{String: a.(*array.String)}) + case *array.BinaryDictionaryBuilder: + e.Reserve(a.Len()) + d := a.(*array.Dictionary) + v := d.Dictionary().(*array.String) + for i := 0; i < a.Len(); i++ { + if d.IsNull(i) { + e.AppendNull() + continue + } + _ = e.AppendString(v.Value(d.GetValueIndex(i))) + } + case *array.ListBuilder: + switch value := e.ValueBuilder().(type) { + case *array.Int64Builder: + appendList[int64](e, value, a.(*array.List), func(a arrow.Array) baseArray[int64] { + return a.(*array.Int64) + }) + case *array.Float64Builder: + appendList[float64](e, value, a.(*array.List), func(a arrow.Array) baseArray[float64] { + return a.(*array.Float64) + }) + case *array.BooleanBuilder: + appendList[bool](e, value, a.(*array.List), func(a arrow.Array) baseArray[bool] { + return a.(*array.Boolean) + }) + case *array.StringBuilder: + appendList[[]byte](e, value, a.(*array.List), func(a arrow.Array) baseArray[[]byte] { + return &wrapString{ + String: a.(*array.String), + } + }) + case *array.BinaryDictionaryBuilder: + ls := a.(*array.List) + values := ls.ListValues() + for i := 0; i < a.Len(); i++ { + if ls.IsNull(i) { + e.AppendNull() + continue + } + start, end := ls.ValueOffsets(i) + va := array.NewSlice(values, start, end).(*array.Dictionary) + de := va.Dictionary().(*array.String) + value.Reserve(va.Len()) + for j := 0; j < va.Len(); j++ { + if va.IsNull(j) { + value.AppendNull() + continue + } + _ = value.AppendString(de.Value(va.GetValueIndex(j))) + } + va.Release() + } + } + } +} + +type arrayBuilder[T any] interface { + Reserve(int) + AppendNull() + UnsafeAppend(T) +} + +type baseArray[T any] interface { + Len() int + IsNull(int) bool + Value(int) T + Release() +} + +type wrapString struct { + *array.String +} + +func (a *wrapString) Value(i int) []byte { + return []byte(a.String.Value(i)) +} + +func appendList[T any]( + b *array.ListBuilder, + valueBuild arrayBuilder[T], + ls *array.List, + a func(arrow.Array) baseArray[T], +) { + values := ls.ListValues() + for i := 0; i < ls.Len(); i++ { + if ls.IsNull(i) { + b.AppendNull() + continue + } + b.Append(true) + start, end := ls.ValueOffsets(i) + v := a(array.NewSlice(values, start, end)) + valueBuild.Reserve(v.Len()) + for j := 0; j < v.Len(); j++ { + valueBuild.UnsafeAppend(v.Value(j)) + } + v.Release() + } +} + +func appendArray[T any](b arrayBuilder[T], a baseArray[T]) { + b.Reserve(a.Len()) + for i := 0; i < a.Len(); i++ { + if a.IsNull(i) { + b.AppendNull() + continue + } + b.UnsafeAppend(a.Value(i)) + } +} + +func appendNulls(b array.Builder, n int) { + b.Reserve(n) + for i := 0; i < n; i++ { + b.UnsafeAppendBoolToBitmap(true) + } +}