diff --git a/.golangci.yml b/.golangci.yml index 871b5d9..1b6b4a9 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -35,6 +35,7 @@ linters: - gocyclo - goerr113 - gomnd + - gosmopolitan - ireturn - nestif - nlreturn diff --git a/README.md b/README.md index e6b6f12..53dce50 100644 --- a/README.md +++ b/README.md @@ -68,31 +68,33 @@ More examples in the [godoc](https://godoc.org/github.com/hamba/avro/v2). #### Types Conversions -| Avro | Go Struct | Go Interface | -|-------------------------|--------------------------------------------------------|--------------------------| -| `null` | `nil` | `nil` | -| `boolean` | `bool` | `bool` | -| `bytes` | `[]byte` | `[]byte` | -| `float` | `float32` | `float32` | -| `double` | `float64` | `float64` | -| `long` | `int64`, `uint32`\* | `int64`, `uint32` | -| `int` | `int`, `int32`, `int16`, `int8`, `uint8`\*, `uint16`\* | `int`, `uint8`, `uint16` | -| `fixed` | `uint64` | `uint64` | -| `string` | `string` | `string` | -| `array` | `[]T` | `[]any` | -| `enum` | `string` | `string` | -| `fixed` | `[n]byte` | `[n]byte` | -| `map` | `map[string]T{}` | `map[string]any` | -| `record` | `struct` | `map[string]any` | -| `union` | *see below* | *see below* | -| `int.date` | `time.Time` | `time.Time` | -| `int.time-millis` | `time.Duration` | `time.Duration` | -| `long.time-micros` | `time.Duration` | `time.Duration` | -| `long.timestamp-millis` | `time.Time` | `time.Time` | -| `long.timestamp-micros` | `time.Time` | `time.Time` | -| `bytes.decimal` | `*big.Rat` | `*big.Rat` | -| `fixed.decimal` | `*big.Rat` | `*big.Rat` | -| `string.uuid` | `string` | `string` | +| Avro | Go Struct | Go Interface | +|-------------------------------|--------------------------------------------------------|--------------------------| +| `null` | `nil` | `nil` | +| `boolean` | `bool` | `bool` | +| `bytes` | `[]byte` | `[]byte` | +| `float` | `float32` | `float32` | +| `double` | `float64` | `float64` | +| `long` | `int64`, `uint32`\* | `int64`, `uint32` | +| `int` | `int`, `int32`, `int16`, `int8`, `uint8`\*, `uint16`\* | `int`, `uint8`, `uint16` | +| `fixed` | `uint64` | `uint64` | +| `string` | `string` | `string` | +| `array` | `[]T` | `[]any` | +| `enum` | `string` | `string` | +| `fixed` | `[n]byte` | `[n]byte` | +| `map` | `map[string]T{}` | `map[string]any` | +| `record` | `struct` | `map[string]any` | +| `union` | *see below* | *see below* | +| `int.date` | `time.Time` | `time.Time` | +| `int.time-millis` | `time.Duration` | `time.Duration` | +| `long.time-micros` | `time.Duration` | `time.Duration` | +| `long.timestamp-millis` | `time.Time` | `time.Time` | +| `long.timestamp-micros` | `time.Time` | `time.Time` | +| `long.local-timestamp-millis` | `time.Time` | `time.Time` | +| `long.local-timestamp-micros` | `time.Time` | `time.Time` | +| `bytes.decimal` | `*big.Rat` | `*big.Rat` | +| `fixed.decimal` | `*big.Rat` | `*big.Rat` | +| `string.uuid` | `string` | `string` | \* Please note that when the Go type is an unsigned integer care must be taken to ensure that information is not lost when converting between the Avro type and Go type. For example, storing a *negative* number in Avro of `int = -100` diff --git a/codec_generic.go b/codec_generic.go index 4265288..2091659 100644 --- a/codec_generic.go +++ b/codec_generic.go @@ -24,7 +24,8 @@ func genericDecode(schema Schema, r *Reader) any { return nil } - // seems generic reader is not compatible with codec + // Generic reader returns a different result from the + // codec in the case of a big.Rat. Handle this. if rTyp.Type1() == ratType { dec := obj.(big.Rat) return &dec @@ -69,14 +70,18 @@ func genericReceiver(schema Schema) (unsafe.Pointer, reflect2.Type, error) { case TimeMicros: var v time.Duration return unsafe.Pointer(&v), reflect2.TypeOf(v), nil - case TimestampMillis: var v time.Time return unsafe.Pointer(&v), reflect2.TypeOf(v), nil - case TimestampMicros: var v time.Time return unsafe.Pointer(&v), reflect2.TypeOf(v), nil + case LocalTimestampMillis: + var v time.Time + return unsafe.Pointer(&v), reflect2.TypeOf(v), nil + case LocalTimestampMicros: + var v time.Time + return unsafe.Pointer(&v), reflect2.TypeOf(v), nil } } var v int64 diff --git a/codec_generic_internal_test.go b/codec_generic_internal_test.go index b04f198..772bc3f 100644 --- a/codec_generic_internal_test.go +++ b/codec_generic_internal_test.go @@ -3,7 +3,6 @@ package avro import ( "bytes" "math/big" - "strconv" "testing" "time" @@ -76,6 +75,20 @@ func TestGenericDecode(t *testing.T) { want: time.Date(2020, 1, 2, 3, 4, 5, 0, time.UTC), wantErr: require.NoError, }, + { + name: "Long Local-Timestamp-Millis", + data: []byte{0x90, 0xB2, 0xAE, 0xC3, 0xEC, 0x5B}, + schema: `{"type":"long","logicalType":"local-timestamp-millis"}`, + want: time.Date(2020, 1, 2, 3, 4, 5, 0, time.Local), + wantErr: require.NoError, + }, + { + name: "Long Local-Timestamp-Micros", + data: []byte{0x80, 0xCD, 0xB7, 0xA2, 0xEE, 0xC7, 0xCD, 0x05}, + schema: `{"type":"long","logicalType":"local-timestamp-micros"}`, + want: time.Date(2020, 1, 2, 3, 4, 5, 0, time.Local), + wantErr: require.NoError, + }, { name: "Float", data: []byte{0x33, 0x33, 0x93, 0x3F}, @@ -197,9 +210,9 @@ func TestGenericDecode(t *testing.T) { }, } - for i, test := range tests { + for _, test := range tests { test := test - t.Run(strconv.Itoa(i), func(t *testing.T) { + t.Run(test.name, func(t *testing.T) { schema := MustParse(test.schema) r := NewReader(bytes.NewReader(test.data), 10) diff --git a/codec_native.go b/codec_native.go index 04cdafe..b99fb21 100644 --- a/codec_native.go +++ b/codec_native.go @@ -10,6 +10,7 @@ import ( "github.com/modern-go/reflect2" ) +//nolint:maintidx // Splitting this would not make it simpler. func createDecoderOfNative(schema Schema, typ reflect2.Type) ValDecoder { converter := resolveConverter(schema.(*PrimitiveSchema).actual) @@ -120,24 +121,29 @@ func createDecoderOfNative(schema Schema, typ reflect2.Type) ValDecoder { st := schema.Type() ls := getLogicalSchema(schema) lt := getLogicalType(schema) - tpy1 := typ.Type1() - Istpy1Time := tpy1.ConvertibleTo(timeType) - Istpy1Rat := tpy1.ConvertibleTo(ratType) + isTime := typ.Type1().ConvertibleTo(timeType) switch { - case Istpy1Time && st == Int && lt == Date: + case isTime && st == Int && lt == Date: return &dateCodec{} - - case Istpy1Time && st == Long && lt == TimestampMillis: + case isTime && st == Long && lt == TimestampMillis: return ×tampMillisCodec{ convert: converter.toLong, } - - case Istpy1Time && st == Long && lt == TimestampMicros: + case isTime && st == Long && lt == TimestampMicros: return ×tampMicrosCodec{ convert: converter.toLong, } - - case Istpy1Rat && st == Bytes && lt == Decimal: + case isTime && st == Long && lt == LocalTimestampMillis: + return ×tampMillisCodec{ + local: true, + convert: converter.toLong, + } + case isTime && st == Long && lt == LocalTimestampMicros: + return ×tampMicrosCodec{ + local: true, + convert: converter.toLong, + } + case typ.Type1().ConvertibleTo(ratType) && st == Bytes && lt == Decimal: dec := ls.(*DecimalLogicalSchema) return &bytesDecimalCodec{ prec: dec.Precision(), scale: dec.Scale(), @@ -228,13 +234,10 @@ func createEncoderOfNative(schema Schema, typ reflect2.Type) ValEncoder { switch { case st == Int && lt == TimeMillis: // time.Duration return &timeMillisCodec{} - case st == Long && lt == TimeMicros: // time.Duration return &timeMicrosCodec{} - case st == Long: return &longCodec[int64]{} - default: break } @@ -243,7 +246,6 @@ func createEncoderOfNative(schema Schema, typ reflect2.Type) ValEncoder { switch schema.Type() { case Double: return &float32DoubleCodec{} - case Float: return &float32Codec{} } @@ -269,24 +271,22 @@ func createEncoderOfNative(schema Schema, typ reflect2.Type) ValEncoder { case reflect.Struct: st := schema.Type() lt := getLogicalType(schema) - tpy1 := typ.Type1() - Istpy1Time := tpy1.ConvertibleTo(timeType) - Istpy1Rat := tpy1.ConvertibleTo(ratType) + isTime := typ.Type1().ConvertibleTo(timeType) switch { - case Istpy1Time && st == Int && lt == Date: + case isTime && st == Int && lt == Date: return &dateCodec{} - case Istpy1Time && st == Long && lt == TimestampMillis: + case isTime && st == Long && lt == TimestampMillis: return ×tampMillisCodec{} - - case Istpy1Time && st == Long && lt == TimestampMicros: + case isTime && st == Long && lt == TimestampMicros: return ×tampMicrosCodec{} - - case Istpy1Rat && st != Bytes || lt == Decimal: + case isTime && st == Long && lt == LocalTimestampMillis: + return ×tampMillisCodec{local: true} + case isTime && st == Long && lt == LocalTimestampMicros: + return ×tampMicrosCodec{local: true} + case typ.Type1().ConvertibleTo(ratType) && st != Bytes || lt == Decimal: ls := getLogicalSchema(schema) dec := ls.(*DecimalLogicalSchema) - return &bytesDecimalCodec{prec: dec.Precision(), scale: dec.Scale()} - default: break } @@ -477,6 +477,7 @@ func (c *dateCodec) Encode(ptr unsafe.Pointer, w *Writer) { } type timestampMillisCodec struct { + local bool convert func(*Reader) int64 } @@ -489,15 +490,31 @@ func (c *timestampMillisCodec) Decode(ptr unsafe.Pointer, r *Reader) { } sec := i / 1e3 nsec := (i - sec*1e3) * 1e6 - *((*time.Time)(ptr)) = time.Unix(sec, nsec).UTC() + t := time.Unix(sec, nsec) + + if c.local { + // When doing unix time, Go will convert the time from UTC to Local, + // changing the time by the number of seconds in the zone offset. + // Remove those added seconds. + _, offset := t.Zone() + t = t.Add(time.Duration(-1*offset) * time.Second) + *((*time.Time)(ptr)) = t + return + } + *((*time.Time)(ptr)) = t.UTC() } func (c *timestampMillisCodec) Encode(ptr unsafe.Pointer, w *Writer) { t := *((*time.Time)(ptr)) + if c.local { + t = t.Local() + t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC) + } w.WriteLong(t.Unix()*1e3 + int64(t.Nanosecond()/1e6)) } type timestampMicrosCodec struct { + local bool convert func(*Reader) int64 } @@ -510,11 +527,26 @@ func (c *timestampMicrosCodec) Decode(ptr unsafe.Pointer, r *Reader) { } sec := i / 1e6 nsec := (i - sec*1e6) * 1e3 - *((*time.Time)(ptr)) = time.Unix(sec, nsec).UTC() + t := time.Unix(sec, nsec) + + if c.local { + // When doing unix time, Go will convert the time from UTC to Local, + // changing the time by the number of seconds in the zone offset. + // Remove those added seconds. + _, offset := t.Zone() + t = t.Add(time.Duration(-1*offset) * time.Second) + *((*time.Time)(ptr)) = t + return + } + *((*time.Time)(ptr)) = t.UTC() } func (c *timestampMicrosCodec) Encode(ptr unsafe.Pointer, w *Writer) { t := *((*time.Time)(ptr)) + if c.local { + t = t.Local() + t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC) + } w.WriteLong(t.Unix()*1e6 + int64(t.Nanosecond()/1e3)) } diff --git a/decoder_native_test.go b/decoder_native_test.go index 23e310e..0915c15 100644 --- a/decoder_native_test.go +++ b/decoder_native_test.go @@ -507,6 +507,96 @@ func TestDecoder_Time_TimestampMillisOneMicros(t *testing.T) { assert.Equal(t, time.Date(1970, 1, 1, 0, 0, 0, 1e3, time.UTC), got) } +func TestDecoder_Time_LocalTimestampMillis(t *testing.T) { + defer ConfigTeardown() + + data := []byte{0x90, 0xB2, 0xAE, 0xC3, 0xEC, 0x5B} + schema := `{"type":"long","logicalType":"local-timestamp-millis"}` + dec, err := avro.NewDecoder(schema, bytes.NewReader(data)) + require.NoError(t, err) + + var got time.Time + err = dec.Decode(&got) + + require.NoError(t, err) + assert.Equal(t, time.Date(2020, 1, 2, 3, 4, 5, 0, time.Local), got) +} + +func TestDecoder_Time_LocalTimestampMillisZero(t *testing.T) { + defer ConfigTeardown() + + data := []byte{0xff, 0xdf, 0xe6, 0xa2, 0xe2, 0xa0, 0x1c} + schema := `{"type":"long","logicalType":"local-timestamp-millis"}` + dec, err := avro.NewDecoder(schema, bytes.NewReader(data)) + require.NoError(t, err) + + var got time.Time + err = dec.Decode(&got) + + require.NoError(t, err) + assert.Equal(t, time.Date(1, 1, 1, 0, 0, 0, 0, time.Local), got) +} + +func TestDecoder_Time_LocalTimestampMillisOneMillis(t *testing.T) { + defer ConfigTeardown() + + data := []byte{0x02} + schema := `{"type":"long","logicalType":"local-timestamp-millis"}` + dec, err := avro.NewDecoder(schema, bytes.NewReader(data)) + require.NoError(t, err) + + var got time.Time + err = dec.Decode(&got) + + require.NoError(t, err) + assert.Equal(t, time.Date(1970, 1, 1, 0, 0, 0, 1e6, time.Local), got) +} + +func TestDecoder_Time_LocalTimestampMicros(t *testing.T) { + defer ConfigTeardown() + + data := []byte{0x80, 0xCD, 0xB7, 0xA2, 0xEE, 0xC7, 0xCD, 0x05} + schema := `{"type":"long","logicalType":"local-timestamp-micros"}` + dec, err := avro.NewDecoder(schema, bytes.NewReader(data)) + require.NoError(t, err) + + var got time.Time + err = dec.Decode(&got) + + require.NoError(t, err) + assert.Equal(t, time.Date(2020, 1, 2, 3, 4, 5, 0, time.Local), got) +} + +func TestDecoder_Time_LocalTimestampMicrosZero(t *testing.T) { + defer ConfigTeardown() + + data := []byte{0xff, 0xff, 0xdd, 0xf2, 0xdf, 0xff, 0xdf, 0xdc, 0x1} + schema := `{"type":"long","logicalType":"local-timestamp-micros"}` + dec, err := avro.NewDecoder(schema, bytes.NewReader(data)) + require.NoError(t, err) + + var got time.Time + err = dec.Decode(&got) + + require.NoError(t, err) + assert.Equal(t, time.Date(1, 1, 1, 0, 0, 0, 0, time.Local), got) +} + +func TestDecoder_Time_LocalTimestampMillisOneMicros(t *testing.T) { + defer ConfigTeardown() + + data := []byte{0x02} + schema := `{"type":"long","logicalType":"local-timestamp-micros"}` + dec, err := avro.NewDecoder(schema, bytes.NewReader(data)) + require.NoError(t, err) + + var got time.Time + err = dec.Decode(&got) + + require.NoError(t, err) + assert.Equal(t, time.Date(1970, 1, 1, 0, 0, 0, 1e3, time.Local), got) +} + func TestDecoder_TimeInvalidSchema(t *testing.T) { defer ConfigTeardown() diff --git a/encoder_native_test.go b/encoder_native_test.go index a2f9858..f201e06 100644 --- a/encoder_native_test.go +++ b/encoder_native_test.go @@ -501,6 +501,90 @@ func TestEncoder_Time_TimestampMillisOneMicros(t *testing.T) { assert.Equal(t, []byte{0x2}, buf.Bytes()) } +func TestEncoder_Time_LocalTimestampMillis(t *testing.T) { + defer ConfigTeardown() + + schema := `{"type":"long","logicalType":"local-timestamp-millis"}` + buf := bytes.NewBuffer([]byte{}) + enc, err := avro.NewEncoder(schema, buf) + require.NoError(t, err) + + err = enc.Encode(time.Date(2020, 1, 2, 3, 4, 5, 6, time.Local)) + + require.NoError(t, err) + assert.Equal(t, []byte{0x90, 0xB2, 0xAE, 0xC3, 0xEC, 0x5B}, buf.Bytes()) +} + +func TestEncoder_Time_LocalTimestampMillisZero(t *testing.T) { + defer ConfigTeardown() + + schema := `{"type":"long","logicalType":"local-timestamp-millis"}` + buf := bytes.NewBuffer([]byte{}) + enc, err := avro.NewEncoder(schema, buf) + require.NoError(t, err) + + err = enc.Encode(time.Date(1, 1, 1, 0, 0, 0, 0, time.Local)) + + require.NoError(t, err) + assert.Equal(t, []byte{0xff, 0xdf, 0xe6, 0xa2, 0xe2, 0xa0, 0x1c}, buf.Bytes()) +} + +func TestEncoder_Time_LocalTimestampMillisOneMillis(t *testing.T) { + defer ConfigTeardown() + + schema := `{"type":"long","logicalType":"local-timestamp-millis"}` + buf := bytes.NewBuffer([]byte{}) + enc, err := avro.NewEncoder(schema, buf) + require.NoError(t, err) + + err = enc.Encode(time.Date(1970, 1, 1, 0, 0, 0, 1e6, time.Local)) + + require.NoError(t, err) + assert.Equal(t, []byte{0x2}, buf.Bytes()) +} + +func TestEncoder_Time_LocalTimestampMicros(t *testing.T) { + defer ConfigTeardown() + + schema := `{"type":"long","logicalType":"local-timestamp-micros"}` + buf := bytes.NewBuffer([]byte{}) + enc, err := avro.NewEncoder(schema, buf) + require.NoError(t, err) + + err = enc.Encode(time.Date(2020, 1, 2, 3, 4, 5, 6, time.Local)) + + require.NoError(t, err) + assert.Equal(t, []byte{0x80, 0xCD, 0xB7, 0xA2, 0xEE, 0xC7, 0xCD, 0x05}, buf.Bytes()) +} + +func TestEncoder_Time_LocalTimestampMicrosZero(t *testing.T) { + defer ConfigTeardown() + + schema := `{"type":"long","logicalType":"local-timestamp-micros"}` + buf := bytes.NewBuffer([]byte{}) + enc, err := avro.NewEncoder(schema, buf) + require.NoError(t, err) + + err = enc.Encode(time.Date(1, 1, 1, 0, 0, 0, 0, time.Local)) + + require.NoError(t, err) + assert.Equal(t, []byte{0xff, 0xff, 0xdd, 0xf2, 0xdf, 0xff, 0xdf, 0xdc, 0x1}, buf.Bytes()) +} + +func TestEncoder_Time_LocalTimestampMicrosOneMicros(t *testing.T) { + defer ConfigTeardown() + + schema := `{"type":"long","logicalType":"local-timestamp-micros"}` + buf := bytes.NewBuffer([]byte{}) + enc, err := avro.NewEncoder(schema, buf) + require.NoError(t, err) + + err = enc.Encode(time.Date(1970, 1, 1, 0, 0, 0, 1e3, time.Local)) + + require.NoError(t, err) + assert.Equal(t, []byte{0x2}, buf.Bytes()) +} + func TestEncoder_TimeInvalidSchema(t *testing.T) { defer ConfigTeardown() diff --git a/schema.go b/schema.go index 243598b..4dba5e5 100644 --- a/schema.go +++ b/schema.go @@ -65,14 +65,16 @@ type LogicalType string // Schema logical type constants. const ( - Decimal LogicalType = "decimal" - UUID LogicalType = "uuid" - Date LogicalType = "date" - TimeMillis LogicalType = "time-millis" - TimeMicros LogicalType = "time-micros" - TimestampMillis LogicalType = "timestamp-millis" - TimestampMicros LogicalType = "timestamp-micros" - Duration LogicalType = "duration" + Decimal LogicalType = "decimal" + UUID LogicalType = "uuid" + Date LogicalType = "date" + TimeMillis LogicalType = "time-millis" + TimeMicros LogicalType = "time-micros" + TimestampMillis LogicalType = "timestamp-millis" + TimestampMicros LogicalType = "timestamp-micros" + LocalTimestampMillis LogicalType = "local-timestamp-millis" + LocalTimestampMicros LogicalType = "local-timestamp-micros" + Duration LogicalType = "duration" ) func isNative(typ Type) bool { diff --git a/schema_parse.go b/schema_parse.go index 4f5622a..2be5a37 100644 --- a/schema_parse.go +++ b/schema_parse.go @@ -185,7 +185,9 @@ func parsePrimitiveLogicalType(typ Type, lt string, prec, scale int) LogicalSche (typ == Int && ltyp == TimeMillis) || (typ == Long && ltyp == TimeMicros) || (typ == Long && ltyp == TimestampMillis) || - (typ == Long && ltyp == TimestampMicros) { + (typ == Long && ltyp == TimestampMicros) || + (typ == Long && ltyp == LocalTimestampMillis) || + (typ == Long && ltyp == LocalTimestampMicros) { return NewPrimitiveLogicalSchema(ltyp) } diff --git a/schema_test.go b/schema_test.go index 8d850c6..98a8966 100644 --- a/schema_test.go +++ b/schema_test.go @@ -1002,6 +1002,20 @@ func TestSchema_LogicalTypes(t *testing.T) { wantLogical: true, wantLogicalType: avro.TimestampMicros, }, + { + name: "Local Timestamp Millis", + schema: `{"type": "long", "logicalType": "local-timestamp-millis"}`, + wantType: avro.Long, + wantLogical: true, + wantLogicalType: avro.LocalTimestampMillis, + }, + { + name: "Local Timestamp Micros", + schema: `{"type": "long", "logicalType": "local-timestamp-micros"}`, + wantType: avro.Long, + wantLogical: true, + wantLogicalType: avro.LocalTimestampMicros, + }, { name: "UUID", schema: `{"type": "string", "logicalType": "uuid"}`,