Search for dummy file in writer directory (#3589)
jack-moseley authored Oct 21, 2022
1 parent 05aa41c commit 13ed3d4
Showing 2 changed files with 27 additions and 19 deletions.
@@ -39,6 +39,8 @@
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.filters.HiddenFilter;
 import org.apache.gobblin.writer.PartitionedDataWriter;
+import org.apache.gobblin.writer.partitioner.TimeBasedWriterPartitioner;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -154,25 +156,28 @@ private Map<Path, Metrics> computeFileMetrics(State state) throws IOException {
   private Map<Path, Metrics> computeDummyFile(State state) throws IOException {
     Map<Path, Metrics> newFiles = new HashMap<>();
     FileSystem fs = FileSystem.get(conf);
-    for (final String pathString : state.getPropAsList(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, "")) {
-      Path path = new Path(pathString);
-      //
-      PriorityQueue<FileStatus> fileStatuses =
-          new PriorityQueue<>((x, y) -> Long.compare(y.getModificationTime(), x.getModificationTime()));
-      if (fs.exists(path)) {
-        fileStatuses.add(fs.getFileStatus(path));
-      }
-      // Only register files
-      while (!fileStatuses.isEmpty()) {
-        FileStatus fileStatus = fileStatuses.poll();
-        if (fileStatus.isDirectory()) {
-          fileStatuses.addAll(Arrays.asList(fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_FILTER)));
-        } else {
-          Path filePath = fileStatus.getPath();
-          newFiles.put(filePath, null);
-          // Only one concrete file from the path is needed
-          return newFiles;
-        }
+    if (!state.contains(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR)) {
+      return newFiles;
+    }
+    String baseDatasetString = state.getProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR);
+    Path searchPath = new Path(baseDatasetString);
+    if (state.contains(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX)) {
+      searchPath = new Path(searchPath, state.getProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX));
+    }
+    PriorityQueue<FileStatus> fileStatuses = new PriorityQueue<>((x, y) -> Long.compare(y.getModificationTime(), x.getModificationTime()));
+    if (fs.exists(searchPath)) {
+      fileStatuses.add(fs.getFileStatus(searchPath));
+    }
+    // Only register files
+    while (!fileStatuses.isEmpty()) {
+      FileStatus fileStatus = fileStatuses.poll();
+      if (fileStatus.isDirectory()) {
+        fileStatuses.addAll(Arrays.asList(fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_FILTER)));
+      } else {
+        Path filePath = fileStatus.getPath();
+        newFiles.put(filePath, null);
+        // Only one concrete file from the path is needed
+        return newFiles;
+      }
+    }
+    return newFiles;
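
For readers skimming the diff: the rewritten computeDummyFile now resolves a single search path (the dataset directory, optionally narrowed by the writer partition prefix) and walks it newest-first until it hits a concrete file. The following standalone sketch shows the same traversal outside Gobblin; the class name, the /data/myDataset path, and the hourly prefix are hypothetical, and the inline filter stands in for Gobblin's HiddenFilter.

import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.PriorityQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class DummyFileSearchSketch {

  // Stand-in for Gobblin's HiddenFilter: skip "." and "_" prefixed entries.
  private static final PathFilter SKIP_HIDDEN =
      path -> !path.getName().startsWith(".") && !path.getName().startsWith("_");

  // Walk the tree rooted at searchPath, always expanding the most recently
  // modified entry first, and return the first regular file encountered.
  static Optional<Path> findNewestFile(FileSystem fs, Path searchPath) throws IOException {
    PriorityQueue<FileStatus> queue =
        new PriorityQueue<>((x, y) -> Long.compare(y.getModificationTime(), x.getModificationTime()));
    if (fs.exists(searchPath)) {
      queue.add(fs.getFileStatus(searchPath));
    }
    while (!queue.isEmpty()) {
      FileStatus status = queue.poll();
      if (status.isDirectory()) {
        queue.addAll(Arrays.asList(fs.listStatus(status.getPath(), SKIP_HIDDEN)));
      } else {
        return Optional.of(status.getPath()); // one concrete file is enough
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());
    // Mirrors the commit: scope the search to <dataset dir>/<partition prefix>
    // when a prefix is configured (values here are hypothetical).
    Path searchPath = new Path(new Path("/data/myDataset"), "hourly");
    findNewestFile(fs, searchPath)
        .ifPresent(p -> System.out.println("Dummy file candidate: " + p));
  }
}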
@@ -53,6 +53,8 @@
 import org.apache.gobblin.writer.FsDataWriterBuilder;
 import org.apache.gobblin.writer.GobblinOrcWriter;
 import org.apache.gobblin.writer.PartitionedDataWriter;
+import org.apache.gobblin.writer.partitioner.TimeBasedWriterPartitioner;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -280,6 +282,7 @@ private void setGMCEPublisherStateWithoutNewFile(WorkUnitState state) {
     state.setProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, datasetDir.toString());
     state.setProp(AbstractJob.JOB_ID, "testFlow");
     state.setProp(PartitionedDataWriter.WRITER_LATEST_SCHEMA, _avroPartitionSchema);
+    state.setProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX, "hourly");
   }
 
   private void setGMCEPublisherStateForAvroFile(WorkUnitState state) {
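
As a quick check on what the added test property implies: with WRITER_PARTITION_PREFIX set to "hourly", the publisher searches <dataset dir>/hourly rather than the whole dataset directory. A minimal sketch of the path composition, with a hypothetical dataset directory standing in for the test's datasetDir:

import org.apache.hadoop.fs.Path;

public class PrefixPathSketch {
  public static void main(String[] args) {
    // Hypothetical stand-in for the test's datasetDir.
    Path datasetDir = new Path("/tmp/testDataset");
    // With WRITER_PARTITION_PREFIX = "hourly", the search path becomes:
    Path searchPath = new Path(datasetDir, "hourly");
    System.out.println(searchPath); // prints /tmp/testDataset/hourly
  }
}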
