diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java index 8e29650f3fc3..e5bdf18ecbcf 100644 --- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java +++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.hcatalog; +import java.util.List; +import java.util.stream.Collectors; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -25,6 +27,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.hive.hcatalog.data.HCatRecord; +import org.joda.time.Instant; /** Utilities to convert {@link HCatRecord HCatRecords} to {@link Row Rows}. */ @SuppressWarnings({ @@ -74,6 +77,18 @@ public PCollection expand(PBegin input) { private static class HCatToRowFn extends DoFn { private final Schema schema; + private Object maybeCastHDate(Object obj) { + if (obj instanceof org.apache.hadoop.hive.common.type.Date) { + return new Instant(((org.apache.hadoop.hive.common.type.Date) obj).toEpochMilli()); + } + return obj; + } + + /** Cast objects of the types that aren't supported by {@link Row}. */ + private List castTypes(List values) { + return values.stream().map(this::maybeCastHDate).collect(Collectors.toList()); + } + HCatToRowFn(Schema schema) { this.schema = schema; } @@ -81,7 +96,7 @@ private static class HCatToRowFn extends DoFn { @ProcessElement public void processElement(ProcessContext c) { HCatRecord hCatRecord = c.element(); - c.output(Row.withSchema(schema).addValues(hCatRecord.getAll()).build()); + c.output(Row.withSchema(schema).addValues(castTypes(hCatRecord.getAll())).build()); } } } diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java index 4bb7e1bd7044..3d97a2ccc1d9 100644 --- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java +++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java @@ -22,6 +22,7 @@ import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.TEST_RECORDS_COUNT; import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.TEST_TABLE; import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.buildHCatRecords; +import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.buildHCatRecordsWithDate; import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.getConfigPropertiesAsMap; import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.getExpectedRecords; import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.getReaderContext; @@ -54,12 +55,14 @@ import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Distinct; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Watch; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hive.hcatalog.data.DefaultHCatRecord; @@ -230,6 +233,44 @@ public void processElement(ProcessContext c) { readAfterWritePipeline.run(); } + /** Perform test for reading Date column type from an hcatalog. */ + @Test + public void testReadHCatalogDateType() throws Exception { + service.executeQuery("drop table if exists " + TEST_TABLE); + service.executeQuery("create table " + TEST_TABLE + "(mycol1 string, mycol2 date)"); + + defaultPipeline + .apply(Create.of(buildHCatRecordsWithDate(TEST_RECORDS_COUNT))) + .apply( + HCatalogIO.write() + .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf())) + .withDatabase(TEST_DATABASE) + .withTable(TEST_TABLE) + .withPartition(new java.util.HashMap<>())); + defaultPipeline.run().waitUntilFinish(); + + final PCollection output = + readAfterWritePipeline + .apply( + HCatToRow.fromSpec( + HCatalogIO.read() + .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf())) + .withDatabase(TEST_DATABASE) + .withTable(TEST_TABLE) + .withFilter(TEST_FILTER))) + .apply( + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element().getDateTime("mycol2").toString("yyyy-MM-dd HH:mm:ss")); + } + })) + .apply(Distinct.create()); + PAssert.that(output).containsInAnyOrder(ImmutableList.of("2014-01-20 00:00:00")); + readAfterWritePipeline.run(); + } + /** Test of Write to a non-existent table. */ @Test public void testWriteFailureTableDoesNotExist() { diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/test/HCatalogIOTestUtils.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/test/HCatalogIOTestUtils.java index d0d1d850a6cb..c09c2c906d64 100644 --- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/test/HCatalogIOTestUtils.java +++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/test/HCatalogIOTestUtils.java @@ -26,6 +26,7 @@ import java.util.Map.Entry; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.values.KV; +import org.apache.hadoop.hive.common.type.Date; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.data.DefaultHCatRecord; @@ -120,4 +121,13 @@ public static Map getConfigPropertiesAsMap(HiveConf hiveConf) { private static DefaultHCatRecord toHCatRecord(int value) { return new DefaultHCatRecord(Arrays.asList("record " + value, value)); } + + /** Returns a list of HCatRecords of passed size with some dummy date as a field. */ + public static List buildHCatRecordsWithDate(int size) { + List expected = new ArrayList<>(); + for (int i = 0; i < size; i++) { + expected.add(new DefaultHCatRecord(Arrays.asList("record " + i, Date.valueOf("2014-01-20")))); + } + return expected; + } }