diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index e026c0186275..ce551acb7581 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -43,6 +43,7 @@
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
@@ -175,7 +176,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                     outgoingFlowFile = session.putAttribute(outgoingFlowFile, CoreAttributes.FILENAME.key(), outputFilename);
 
                     stopWatch.stop();
-                    getLogger().info("Successfully received content from {} for {} in {}", new Object[]{qualifiedPath, outgoingFlowFile, stopWatch.getDuration()});
+                    getLogger().info("Successfully received content from {} for {} in {}", qualifiedPath, outgoingFlowFile, stopWatch.getDuration());
                     outgoingFlowFile = session.putAttribute(outgoingFlowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
                     session.getProvenanceReporter().fetch(outgoingFlowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
                     session.transfer(outgoingFlowFile, getSuccessRelationship());
@@ -190,6 +191,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                         outgoingFlowFile = session.penalize(outgoingFlowFile);
                         session.transfer(outgoingFlowFile, getCommsFailureRelationship());
                     }
+                } catch (FlowFileAccessException ffae) {
+                    getLogger().error("Failed to retrieve content from {} for {}; routing to comms.failure", qualifiedPath, outgoingFlowFile, ffae);
+                    outgoingFlowFile = session.penalize(outgoingFlowFile);
+                    session.transfer(outgoingFlowFile, getCommsFailureRelationship());
                 } finally {
                     IOUtils.closeQuietly(stream);
                 }
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
index 036904312c38..9845c61e2803 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
@@ -214,6 +214,25 @@ public void testGSSException() throws IOException {
         fileSystem.setFailOnOpen(false);
     }
 
+    @Test
+    public void testRuntimeException() {
+        MockFileSystem fileSystem = new MockFileSystem();
+        fileSystem.setRuntimeFailOnOpen(true);
+        FetchHDFS proc = new TestableFetchHDFS(kerberosProperties, fileSystem);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz");
+        runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "NONE");
+        runner.enqueue("trigger flow file");
+        runner.run();
+
+        runner.assertTransferCount(FetchHDFS.REL_SUCCESS, 0);
+        runner.assertTransferCount(FetchHDFS.REL_FAILURE, 0);
+        runner.assertTransferCount(FetchHDFS.REL_COMMS_FAILURE, 1);
+        // assert that the flow file was penalized
+        runner.assertPenalizeCount(1);
+        fileSystem.setRuntimeFailOnOpen(false);
+    }
+
     private static class TestableFetchHDFS extends FetchHDFS {
         private final KerberosProperties testKerberosProps;
         private final FileSystem fileSystem;
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
index f020f8b3b5e3..d5c196f0ee14 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
@@ -18,6 +18,7 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AclEntry;
@@ -26,6 +27,7 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.util.Progressable;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.ietf.jgss.GSSException;
 
 import java.io.ByteArrayOutputStream;
@@ -49,6 +51,7 @@ public class MockFileSystem extends FileSystem {
     private final Map<Path, FSDataOutputStream> pathToOutputStream = new HashMap<>();
 
     private boolean failOnOpen;
+    private boolean runtimeFailOnOpen;
     private boolean failOnClose;
     private boolean failOnCreate;
     private boolean failOnFileStatus;
@@ -74,6 +77,10 @@ public void setFailOnOpen(final boolean failOnOpen) {
         this.failOnOpen = failOnOpen;
     }
 
+    public void setRuntimeFailOnOpen(final boolean runtimeFailOnOpen) {
+        this.runtimeFailOnOpen = runtimeFailOnOpen;
+    }
+
     public void setAcl(final Path path, final List<AclEntry> aclSpec) {
         pathToAcl.put(path, aclSpec);
     }
@@ -93,7 +100,10 @@ public FSDataInputStream open(final Path f, final int bufferSize) throws IOExcep
         if (failOnOpen) {
             throw new IOException(new GSSException(13));
         }
-        return null;
+        if (runtimeFailOnOpen) {
+            throw new FlowFileAccessException("runtime");
+        }
+        return createInputStream(f);
     }
 
     @Override
@@ -190,6 +200,19 @@ public boolean exists(Path f) throws IOException {
         return pathToStatus.containsKey(f);
     }
 
+    private FSDataInputStream createInputStream(final Path f) throws IOException {
+        if(failOnClose) {
+            return new FSDataInputStream(new StubFSInputStream()) {
+                @Override
+                public void close() throws IOException {
+                    super.close();
+                    throw new IOException("Fail on close");
+                }
+            };
+        } else {
+            return new FSDataInputStream(new StubFSInputStream());
+        }
+    }
     private FSDataOutputStream createOutputStream() {
         if(failOnClose) {
             return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")) {
@@ -294,4 +317,27 @@ public short getDefaultReplication() {
     private static FsPermission perms(short p) {
         return new FsPermission(p);
     }
+
+    private static class StubFSInputStream extends FSInputStream {
+
+        @Override
+        public void seek(long l) throws IOException {
+
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return 0;
+        }
+
+        @Override
+        public boolean seekToNewSource(long l) throws IOException {
+            return true;
+        }
+
+        @Override
+        public int read() throws IOException {
+            return -1;
+        }
+    }
 }
\ No newline at end of file
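
Reviewer note: `FlowFileAccessException` extends `RuntimeException`, so before this patch it escaped `onTrigger` and failed the whole session instead of being routed like the `IOException` cases above. Below is a minimal sketch of the penalize-and-route pattern the new catch block applies; `fetchContent` and `relCommsFailure` are illustrative stand-ins, not names from this PR.

```java
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;

final class CommsFailureRoutingSketch {
    /**
     * Runs the fetch step and converts a runtime content-access failure into a
     * penalized transfer to the comms-failure relationship, mirroring the new
     * catch block in FetchHDFS.onTrigger.
     */
    static FlowFile fetchWithRouting(final ProcessSession session, FlowFile flowFile,
                                     final Relationship relCommsFailure, final Runnable fetchContent) {
        try {
            // May throw FlowFileAccessException while writing flow file content.
            fetchContent.run();
        } catch (final FlowFileAccessException ffae) {
            // Penalize so the flow file is not retried immediately, then route it
            // to the failure relationship rather than rolling back the session.
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, relCommsFailure);
        }
        return flowFile;
    }
}
```

The new test exercises exactly this path: `setRuntimeFailOnOpen(true)` makes `MockFileSystem.open()` throw `FlowFileAccessException`, and the runner then expects one flow file on `REL_COMMS_FAILURE` with a penalize count of 1.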