From 6c2e5f4fd45d84275ad51d20b01b34ae15f12de1 Mon Sep 17 00:00:00 2001 From: Larry Marburger Date: Mon, 14 Nov 2022 13:14:18 -0500 Subject: [PATCH] Serialize time.Time as a timestamp (#321) Serialize time.Time values as Parquet timestamps. The default unit is NANOS and can be changed using the timestamp() struct tag. type timeColumn struct { t1 time.Time t2 time.Time `parquet:",timestamp(millisecond)"` } --- buffer_go18_test.go | 4 ++ column_buffer_go18.go | 43 ++++++++++++++++++++ convert_test.go | 89 +++++++++++++++++++++++++++++++++++++++++ internal/quick/quick.go | 13 ++++++ parquet_test.go | 18 +++++++++ reader_go18_test.go | 4 ++ reader_test.go | 10 +++++ row.go | 6 ++- schema.go | 14 ++++++- sparse/array.go | 22 +++++++++- type.go | 50 ++++++++++++++++++++++- value.go | 34 +++++++++++++--- value_test.go | 9 +++++ writer_go18_test.go | 2 + 14 files changed, 307 insertions(+), 11 deletions(-) diff --git a/buffer_go18_test.go b/buffer_go18_test.go index aea08f9..5833d6c 100644 --- a/buffer_go18_test.go +++ b/buffer_go18_test.go @@ -25,6 +25,8 @@ func TestGenericBuffer(t *testing.T) { testGenericBuffer[stringColumn](t) testGenericBuffer[indexedStringColumn](t) testGenericBuffer[uuidColumn](t) + testGenericBuffer[timeColumn](t) + testGenericBuffer[timeInMillisColumn](t) testGenericBuffer[mapColumn](t) testGenericBuffer[decimalColumn](t) testGenericBuffer[addressBook](t) @@ -108,6 +110,8 @@ func BenchmarkGenericBuffer(b *testing.B) { benchmarkGenericBuffer[stringColumn](b) benchmarkGenericBuffer[indexedStringColumn](b) benchmarkGenericBuffer[uuidColumn](b) + benchmarkGenericBuffer[timeColumn](b) + benchmarkGenericBuffer[timeInMillisColumn](b) benchmarkGenericBuffer[mapColumn](b) benchmarkGenericBuffer[decimalColumn](b) benchmarkGenericBuffer[contact](b) diff --git a/column_buffer_go18.go b/column_buffer_go18.go index c500392..539392c 100644 --- a/column_buffer_go18.go +++ b/column_buffer_go18.go @@ -5,6 +5,7 @@ package parquet import ( "math/bits" "reflect" + "time" "unsafe" "github.com/segmentio/parquet-go/deprecated" @@ -30,6 +31,8 @@ func writeRowsFuncOf(t reflect.Type, schema *Schema, path columnPath) writeRowsF switch t { case reflect.TypeOf(deprecated.Int96{}): return writeRowsFuncOfRequired(t, schema, path) + case reflect.TypeOf(time.Time{}): + return writeRowsFuncOfTime(t, schema, path) } switch t.Kind() { @@ -393,3 +396,43 @@ func writeRowsFuncOfMap(t reflect.Type, schema *Schema, path columnPath) writeRo return nil } } + +func writeRowsFuncOfTime(_ reflect.Type, schema *Schema, path columnPath) writeRowsFunc { + t := reflect.TypeOf(int64(0)) + elemSize := uintptr(t.Size()) + writeRows := writeRowsFuncOf(t, schema, path) + + col, _ := schema.Lookup(path...) + unit := Nanosecond.TimeUnit() + lt := col.Node.Type().LogicalType() + if lt != nil && lt.Timestamp != nil { + unit = lt.Timestamp.Unit + } + + return func(columns []ColumnBuffer, rows sparse.Array, levels columnLevels) error { + if rows.Len() == 0 { + return writeRows(columns, rows, levels) + } + + times := rows.TimeArray() + for i := 0; i < times.Len(); i++ { + t := times.Index(i) + var val int64 + switch { + case unit.Millis != nil: + val = t.UnixMilli() + case unit.Micros != nil: + val = t.UnixMicro() + default: + val = t.UnixNano() + } + + a := makeArray(unsafecast.PointerOfValue(reflect.ValueOf(val)), 1, elemSize) + if err := writeRows(columns, a, levels); err != nil { + return err + } + } + + return nil + } +} diff --git a/convert_test.go b/convert_test.go index a92657b..fe69b32 100644 --- a/convert_test.go +++ b/convert_test.go @@ -3,6 +3,7 @@ package parquet_test import ( "reflect" "testing" + "time" "github.com/segmentio/parquet-go" ) @@ -309,3 +310,91 @@ func TestConvert(t *testing.T) { func newInt64(i int64) *int64 { return &i } func newString(s string) *string { return &s } + +func TestConvertTimestamp(t *testing.T) { + now := time.Unix(42, 0) + ms := now.UnixMilli() + us := now.UnixMicro() + ns := now.UnixNano() + + msType := parquet.Timestamp(parquet.Millisecond).Type() + msVal := parquet.ValueOf(ms) + if msVal.Int64() != ms { + t.Errorf("converted value mismatch:\nwant = %+v\ngot = %+v", ms, msVal.Int64()) + } + + usType := parquet.Timestamp(parquet.Microsecond).Type() + usVal := parquet.ValueOf(us) + if usVal.Int64() != us { + t.Errorf("converted value mismatch:\nwant = %+v\ngot = %+v", us, usVal.Int64()) + } + + nsType := parquet.Timestamp(parquet.Nanosecond).Type() + nsVal := parquet.ValueOf(ns) + if nsVal.Int64() != ns { + t.Errorf("converted value mismatch:\nwant = %+v\ngot = %+v", ns, nsVal.Int64()) + } + + var timestampConversionTests = [...]struct { + scenario string + fromType parquet.Type + fromValue parquet.Value + toType parquet.Type + expected int64 + }{ + { + scenario: "micros to nanos", + fromType: usType, + fromValue: usVal, + toType: nsType, + expected: ns, + }, + { + scenario: "millis to nanos", + fromType: msType, + fromValue: msVal, + toType: nsType, + expected: ns, + }, + { + scenario: "nanos to micros", + fromType: nsType, + fromValue: nsVal, + toType: usType, + expected: us, + }, + { + scenario: "nanos to nanos", + fromType: nsType, + fromValue: nsVal, + toType: nsType, + expected: ns, + }, + { + scenario: "int64 to nanos", + fromType: parquet.Int64Type, + fromValue: nsVal, + toType: nsType, + expected: ns, + }, + { + scenario: "int64 to int64", + fromType: parquet.Int64Type, + fromValue: nsVal, + toType: parquet.Int64Type, + expected: ns, + }, + } + + for _, test := range timestampConversionTests { + t.Run(test.scenario, func(t *testing.T) { + a, err := test.toType.ConvertValue(test.fromValue, test.fromType) + if err != nil { + t.Fatal(err) + } + if a.Int64() != test.expected { + t.Errorf("converted value mismatch:\nwant = %+v\ngot = %+v", test.expected, a.Int64()) + } + }) + } +} diff --git a/internal/quick/quick.go b/internal/quick/quick.go index 2bb51f2..188cb8b 100644 --- a/internal/quick/quick.go +++ b/internal/quick/quick.go @@ -6,6 +6,7 @@ import ( "math/rand" "reflect" "strings" + "time" ) var DefaultConfig = Config{ @@ -70,6 +71,18 @@ func (c *Config) Check(f interface{}) error { type MakeValueFunc func(reflect.Value, *rand.Rand) func MakeValueFuncOf(t reflect.Type) MakeValueFunc { + switch t { + case reflect.TypeOf(time.Time{}): + return func(v reflect.Value, r *rand.Rand) { + // TODO: This is a hack to support the matching of times in a precision + // other than nanosecond by generating times rounded to the second. A + // better solution would be to update columns types to add a compare + // function. + sec := r.Int63n(2524608000) // 2050-01-01 + v.Set(reflect.ValueOf(time.Unix(sec, 0).UTC())) + } + } + switch t.Kind() { case reflect.Bool: return func(v reflect.Value, r *rand.Rand) { diff --git a/parquet_test.go b/parquet_test.go index 88615f0..0671c7f 100644 --- a/parquet_test.go +++ b/parquet_test.go @@ -136,6 +136,24 @@ func (row uuidColumn) generate(prng *rand.Rand) uuidColumn { return row } +type timeColumn struct { + Value time.Time +} + +func (row timeColumn) generate(prng *rand.Rand) timeColumn { + t := time.Unix(0, prng.Int63()).UTC() + return timeColumn{Value: t} +} + +type timeInMillisColumn struct { + Value time.Time `parquet:",timestamp(millisecond)"` +} + +func (row timeInMillisColumn) generate(prng *rand.Rand) timeInMillisColumn { + t := time.Unix(0, prng.Int63()).UTC() + return timeInMillisColumn{Value: t} +} + type decimalColumn struct { Value int64 `parquet:",decimal(0:3)"` } diff --git a/reader_go18_test.go b/reader_go18_test.go index 64c3bc3..191efc6 100644 --- a/reader_go18_test.go +++ b/reader_go18_test.go @@ -26,6 +26,8 @@ func TestGenericReader(t *testing.T) { testGenericReader[stringColumn](t) testGenericReader[indexedStringColumn](t) testGenericReader[uuidColumn](t) + testGenericReader[timeColumn](t) + testGenericReader[timeInMillisColumn](t) testGenericReader[mapColumn](t) testGenericReader[decimalColumn](t) testGenericReader[addressBook](t) @@ -98,6 +100,8 @@ func BenchmarkGenericReader(b *testing.B) { benchmarkGenericReader[stringColumn](b) benchmarkGenericReader[indexedStringColumn](b) benchmarkGenericReader[uuidColumn](b) + benchmarkGenericReader[timeColumn](b) + benchmarkGenericReader[timeInMillisColumn](b) benchmarkGenericReader[mapColumn](b) benchmarkGenericReader[decimalColumn](b) benchmarkGenericReader[contact](b) diff --git a/reader_test.go b/reader_test.go index ddd5f48..fdb4aca 100644 --- a/reader_test.go +++ b/reader_test.go @@ -89,6 +89,16 @@ var readerTests = []struct { model: uuidColumn{}, }, + { + scenario: "time.Time", + model: timeColumn{}, + }, + + { + scenario: "time.Time in ms", + model: timeInMillisColumn{}, + }, + { scenario: "DECIMAL", model: decimalColumn{}, diff --git a/row.go b/row.go index 602fdb0..e1e2b5d 100644 --- a/row.go +++ b/row.go @@ -499,13 +499,15 @@ func deconstructFuncOfLeaf(columnIndex int16, node Node) (int16, deconstructFunc if columnIndex > MaxColumnIndex { panic("row cannot be deconstructed because it has more than 127 columns") } - kind := node.Type().Kind() + typ := node.Type() + kind := typ.Kind() + lt := typ.LogicalType() valueColumnIndex := ^columnIndex return columnIndex + 1, func(row Row, levels levels, value reflect.Value) Row { v := Value{} if value.IsValid() { - v = makeValue(kind, value) + v = makeValue(kind, lt, value) } v.repetitionLevel = levels.repetitionLevel diff --git a/schema.go b/schema.go index 00c90b8..d79cbeb 100644 --- a/schema.go +++ b/schema.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/google/uuid" "github.com/segmentio/parquet-go/compress" @@ -488,6 +489,8 @@ func nodeOf(t reflect.Type, tag []string) Node { return Leaf(Int96Type) case reflect.TypeOf(uuid.UUID{}): return UUID() + case reflect.TypeOf(time.Time{}): + return Timestamp(Nanosecond) } var n Node @@ -823,7 +826,16 @@ func makeNodeOf(t reflect.Type, name string, tag []string) Node { } setNode(Timestamp(timeUnit)) default: - throwInvalidTag(t, name, option) + switch t { + case reflect.TypeOf(time.Time{}): + timeUnit, err := parseTimestampArgs(args) + if err != nil { + throwInvalidTag(t, name, option) + } + setNode(Timestamp(timeUnit)) + default: + throwInvalidTag(t, name, option) + } } default: throwUnknownTag(t, name, option) diff --git a/sparse/array.go b/sparse/array.go index 9848589..94285be 100644 --- a/sparse/array.go +++ b/sparse/array.go @@ -1,6 +1,9 @@ package sparse -import "unsafe" +import ( + "time" + "unsafe" +) type Array struct{ array } @@ -25,6 +28,7 @@ func (a Array) Uint32Array() Uint32Array { return Uint32Array{a.array} } func (a Array) Uint64Array() Uint64Array { return Uint64Array{a.array} } func (a Array) Uint128Array() Uint128Array { return Uint128Array{a.array} } func (a Array) StringArray() StringArray { return StringArray{a.array} } +func (a Array) TimeArray() TimeArray { return TimeArray{a.array} } type array struct { ptr unsafe.Pointer @@ -290,3 +294,19 @@ func (a StringArray) Len() int { return int(a.len) } func (a StringArray) Index(i int) string { return *(*string)(a.index(i)) } func (a StringArray) Slice(i, j int) StringArray { return StringArray{a.slice(i, j)} } func (a StringArray) UnsafeArray() Array { return Array{a.array} } + +type TimeArray struct{ array } + +func MakeTimeArray(values []time.Time) TimeArray { + const sizeOfTime = unsafe.Sizeof(time.Time{}) + return TimeArray{makeArray(*(*unsafe.Pointer)(unsafe.Pointer(&values)), uintptr(len(values)), sizeOfTime)} +} + +func UnsafeTimeArray(base unsafe.Pointer, length int, offset uintptr) TimeArray { + return TimeArray{makeArray(base, uintptr(length), offset)} +} + +func (a TimeArray) Len() int { return int(a.len) } +func (a TimeArray) Index(i int) time.Time { return *(*time.Time)(a.index(i)) } +func (a TimeArray) Slice(i, j int) TimeArray { return TimeArray{a.slice(i, j)} } +func (a TimeArray) UnsafeArray() Array { return Array{a.array} } diff --git a/type.go b/type.go index f996f11..e997c98 100644 --- a/type.go +++ b/type.go @@ -1722,11 +1722,57 @@ func (t *timestampType) Decode(dst encoding.Values, src []byte, enc encoding.Enc } func (t *timestampType) AssignValue(dst reflect.Value, src Value) error { - return Int64Type.AssignValue(dst, src) + switch dst.Type() { + case reflect.TypeOf(time.Time{}): + unit := Nanosecond.TimeUnit() + lt := t.LogicalType() + if lt != nil && lt.Timestamp != nil { + unit = lt.Timestamp.Unit + } + + nanos := src.Int64() + switch { + case unit.Millis != nil: + nanos = nanos * 1e6 + case unit.Micros != nil: + nanos = nanos * 1e3 + } + + val := time.Unix(0, nanos).UTC() + dst.Set(reflect.ValueOf(val)) + return nil + default: + return Int64Type.AssignValue(dst, src) + } } func (t *timestampType) ConvertValue(val Value, typ Type) (Value, error) { - return Int64Type.ConvertValue(val, typ) + var sourceTs *format.TimestampType + if typ.LogicalType() != nil { + sourceTs = typ.LogicalType().Timestamp + } + + // Ignore when source is not a timestamp (i.e., Integer) + if sourceTs == nil { + return val, nil + } + + source := timeUnitDuration(sourceTs.Unit) + target := timeUnitDuration(t.Unit) + converted := val.Int64() * source.Nanoseconds() / target.Nanoseconds() + + return ValueOf(converted), nil +} + +func timeUnitDuration(unit format.TimeUnit) time.Duration { + switch { + case unit.Millis != nil: + return time.Millisecond + case unit.Micros != nil: + return time.Microsecond + default: + return time.Nanosecond + } } // List constructs a node of LIST logical type. diff --git a/value.go b/value.go index d3ece01..5cf0053 100644 --- a/value.go +++ b/value.go @@ -8,10 +8,12 @@ import ( "math" "reflect" "strconv" + "time" "unsafe" "github.com/google/uuid" "github.com/segmentio/parquet-go/deprecated" + "github.com/segmentio/parquet-go/format" "github.com/segmentio/parquet-go/internal/unsafecast" ) @@ -158,6 +160,9 @@ func copyValues(dst ValueWriter, src ValueReader, buf []Value) (written int64, e // // The function panics if the Go value cannot be represented in parquet. func ValueOf(v interface{}) Value { + k := Kind(-1) + t := reflect.TypeOf(v) + switch value := v.(type) { case nil: return Value{} @@ -165,11 +170,10 @@ func ValueOf(v interface{}) Value { return makeValueBytes(FixedLenByteArray, value[:]) case deprecated.Int96: return makeValueInt96(value) + case time.Time: + k = Int64 } - k := Kind(-1) - t := reflect.TypeOf(v) - switch t.Kind() { case reflect.Bool: k = Boolean @@ -197,10 +201,30 @@ func ValueOf(v interface{}) Value { panic("cannot create parquet value from go value of type " + t.String()) } - return makeValue(k, reflect.ValueOf(v)) + return makeValue(k, nil, reflect.ValueOf(v)) } -func makeValue(k Kind, v reflect.Value) Value { +func makeValue(k Kind, lt *format.LogicalType, v reflect.Value) Value { + switch v.Type() { + case reflect.TypeOf(time.Time{}): + unit := Nanosecond.TimeUnit() + if lt != nil && lt.Timestamp != nil { + unit = lt.Timestamp.Unit + } + + t := v.Interface().(time.Time) + var val int64 + switch { + case unit.Millis != nil: + val = t.UnixMilli() + case unit.Micros != nil: + val = t.UnixMicro() + default: + val = t.UnixNano() + } + return makeValueInt64(val) + } + switch k { case Boolean: return makeValueBoolean(v.Bool()) diff --git a/value_test.go b/value_test.go index db9d7dd..b5a2989 100644 --- a/value_test.go +++ b/value_test.go @@ -4,6 +4,7 @@ import ( "bytes" "math" "testing" + "time" "unsafe" "github.com/segmentio/parquet-go" @@ -68,6 +69,14 @@ func TestValueClone(t *testing.T) { scenario: "FIXED_LEN_BYTE_ARRAY", values: []interface{}{[1]byte{42}, [16]byte{0: 1}}, }, + + { + scenario: "TIME", + values: []interface{}{ + time.Date(2020, 1, 2, 3, 4, 5, 7, time.UTC), + time.Date(2021, 2, 3, 4, 5, 6, 8, time.UTC), + }, + }, } for _, test := range tests { diff --git a/writer_go18_test.go b/writer_go18_test.go index 3233369..fb7a1ae 100644 --- a/writer_go18_test.go +++ b/writer_go18_test.go @@ -24,6 +24,8 @@ func BenchmarkGenericWriter(b *testing.B) { benchmarkGenericWriter[stringColumn](b) benchmarkGenericWriter[indexedStringColumn](b) benchmarkGenericWriter[uuidColumn](b) + benchmarkGenericWriter[timeColumn](b) + benchmarkGenericWriter[timeInMillisColumn](b) benchmarkGenericWriter[mapColumn](b) benchmarkGenericWriter[decimalColumn](b) benchmarkGenericWriter[contact](b)