From 13ed3d42962003991507ad7e095b93b7d4c107da Mon Sep 17 00:00:00 2001 From: Jack Moseley Date: Fri, 21 Oct 2022 16:37:00 -0700 Subject: [PATCH] Search for dummy file in writer directory (#3589) --- .../publisher/GobblinMCEPublisher.java | 43 +++++++++++-------- .../publisher/GobblinMCEPublisherTest.java | 3 ++ 2 files changed, 27 insertions(+), 19 deletions(-) diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java index 53611f8c1e4..47616529f86 100644 --- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java +++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java @@ -39,6 +39,8 @@ import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.filters.HiddenFilter; import org.apache.gobblin.writer.PartitionedDataWriter; +import org.apache.gobblin.writer.partitioner.TimeBasedWriterPartitioner; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -154,25 +156,28 @@ private Map computeFileMetrics(State state) throws IOException { private Map computeDummyFile(State state) throws IOException { Map newFiles = new HashMap<>(); FileSystem fs = FileSystem.get(conf); - for (final String pathString : state.getPropAsList(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, "")) { - Path path = new Path(pathString); - // - PriorityQueue fileStatuses = - new PriorityQueue<>((x, y) -> Long.compare(y.getModificationTime(), x.getModificationTime())); - if (fs.exists(path)) { - fileStatuses.add(fs.getFileStatus(path)); - } - // Only register files - while (!fileStatuses.isEmpty()) { - FileStatus fileStatus = fileStatuses.poll(); - if (fileStatus.isDirectory()) { - fileStatuses.addAll(Arrays.asList(fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_FILTER))); - } else { - Path filePath = fileStatus.getPath(); - newFiles.put(filePath, null); - // Only one concrete file from the path is needed - return newFiles; - } + if (!state.contains(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR)) { + return newFiles; + } + String baseDatasetString = state.getProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR); + Path searchPath = new Path(baseDatasetString); + if (state.contains(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX)) { + searchPath = new Path(searchPath, state.getProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX)); + } + PriorityQueue fileStatuses = new PriorityQueue<>((x, y) -> Long.compare(y.getModificationTime(), x.getModificationTime())); + if (fs.exists(searchPath)) { + fileStatuses.add(fs.getFileStatus(searchPath)); + } + // Only register files + while (!fileStatuses.isEmpty()) { + FileStatus fileStatus = fileStatuses.poll(); + if (fileStatus.isDirectory()) { + fileStatuses.addAll(Arrays.asList(fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_FILTER))); + } else { + Path filePath = fileStatus.getPath(); + newFiles.put(filePath, null); + // Only one concrete file from the path is needed + return newFiles; } } return newFiles; diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java index e4f0345463b..faa59dd805e 100644 --- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java +++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java @@ -53,6 +53,8 @@ import org.apache.gobblin.writer.FsDataWriterBuilder; import org.apache.gobblin.writer.GobblinOrcWriter; import org.apache.gobblin.writer.PartitionedDataWriter; +import org.apache.gobblin.writer.partitioner.TimeBasedWriterPartitioner; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -280,6 +282,7 @@ private void setGMCEPublisherStateWithoutNewFile(WorkUnitState state) { state.setProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, datasetDir.toString()); state.setProp(AbstractJob.JOB_ID, "testFlow"); state.setProp(PartitionedDataWriter.WRITER_LATEST_SCHEMA, _avroPartitionSchema); + state.setProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX, "hourly"); } private void setGMCEPublisherStateForAvroFile(WorkUnitState state) {