Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support local timestamp logic types #345

Merged
merged 2 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ linters:
- gocyclo
- goerr113
- gomnd
- gosmopolitan
- ireturn
- nestif
- nlreturn
Expand Down
52 changes: 27 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,31 +68,33 @@ More examples in the [godoc](https://godoc.org/github.com/hamba/avro/v2).

#### Types Conversions

| Avro | Go Struct | Go Interface |
|-------------------------|--------------------------------------------------------|--------------------------|
| `null` | `nil` | `nil` |
| `boolean` | `bool` | `bool` |
| `bytes` | `[]byte` | `[]byte` |
| `float` | `float32` | `float32` |
| `double` | `float64` | `float64` |
| `long` | `int64`, `uint32`\* | `int64`, `uint32` |
| `int` | `int`, `int32`, `int16`, `int8`, `uint8`\*, `uint16`\* | `int`, `uint8`, `uint16` |
| `fixed` | `uint64` | `uint64` |
| `string` | `string` | `string` |
| `array` | `[]T` | `[]any` |
| `enum` | `string` | `string` |
| `fixed` | `[n]byte` | `[n]byte` |
| `map` | `map[string]T{}` | `map[string]any` |
| `record` | `struct` | `map[string]any` |
| `union` | *see below* | *see below* |
| `int.date` | `time.Time` | `time.Time` |
| `int.time-millis` | `time.Duration` | `time.Duration` |
| `long.time-micros` | `time.Duration` | `time.Duration` |
| `long.timestamp-millis` | `time.Time` | `time.Time` |
| `long.timestamp-micros` | `time.Time` | `time.Time` |
| `bytes.decimal` | `*big.Rat` | `*big.Rat` |
| `fixed.decimal` | `*big.Rat` | `*big.Rat` |
| `string.uuid` | `string` | `string` |
| Avro | Go Struct | Go Interface |
|-------------------------------|--------------------------------------------------------|--------------------------|
| `null` | `nil` | `nil` |
| `boolean` | `bool` | `bool` |
| `bytes` | `[]byte` | `[]byte` |
| `float` | `float32` | `float32` |
| `double` | `float64` | `float64` |
| `long` | `int64`, `uint32`\* | `int64`, `uint32` |
| `int` | `int`, `int32`, `int16`, `int8`, `uint8`\*, `uint16`\* | `int`, `uint8`, `uint16` |
| `fixed` | `uint64` | `uint64` |
| `string` | `string` | `string` |
| `array` | `[]T` | `[]any` |
| `enum` | `string` | `string` |
| `fixed` | `[n]byte` | `[n]byte` |
| `map` | `map[string]T{}` | `map[string]any` |
| `record` | `struct` | `map[string]any` |
| `union` | *see below* | *see below* |
| `int.date` | `time.Time` | `time.Time` |
| `int.time-millis` | `time.Duration` | `time.Duration` |
| `long.time-micros` | `time.Duration` | `time.Duration` |
| `long.timestamp-millis` | `time.Time` | `time.Time` |
| `long.timestamp-micros` | `time.Time` | `time.Time` |
| `long.local-timestamp-millis` | `time.Time` | `time.Time` |
| `long.local-timestamp-micros` | `time.Time` | `time.Time` |
| `bytes.decimal` | `*big.Rat` | `*big.Rat` |
| `fixed.decimal` | `*big.Rat` | `*big.Rat` |
| `string.uuid` | `string` | `string` |

\* Please note that when the Go type is an unsigned integer care must be taken to ensure that information is not lost
when converting between the Avro type and Go type. For example, storing a *negative* number in Avro of `int = -100`
Expand Down
11 changes: 8 additions & 3 deletions codec_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ func genericDecode(schema Schema, r *Reader) any {
return nil
}

// seems generic reader is not compatible with codec
// Generic reader returns a different result from the
// codec in the case of a big.Rat. Handle this.
if rTyp.Type1() == ratType {
dec := obj.(big.Rat)
return &dec
Expand Down Expand Up @@ -69,14 +70,18 @@ func genericReceiver(schema Schema) (unsafe.Pointer, reflect2.Type, error) {
case TimeMicros:
var v time.Duration
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil

case TimestampMillis:
var v time.Time
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil

case TimestampMicros:
var v time.Time
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
case LocalTimestampMillis:
var v time.Time
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
case LocalTimestampMicros:
var v time.Time
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
}
}
var v int64
Expand Down
19 changes: 16 additions & 3 deletions codec_generic_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package avro
import (
"bytes"
"math/big"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -76,6 +75,20 @@ func TestGenericDecode(t *testing.T) {
want: time.Date(2020, 1, 2, 3, 4, 5, 0, time.UTC),
wantErr: require.NoError,
},
{
name: "Long Local-Timestamp-Millis",
data: []byte{0x90, 0xB2, 0xAE, 0xC3, 0xEC, 0x5B},
schema: `{"type":"long","logicalType":"local-timestamp-millis"}`,
want: time.Date(2020, 1, 2, 3, 4, 5, 0, time.Local),
wantErr: require.NoError,
},
{
name: "Long Local-Timestamp-Micros",
data: []byte{0x80, 0xCD, 0xB7, 0xA2, 0xEE, 0xC7, 0xCD, 0x05},
schema: `{"type":"long","logicalType":"local-timestamp-micros"}`,
want: time.Date(2020, 1, 2, 3, 4, 5, 0, time.Local),
wantErr: require.NoError,
},
{
name: "Float",
data: []byte{0x33, 0x33, 0x93, 0x3F},
Expand Down Expand Up @@ -197,9 +210,9 @@ func TestGenericDecode(t *testing.T) {
},
}

for i, test := range tests {
for _, test := range tests {
test := test
t.Run(strconv.Itoa(i), func(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
schema := MustParse(test.schema)
r := NewReader(bytes.NewReader(test.data), 10)

Expand Down
86 changes: 59 additions & 27 deletions codec_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/modern-go/reflect2"
)

//nolint:maintidx // Splitting this would not make it simpler.
func createDecoderOfNative(schema Schema, typ reflect2.Type) ValDecoder {
converter := resolveConverter(schema.(*PrimitiveSchema).actual)

Expand Down Expand Up @@ -120,24 +121,29 @@ func createDecoderOfNative(schema Schema, typ reflect2.Type) ValDecoder {
st := schema.Type()
ls := getLogicalSchema(schema)
lt := getLogicalType(schema)
tpy1 := typ.Type1()
Istpy1Time := tpy1.ConvertibleTo(timeType)
Istpy1Rat := tpy1.ConvertibleTo(ratType)
isTime := typ.Type1().ConvertibleTo(timeType)
switch {
case Istpy1Time && st == Int && lt == Date:
case isTime && st == Int && lt == Date:
return &dateCodec{}

case Istpy1Time && st == Long && lt == TimestampMillis:
case isTime && st == Long && lt == TimestampMillis:
return &timestampMillisCodec{
convert: converter.toLong,
}

case Istpy1Time && st == Long && lt == TimestampMicros:
case isTime && st == Long && lt == TimestampMicros:
return &timestampMicrosCodec{
convert: converter.toLong,
}

case Istpy1Rat && st == Bytes && lt == Decimal:
case isTime && st == Long && lt == LocalTimestampMillis:
return &timestampMillisCodec{
local: true,
convert: converter.toLong,
}
case isTime && st == Long && lt == LocalTimestampMicros:
return &timestampMicrosCodec{
local: true,
convert: converter.toLong,
}
case typ.Type1().ConvertibleTo(ratType) && st == Bytes && lt == Decimal:
dec := ls.(*DecimalLogicalSchema)
return &bytesDecimalCodec{
prec: dec.Precision(), scale: dec.Scale(),
Expand Down Expand Up @@ -228,13 +234,10 @@ func createEncoderOfNative(schema Schema, typ reflect2.Type) ValEncoder {
switch {
case st == Int && lt == TimeMillis: // time.Duration
return &timeMillisCodec{}

case st == Long && lt == TimeMicros: // time.Duration
return &timeMicrosCodec{}

case st == Long:
return &longCodec[int64]{}

default:
break
}
Expand All @@ -243,7 +246,6 @@ func createEncoderOfNative(schema Schema, typ reflect2.Type) ValEncoder {
switch schema.Type() {
case Double:
return &float32DoubleCodec{}

case Float:
return &float32Codec{}
}
Expand All @@ -269,24 +271,22 @@ func createEncoderOfNative(schema Schema, typ reflect2.Type) ValEncoder {
case reflect.Struct:
st := schema.Type()
lt := getLogicalType(schema)
tpy1 := typ.Type1()
Istpy1Time := tpy1.ConvertibleTo(timeType)
Istpy1Rat := tpy1.ConvertibleTo(ratType)
isTime := typ.Type1().ConvertibleTo(timeType)
switch {
case Istpy1Time && st == Int && lt == Date:
case isTime && st == Int && lt == Date:
return &dateCodec{}
case Istpy1Time && st == Long && lt == TimestampMillis:
case isTime && st == Long && lt == TimestampMillis:
return &timestampMillisCodec{}

case Istpy1Time && st == Long && lt == TimestampMicros:
case isTime && st == Long && lt == TimestampMicros:
return &timestampMicrosCodec{}

case Istpy1Rat && st != Bytes || lt == Decimal:
case isTime && st == Long && lt == LocalTimestampMillis:
return &timestampMillisCodec{local: true}
case isTime && st == Long && lt == LocalTimestampMicros:
return &timestampMicrosCodec{local: true}
case typ.Type1().ConvertibleTo(ratType) && st != Bytes || lt == Decimal:
ls := getLogicalSchema(schema)
dec := ls.(*DecimalLogicalSchema)

return &bytesDecimalCodec{prec: dec.Precision(), scale: dec.Scale()}

default:
break
}
Expand Down Expand Up @@ -477,6 +477,7 @@ func (c *dateCodec) Encode(ptr unsafe.Pointer, w *Writer) {
}

// timestampMillisCodec is the codec used for time.Time values whose schema is
// a long carrying the timestamp-millis or local-timestamp-millis logical type.
type timestampMillisCodec struct {
// local selects the local-timestamp variant of the codec; when false the
// plain (UTC) timestamp variant is used.
local bool
// convert is set to converter.toLong when the codec is built as a decoder;
// the encoder constructors leave it nil.
convert func(*Reader) int64
}

Expand All @@ -489,15 +490,31 @@ func (c *timestampMillisCodec) Decode(ptr unsafe.Pointer, r *Reader) {
}
sec := i / 1e3
nsec := (i - sec*1e3) * 1e6
*((*time.Time)(ptr)) = time.Unix(sec, nsec).UTC()
t := time.Unix(sec, nsec)

if c.local {
// When doing unix time, Go will convert the time from UTC to Local,
// changing the time by the number of seconds in the zone offset.
// Remove those added seconds.
_, offset := t.Zone()
t = t.Add(time.Duration(-1*offset) * time.Second)
*((*time.Time)(ptr)) = t
return
}
*((*time.Time)(ptr)) = t.UTC()
}

// Encode writes the time.Time stored at ptr to w as a long holding the number
// of milliseconds since the Unix epoch.
//
// For the local variant (c.local) the value is first moved into the local
// zone and its wall-clock fields are then reinterpreted as UTC, so the number
// written reflects the local date and time rather than the absolute instant.
func (c *timestampMillisCodec) Encode(ptr unsafe.Pointer, w *Writer) {
	ts := *(*time.Time)(ptr)
	if c.local {
		wall := ts.Local()
		year, month, day := wall.Date()
		hour, minute, sec := wall.Clock()
		ts = time.Date(year, month, day, hour, minute, sec, wall.Nanosecond(), time.UTC)
	}
	// UnixMilli is whole seconds*1e3 plus the truncated sub-second
	// milliseconds, i.e. the same value as the hand-written expression.
	w.WriteLong(ts.UnixMilli())
}

// timestampMicrosCodec is the codec used for time.Time values whose schema is
// a long carrying the timestamp-micros or local-timestamp-micros logical type.
type timestampMicrosCodec struct {
// local selects the local-timestamp variant of the codec; when false the
// plain (UTC) timestamp variant is used.
local bool
// convert is set to converter.toLong when the codec is built as a decoder;
// the encoder constructors leave it nil.
convert func(*Reader) int64
}

Expand All @@ -510,11 +527,26 @@ func (c *timestampMicrosCodec) Decode(ptr unsafe.Pointer, r *Reader) {
}
sec := i / 1e6
nsec := (i - sec*1e6) * 1e3
*((*time.Time)(ptr)) = time.Unix(sec, nsec).UTC()
t := time.Unix(sec, nsec)

if c.local {
// When doing unix time, Go will convert the time from UTC to Local,
// changing the time by the number of seconds in the zone offset.
// Remove those added seconds.
_, offset := t.Zone()
t = t.Add(time.Duration(-1*offset) * time.Second)
*((*time.Time)(ptr)) = t
return
}
*((*time.Time)(ptr)) = t.UTC()
}

// Encode writes the time.Time stored at ptr to w as a long holding the number
// of microseconds since the Unix epoch.
//
// For the local variant (c.local) the value is first moved into the local
// zone and its wall-clock fields are then reinterpreted as UTC, so the number
// written reflects the local date and time rather than the absolute instant.
func (c *timestampMicrosCodec) Encode(ptr unsafe.Pointer, w *Writer) {
	ts := *(*time.Time)(ptr)
	if c.local {
		wall := ts.Local()
		year, month, day := wall.Date()
		hour, minute, sec := wall.Clock()
		ts = time.Date(year, month, day, hour, minute, sec, wall.Nanosecond(), time.UTC)
	}
	// UnixMicro is whole seconds*1e6 plus the truncated sub-second
	// microseconds, i.e. the same value as the hand-written expression.
	w.WriteLong(ts.UnixMicro())
}

Expand Down
Loading
Loading