Skip to content

Commit

Permalink
support second, millisecond and microsecond precision for arrow batch…
Browse files Browse the repository at this point in the history
… timestamp (#1042)
  • Loading branch information
Yifeng-Sigma authored Feb 1, 2024
1 parent 385c856 commit 8158364
Show file tree
Hide file tree
Showing 5 changed files with 498 additions and 100 deletions.
4 changes: 2 additions & 2 deletions cmd/arrow/batches/arrow_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ func main() {
}

ctx :=
sf.WithOriginalTimestamp(
sf.WithArrowBatchesTimestampOption(
sf.WithArrowAllocator(
sf.WithArrowBatches(context.Background()), memory.DefaultAllocator))
sf.WithArrowBatches(context.Background()), memory.DefaultAllocator), sf.UseOriginalTimestamp)

query := "SELECT SEQ4(), 'example ' || (SEQ4() * 2), " +
" TO_TIMESTAMP_NTZ('9999-01-01 13:13:13.' || LPAD(SEQ4(),9,'0')) ltz " +
Expand Down
92 changes: 72 additions & 20 deletions converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ const (
TimeType
)

type snowflakeArrowBatchesTimestampOption int

const (
// UseNanosecondTimestamp uses arrow.Timestamp in nanosecond precision, could cause ErrTooHighTimestampPrecision if arrow.Timestamp cannot fit original timestamp values.
UseNanosecondTimestamp snowflakeArrowBatchesTimestampOption = iota
// UseMicrosecondTimestamp uses arrow.Timestamp in microsecond precision
UseMicrosecondTimestamp
// UseMillisecondTimestamp uses arrow.Timestamp in millisecond precision
UseMillisecondTimestamp
// UseSecondTimestamp uses arrow.Timestamp in second precision
UseSecondTimestamp
// UseOriginalTimestamp uses original timestamp struct returned by Snowflake. It can be used in case arrow.Timestamp cannot fit original timestamp values.
UseOriginalTimestamp
)

