Serialize time.Time as a timestamp (#321)
Serialize time.Time values as Parquet timestamps. The default unit is nanoseconds (NANOS) and can be changed with the timestamp() struct tag:

type timeColumn struct {
	T1 time.Time
	T2 time.Time `parquet:",timestamp(millisecond)"`
}
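As a minimal usage sketch (not part of this commit): the Event type and the in-memory buffer are made up for illustration, and the GenericWriter API from this repository (Go 1.18+) is assumed.

package main

import (
	"bytes"
	"log"
	"time"

	"github.com/segmentio/parquet-go"
)

// Event is a hypothetical row type: Created uses the default NANOS unit,
// Updated is stored with millisecond precision via the struct tag.
type Event struct {
	Created time.Time
	Updated time.Time `parquet:",timestamp(millisecond)"`
}

func main() {
	buf := new(bytes.Buffer)
	w := parquet.NewGenericWriter[Event](buf)
	if _, err := w.Write([]Event{{Created: time.Now(), Updated: time.Now()}}); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}
	// buf now holds a parquet file whose two columns carry TIMESTAMP logical types.
}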
Larry Marburger authored Nov 14, 2022
1 parent 4b709ae commit 6c2e5f4
Showing 14 changed files with 307 additions and 11 deletions.
4 changes: 4 additions & 0 deletions buffer_go18_test.go
@@ -25,6 +25,8 @@ func TestGenericBuffer(t *testing.T) {
testGenericBuffer[stringColumn](t)
testGenericBuffer[indexedStringColumn](t)
testGenericBuffer[uuidColumn](t)
testGenericBuffer[timeColumn](t)
testGenericBuffer[timeInMillisColumn](t)
testGenericBuffer[mapColumn](t)
testGenericBuffer[decimalColumn](t)
testGenericBuffer[addressBook](t)
@@ -108,6 +110,8 @@ func BenchmarkGenericBuffer(b *testing.B) {
benchmarkGenericBuffer[stringColumn](b)
benchmarkGenericBuffer[indexedStringColumn](b)
benchmarkGenericBuffer[uuidColumn](b)
benchmarkGenericBuffer[timeColumn](b)
benchmarkGenericBuffer[timeInMillisColumn](b)
benchmarkGenericBuffer[mapColumn](b)
benchmarkGenericBuffer[decimalColumn](b)
benchmarkGenericBuffer[contact](b)
43 changes: 43 additions & 0 deletions column_buffer_go18.go
@@ -5,6 +5,7 @@ package parquet
import (
"math/bits"
"reflect"
"time"
"unsafe"

"github.com/segmentio/parquet-go/deprecated"
@@ -30,6 +31,8 @@ func writeRowsFuncOf(t reflect.Type, schema *Schema, path columnPath) writeRowsF
switch t {
case reflect.TypeOf(deprecated.Int96{}):
return writeRowsFuncOfRequired(t, schema, path)
case reflect.TypeOf(time.Time{}):
return writeRowsFuncOfTime(t, schema, path)
}

switch t.Kind() {
@@ -393,3 +396,43 @@ func writeRowsFuncOfMap(t reflect.Type, schema *Schema, path columnPath) writeRo
return nil
}
}

func writeRowsFuncOfTime(_ reflect.Type, schema *Schema, path columnPath) writeRowsFunc {
t := reflect.TypeOf(int64(0))
elemSize := uintptr(t.Size())
writeRows := writeRowsFuncOf(t, schema, path)

col, _ := schema.Lookup(path...)
unit := Nanosecond.TimeUnit()
lt := col.Node.Type().LogicalType()
if lt != nil && lt.Timestamp != nil {
unit = lt.Timestamp.Unit
}

return func(columns []ColumnBuffer, rows sparse.Array, levels columnLevels) error {
if rows.Len() == 0 {
return writeRows(columns, rows, levels)
}

times := rows.TimeArray()
for i := 0; i < times.Len(); i++ {
t := times.Index(i)
var val int64
switch {
case unit.Millis != nil:
val = t.UnixMilli()
case unit.Micros != nil:
val = t.UnixMicro()
default:
val = t.UnixNano()
}

a := makeArray(unsafecast.PointerOfValue(reflect.ValueOf(val)), 1, elemSize)
if err := writeRows(columns, a, levels); err != nil {
return err
}
}

return nil
}
}
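The unit-dependent flattening above can be read in isolation; the following is an illustrative sketch (not part of the diff), assuming the format.TimeUnit type carried by the column's TIMESTAMP logical type.

package main

import (
	"fmt"
	"time"

	"github.com/segmentio/parquet-go/format"
)

// toTimestamp is a hypothetical helper mirroring the switch in writeRowsFuncOfTime:
// it flattens a time.Time to an int64 in the unit declared by the logical type.
func toTimestamp(t time.Time, unit format.TimeUnit) int64 {
	switch {
	case unit.Millis != nil:
		return t.UnixMilli()
	case unit.Micros != nil:
		return t.UnixMicro()
	default: // NANOS is the default unit
		return t.UnixNano()
	}
}

func main() {
	now := time.Unix(42, 0)
	fmt.Println(toTimestamp(now, format.TimeUnit{Millis: &format.MilliSeconds{}})) // 42000
	fmt.Println(toTimestamp(now, format.TimeUnit{}))                               // 42000000000
}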
89 changes: 89 additions & 0 deletions convert_test.go
@@ -3,6 +3,7 @@ package parquet_test
import (
"reflect"
"testing"
"time"

"github.com/segmentio/parquet-go"
)
@@ -309,3 +310,91 @@ func TestConvert(t *testing.T) {

func newInt64(i int64) *int64 { return &i }
func newString(s string) *string { return &s }

func TestConvertTimestamp(t *testing.T) {
now := time.Unix(42, 0)
ms := now.UnixMilli()
us := now.UnixMicro()
ns := now.UnixNano()

msType := parquet.Timestamp(parquet.Millisecond).Type()
msVal := parquet.ValueOf(ms)
if msVal.Int64() != ms {
t.Errorf("converted value mismatch:\nwant = %+v\ngot = %+v", ms, msVal.Int64())
}

usType := parquet.Timestamp(parquet.Microsecond).Type()
usVal := parquet.ValueOf(us)
if usVal.Int64() != us {
t.Errorf("converted value mismatch:\nwant = %+v\ngot = %+v", us, usVal.Int64())
}

nsType := parquet.Timestamp(parquet.Nanosecond).Type()
nsVal := parquet.ValueOf(ns)
if nsVal.Int64() != ns {
t.Errorf("converted value mismatch:\nwant = %+v\ngot = %+v", ns, nsVal.Int64())
}

var timestampConversionTests = [...]struct {
scenario string
fromType parquet.Type
fromValue parquet.Value
toType parquet.Type
expected int64
}{
{
scenario: "micros to nanos",
fromType: usType,
fromValue: usVal,
toType: nsType,
expected: ns,
},
{
scenario: "millis to nanos",
fromType: msType,
fromValue: msVal,
toType: nsType,
expected: ns,
},
{
scenario: "nanos to micros",
fromType: nsType,
fromValue: nsVal,
toType: usType,
expected: us,
},
{
scenario: "nanos to nanos",
fromType: nsType,
fromValue: nsVal,
toType: nsType,
expected: ns,
},
{
scenario: "int64 to nanos",
fromType: parquet.Int64Type,
fromValue: nsVal,
toType: nsType,
expected: ns,
},
{
scenario: "int64 to int64",
fromType: parquet.Int64Type,
fromValue: nsVal,
toType: parquet.Int64Type,
expected: ns,
},
}

for _, test := range timestampConversionTests {
t.Run(test.scenario, func(t *testing.T) {
a, err := test.toType.ConvertValue(test.fromValue, test.fromType)
if err != nil {
t.Fatal(err)
}
if a.Int64() != test.expected {
t.Errorf("converted value mismatch:\nwant = %+v\ngot = %+v", test.expected, a.Int64())
}
})
}
}
13 changes: 13 additions & 0 deletions internal/quick/quick.go
@@ -6,6 +6,7 @@ import (
"math/rand"
"reflect"
"strings"
"time"
)

var DefaultConfig = Config{
@@ -70,6 +71,18 @@ func (c *Config) Check(f interface{}) error {
type MakeValueFunc func(reflect.Value, *rand.Rand)

func MakeValueFuncOf(t reflect.Type) MakeValueFunc {
switch t {
case reflect.TypeOf(time.Time{}):
return func(v reflect.Value, r *rand.Rand) {
// TODO: This is a hack to support the matching of times in a precision
// other than nanosecond by generating times rounded to the second. A
// better solution would be to update columns types to add a compare
// function.
sec := r.Int63n(2524608000) // 2050-01-01
v.Set(reflect.ValueOf(time.Unix(sec, 0).UTC()))
}
}

switch t.Kind() {
case reflect.Bool:
return func(v reflect.Value, r *rand.Rand) {
18 changes: 18 additions & 0 deletions parquet_test.go
@@ -136,6 +136,24 @@ func (row uuidColumn) generate(prng *rand.Rand) uuidColumn {
return row
}

type timeColumn struct {
Value time.Time
}

func (row timeColumn) generate(prng *rand.Rand) timeColumn {
t := time.Unix(0, prng.Int63()).UTC()
return timeColumn{Value: t}
}

type timeInMillisColumn struct {
Value time.Time `parquet:",timestamp(millisecond)"`
}

func (row timeInMillisColumn) generate(prng *rand.Rand) timeInMillisColumn {
t := time.Unix(0, prng.Int63()).UTC()
return timeInMillisColumn{Value: t}
}

type decimalColumn struct {
Value int64 `parquet:",decimal(0:3)"`
}
4 changes: 4 additions & 0 deletions reader_go18_test.go
@@ -26,6 +26,8 @@ func TestGenericReader(t *testing.T) {
testGenericReader[stringColumn](t)
testGenericReader[indexedStringColumn](t)
testGenericReader[uuidColumn](t)
testGenericReader[timeColumn](t)
testGenericReader[timeInMillisColumn](t)
testGenericReader[mapColumn](t)
testGenericReader[decimalColumn](t)
testGenericReader[addressBook](t)
@@ -98,6 +100,8 @@ func BenchmarkGenericReader(b *testing.B) {
benchmarkGenericReader[stringColumn](b)
benchmarkGenericReader[indexedStringColumn](b)
benchmarkGenericReader[uuidColumn](b)
benchmarkGenericReader[timeColumn](b)
benchmarkGenericReader[timeInMillisColumn](b)
benchmarkGenericReader[mapColumn](b)
benchmarkGenericReader[decimalColumn](b)
benchmarkGenericReader[contact](b)
10 changes: 10 additions & 0 deletions reader_test.go
@@ -89,6 +89,16 @@ var readerTests = []struct {
model: uuidColumn{},
},

{
scenario: "time.Time",
model: timeColumn{},
},

{
scenario: "time.Time in ms",
model: timeInMillisColumn{},
},

{
scenario: "DECIMAL",
model: decimalColumn{},
6 changes: 4 additions & 2 deletions row.go
@@ -499,13 +499,15 @@ func deconstructFuncOfLeaf(columnIndex int16, node Node) (int16, deconstructFunc
if columnIndex > MaxColumnIndex {
panic("row cannot be deconstructed because it has more than 127 columns")
}
kind := node.Type().Kind()
typ := node.Type()
kind := typ.Kind()
lt := typ.LogicalType()
valueColumnIndex := ^columnIndex
return columnIndex + 1, func(row Row, levels levels, value reflect.Value) Row {
v := Value{}

if value.IsValid() {
v = makeValue(kind, value)
v = makeValue(kind, lt, value)
}

v.repetitionLevel = levels.repetitionLevel
14 changes: 13 additions & 1 deletion schema.go
@@ -7,6 +7,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/google/uuid"
"github.com/segmentio/parquet-go/compress"
@@ -488,6 +489,8 @@ func nodeOf(t reflect.Type, tag []string) Node {
return Leaf(Int96Type)
case reflect.TypeOf(uuid.UUID{}):
return UUID()
case reflect.TypeOf(time.Time{}):
return Timestamp(Nanosecond)
}

var n Node
@@ -823,7 +826,16 @@ func makeNodeOf(t reflect.Type, name string, tag []string) Node {
}
setNode(Timestamp(timeUnit))
default:
throwInvalidTag(t, name, option)
switch t {
case reflect.TypeOf(time.Time{}):
timeUnit, err := parseTimestampArgs(args)
if err != nil {
throwInvalidTag(t, name, option)
}
setNode(Timestamp(timeUnit))
default:
throwInvalidTag(t, name, option)
}
}
default:
throwUnknownTag(t, name, option)
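For reference, a small sketch (assumed, not part of the diff) of how the two paths above surface in a derived schema: the Record type is hypothetical, and parquet.SchemaOf is the existing schema constructor.

package main

import (
	"fmt"
	"time"

	"github.com/segmentio/parquet-go"
)

type Record struct {
	Created time.Time                                     // nodeOf default: Timestamp(Nanosecond)
	Updated time.Time `parquet:",timestamp(microsecond)"` // tag override parsed by makeNodeOf
}

func main() {
	schema := parquet.SchemaOf(Record{})
	fmt.Println(schema) // both columns are stored as int64 with a TIMESTAMP logical type
}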
22 changes: 21 additions & 1 deletion sparse/array.go
@@ -1,6 +1,9 @@
package sparse

import "unsafe"
import (
"time"
"unsafe"
)

type Array struct{ array }

@@ -25,6 +28,7 @@ func (a Array) Uint32Array() Uint32Array { return Uint32Array{a.array} }
func (a Array) Uint64Array() Uint64Array { return Uint64Array{a.array} }
func (a Array) Uint128Array() Uint128Array { return Uint128Array{a.array} }
func (a Array) StringArray() StringArray { return StringArray{a.array} }
func (a Array) TimeArray() TimeArray { return TimeArray{a.array} }

type array struct {
ptr unsafe.Pointer
@@ -290,3 +294,19 @@ func (a StringArray) Len() int { return int(a.len) }
func (a StringArray) Index(i int) string { return *(*string)(a.index(i)) }
func (a StringArray) Slice(i, j int) StringArray { return StringArray{a.slice(i, j)} }
func (a StringArray) UnsafeArray() Array { return Array{a.array} }

type TimeArray struct{ array }

func MakeTimeArray(values []time.Time) TimeArray {
const sizeOfTime = unsafe.Sizeof(time.Time{})
return TimeArray{makeArray(*(*unsafe.Pointer)(unsafe.Pointer(&values)), uintptr(len(values)), sizeOfTime)}
}

func UnsafeTimeArray(base unsafe.Pointer, length int, offset uintptr) TimeArray {
return TimeArray{makeArray(base, uintptr(length), offset)}
}

func (a TimeArray) Len() int { return int(a.len) }
func (a TimeArray) Index(i int) time.Time { return *(*time.Time)(a.index(i)) }
func (a TimeArray) Slice(i, j int) TimeArray { return TimeArray{a.slice(i, j)} }
func (a TimeArray) UnsafeArray() Array { return Array{a.array} }
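A brief usage sketch of the new accessor (assumed example, not from the diff):

package main

import (
	"fmt"
	"time"

	"github.com/segmentio/parquet-go/sparse"
)

func main() {
	values := []time.Time{time.Unix(0, 0), time.Unix(1, 0)}
	arr := sparse.MakeTimeArray(values)
	// Len reports the number of elements; Index returns the i-th time.Time in place.
	fmt.Println(arr.Len(), arr.Index(1).Unix()) // 2 1
}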