Skip to content

Commit

Permalink
address nit
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 committed Oct 10, 2024
1 parent 969263e commit 86e8271
Showing 1 changed file with 19 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -433,24 +433,10 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) {
rowBuilder.addValue(icebergValue);
break;
case DATETIME:
long micros;
if (icebergValue instanceof OffsetDateTime) {
micros = DateTimeUtil.microsFromTimestamptz((OffsetDateTime) icebergValue);
} else if (icebergValue instanceof LocalDateTime) {
micros = DateTimeUtil.microsFromTimestamp((LocalDateTime) icebergValue);
} else if (icebergValue instanceof Long) {
micros = (long) icebergValue;
} else if (icebergValue instanceof String) {
rowBuilder.addValue(DateTime.parse((String) icebergValue));
break;
} else {
throw new UnsupportedOperationException(
"Unsupported Iceberg type for Beam type DATETIME: " + icebergValue.getClass());
}
// Iceberg uses a long for micros
// Iceberg uses a long for micros.
// Beam DATETIME uses joda's DateTime, which only supports millis,
// so we do lose some precision here
rowBuilder.addValue(new DateTime(micros / 1000L));
rowBuilder.addValue(getBeamDateTimeValue(icebergValue));
break;
case BYTES:
// Iceberg uses ByteBuffer; Beam uses byte[]
Expand All @@ -475,6 +461,23 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) {
return rowBuilder.build();
}

private static DateTime getBeamDateTimeValue(Object icebergValue) {
long micros;
if (icebergValue instanceof OffsetDateTime) {
micros = DateTimeUtil.microsFromTimestamptz((OffsetDateTime) icebergValue);
} else if (icebergValue instanceof LocalDateTime) {
micros = DateTimeUtil.microsFromTimestamp((LocalDateTime) icebergValue);
} else if (icebergValue instanceof Long) {
micros = (long) icebergValue;
} else if (icebergValue instanceof String) {
return DateTime.parse((String) icebergValue);
} else {
throw new UnsupportedOperationException(
"Unsupported Iceberg type for Beam type DATETIME: " + icebergValue.getClass());
}
return new DateTime(micros / 1000L);
}

private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType type) {
if (icebergValue instanceof String) {
String strValue = (String) icebergValue;
Expand Down

0 comments on commit 86e8271

Please sign in to comment.