type interfaceArrayBinding struct {
hasTimezone bool
tzType timezoneType
Expand Down Expand Up @@ -968,19 +983,22 @@ func higherPrecisionEnabled(ctx context.Context) bool {
return ok && d
}

func originalTimestampEnabled(ctx context.Context) bool {
v := ctx.Value(enableOriginalTimestamp)
func getArrowBatchesTimestampOption(ctx context.Context) snowflakeArrowBatchesTimestampOption {
v := ctx.Value(arrowBatchesTimestampOption)
if v == nil {
return false
return UseNanosecondTimestamp
}
d, ok := v.(bool)
return ok && d
o, ok := v.(snowflakeArrowBatchesTimestampOption)
if !ok {
return UseNanosecondTimestamp
}
return o
}

func arrowToRecord(ctx context.Context, record arrow.Record, pool memory.Allocator, rowType []execResponseRowType, loc *time.Location) (arrow.Record, error) {
useOriginalTimestamp := originalTimestampEnabled(ctx)
arrowBatchesTimestampOption := getArrowBatchesTimestampOption(ctx)

s, err := recordToSchema(record.Schema(), rowType, loc, useOriginalTimestamp)
s, err := recordToSchema(record.Schema(), rowType, loc, arrowBatchesTimestampOption)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1029,27 +1047,49 @@ func arrowToRecord(ctx context.Context, record arrow.Record, pool memory.Allocat
}
defer newCol.Release()
case timestampNtzType, timestampLtzType, timestampTzType:
if useOriginalTimestamp {
if arrowBatchesTimestampOption == UseOriginalTimestamp {
// do nothing - return timestamp as is
} else {
var unit arrow.TimeUnit
switch arrowBatchesTimestampOption {
case UseMicrosecondTimestamp:
unit = arrow.Microsecond
case UseMillisecondTimestamp:
unit = arrow.Millisecond
case UseSecondTimestamp:
unit = arrow.Second
case UseNanosecondTimestamp:
unit = arrow.Nanosecond
}
var tb *array.TimestampBuilder
if snowflakeType == timestampLtzType {
tb = array.NewTimestampBuilder(pool, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: loc.String()})
tb = array.NewTimestampBuilder(pool, &arrow.TimestampType{Unit: unit, TimeZone: loc.String()})
} else {
tb = array.NewTimestampBuilder(pool, &arrow.TimestampType{Unit: arrow.Nanosecond})
tb = array.NewTimestampBuilder(pool, &arrow.TimestampType{Unit: unit})
}
defer tb.Release()

for i := 0; i < int(numRows); i++ {
ts := arrowSnowflakeTimestampToTime(col, snowflakeType, int(srcColumnMeta.Scale), i, loc)
if ts != nil {
ar := arrow.Timestamp(ts.UnixNano())
// in case of overflow in arrow timestamp return error
if ts.Year() != ar.ToTime(arrow.Nanosecond).Year() {
return nil, &SnowflakeError{
Number: ErrTooHighTimestampPrecision,
SQLState: SQLStateInvalidDataTimeFormat,
Message: fmt.Sprintf("Cannot convert timestamp %v in column %v to Arrow.Timestamp data type due to too high precision. Please use context with WithOriginalTimestamp.", ts.UTC(), srcColumnMeta.Name),
var ar arrow.Timestamp
switch arrowBatchesTimestampOption {
case UseMicrosecondTimestamp:
ar = arrow.Timestamp(ts.UnixMicro())
case UseMillisecondTimestamp:
ar = arrow.Timestamp(ts.UnixMilli())
case UseSecondTimestamp:
ar = arrow.Timestamp(ts.Unix())
case UseNanosecondTimestamp:
ar = arrow.Timestamp(ts.UnixNano())
// in case of overflow in arrow timestamp return error
// this could only happen for nanosecond case
if ts.Year() != ar.ToTime(arrow.Nanosecond).Year() {
return nil, &SnowflakeError{
Number: ErrTooHighTimestampPrecision,
SQLState: SQLStateInvalidDataTimeFormat,
Message: fmt.Sprintf("Cannot convert timestamp %v in column %v to Arrow.Timestamp data type due to too high precision. Please use context with WithOriginalTimestamp.", ts.UTC(), srcColumnMeta.Name),
}
}
}
tb.Append(ar)
Expand All @@ -1067,7 +1107,7 @@ func arrowToRecord(ctx context.Context, record arrow.Record, pool memory.Allocat
return array.NewRecord(s, cols, numRows), nil
}

func recordToSchema(sc *arrow.Schema, rowType []execResponseRowType, loc *time.Location, useOriginalTimestamp bool) (*arrow.Schema, error) {
func recordToSchema(sc *arrow.Schema, rowType []execResponseRowType, loc *time.Location, timestampOption snowflakeArrowBatchesTimestampOption) (*arrow.Schema, error) {
var fields []arrow.Field
for i := 0; i < len(sc.Fields()); i++ {
f := sc.Field(i)
Expand All @@ -1094,16 +1134,28 @@ func recordToSchema(sc *arrow.Schema, rowType []execResponseRowType, loc *time.L
case timeType:
t = &arrow.Time64Type{Unit: arrow.Nanosecond}
case timestampNtzType, timestampTzType:
if useOriginalTimestamp {
if timestampOption == UseOriginalTimestamp {
// do nothing - return timestamp as is
converted = false
} else if timestampOption == UseMicrosecondTimestamp {
t = &arrow.TimestampType{Unit: arrow.Microsecond}
} else if timestampOption == UseMillisecondTimestamp {
t = &arrow.TimestampType{Unit: arrow.Millisecond}
} else if timestampOption == UseSecondTimestamp {
t = &arrow.TimestampType{Unit: arrow.Second}
} else {
t = &arrow.TimestampType{Unit: arrow.Nanosecond}
}
case timestampLtzType:
if useOriginalTimestamp {
if timestampOption == UseOriginalTimestamp {
// do nothing - return timestamp as is
converted = false
} else if timestampOption == UseMicrosecondTimestamp {
t = &arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: loc.String()}
} else if timestampOption == UseMillisecondTimestamp {
t = &arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: loc.String()}
} else if timestampOption == UseSecondTimestamp {
t = &arrow.TimestampType{Unit: arrow.Second, TimeZone: loc.String()}
} else {
t = &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: loc.String()}
}
Expand Down
Loading

0 comments on commit 8158364

Please sign in to comment.