Skip to content

Commit

Permalink
NIFI-13344 Implement backend for uploading and managing custom NARs
Browse files Browse the repository at this point in the history
- Add NarPersistenceProvider to framework API with standard implementation
- Add NarManager to orchestrate interactions with the persistence provider and framework
- Add REST API for upload, download, list, and delete of custom NARs
- Add CLI commands for call these REST APIs
- Implement an upload request replicator for replicating a file upload across the cluster
- Update ExtensionManager, NarLoader, and JettyServer to support unloading a NAR
- Add ability to replace a NAR in place by unloading components, loading new NAR, and reloading components
- Add system tests for uploading a NAR and verifying other APIs
- Add NAR digest to NarNode and summary DTO, fix bug in digest utils
- Sync NARs from cluster coordinator when joining the cluster
  • Loading branch information
bbende committed Jun 28, 2024
1 parent 903090b commit 2649d18
Show file tree
Hide file tree
Showing 114 changed files with 7,268 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ nifi.components.status.snapshot.frequency=1 min
# Swap Manager
nifi.swap.manager.implementation=org.apache.nifi.controller.FileSystemSwapManager
nifi.queue.swap.threshold=20000
# NAR Persistence Properties
nifi.nar.persistence.provider.implementation=org.apache.nifi.nar.StandardNarPersistenceProvider
nifi.nar.persistence.provider.properties.directory=./nar_storage

# Uncomment in order to enable Python Extensions.
#nifi.python.command=python3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ public class NiFiProperties extends ApplicationProperties {
// kubernetes properties
public static final String CLUSTER_LEADER_ELECTION_KUBERNETES_LEASE_PREFIX = "nifi.cluster.leader.election.kubernetes.lease.prefix";

// nar manager properties
public static final String NAR_PERSISTENCE_PROVIDER_IMPLEMENTATION_CLASS = "nifi.nar.persistence.provider.implementation";
public static final String NAR_PERSISTENCE_PROVIDER_PROPERTIES_PREFIX = "nifi.nar.persistence.provider.properties.";

public static final String DEFAULT_PYTHON_WORKING_DIRECTORY = "./work/python";

// automatic diagnostic defaults
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.stream.io;

import java.io.IOException;
import java.io.InputStream;

public class MaxLengthInputStream extends InputStream {

private static final String ERROR_MESSAGE_FORMAT = "Unable to read past maximum allowed length of %s bytes";

private final InputStream in;
private final long limit;
private long bytesRead = 0;
private long markOffset = -1L;

/**
* Constructs an input stream that will throw an exception if reading past a maximum length of bytes.
*
* @param in the underlying input stream
* @param limit maximum length of bytes to read from underlying input stream
*/
public MaxLengthInputStream(final InputStream in, final long limit) {
this.in = in;
this.limit = (limit + 1);
}

@Override
public int read() throws IOException {
if (bytesRead >= limit) {
throw new IOException(ERROR_MESSAGE_FORMAT.formatted(limit));
}

final int val = in.read();
if (val > -1) {
bytesRead++;
}
return val;
}

@Override
public int read(final byte[] b) throws IOException {
if (bytesRead >= limit) {
throw new IOException(ERROR_MESSAGE_FORMAT.formatted(limit));
}

final int maxToRead = (int) Math.min(b.length, limit - bytesRead);

final int val = in.read(b, 0, maxToRead);
if (val > 0) {
bytesRead += val;
}
return val;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
if (bytesRead >= limit) {
throw new IOException(ERROR_MESSAGE_FORMAT.formatted(limit));
}

final int maxToRead = (int) Math.min(len, limit - bytesRead);

final int val = in.read(b, off, maxToRead);
if (val > 0) {
bytesRead += val;
}
return val;
}

@Override
public long skip(final long n) throws IOException {
final long toSkip = Math.min(n, limit - bytesRead);
final long skipped = in.skip(toSkip);
bytesRead += skipped;
return skipped;
}

@Override
public int available() throws IOException {
return in.available();
}

@Override
public void close() throws IOException {
in.close();
}

@Override
public void mark(int readlimit) {
in.mark(readlimit);
markOffset = bytesRead;
}

@Override
public boolean markSupported() {
return in.markSupported();
}

@Override
public void reset() throws IOException {
in.reset();

if (markOffset >= 0) {
bytesRead = markOffset;
}
markOffset = -1;
}

public long getLimit() {
return limit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static byte[] getDigest(final InputStream inputStream) throws IOException
int bytesRead = inputStream.read(buffer, START_READ_INDEX, BUFFER_LENGTH);

while (bytesRead > STREAM_END_INDEX) {
messageDigest.update(buffer);
messageDigest.update(buffer, 0, bytesRead);
bytesRead = inputStream.read(buffer, START_READ_INDEX, BUFFER_LENGTH);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.stream.io;

import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class MaxLengthInputStreamTest {

@Test
public void testReadingLessThanMaxLength() throws IOException {
final String content = "12345";
try (final InputStream inputStream = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
final InputStream maxLengthInputStream = new MaxLengthInputStream(inputStream, Integer.MAX_VALUE)) {
final byte[] allBytes = maxLengthInputStream.readAllBytes();
assertEquals(5, allBytes.length);
}
}

@Test
public void testReadingMoreThanMaxLength() throws IOException {
final String content = "12345";
try (final InputStream inputStream = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
final InputStream maxLengthInputStream = new MaxLengthInputStream(inputStream, 1)) {
assertThrows(IOException.class, maxLengthInputStream::readAllBytes);
}
}

@Test
public void testReadingEqualToMaxLength() throws IOException {
final String content = "12345";
try (final InputStream inputStream = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
final InputStream maxLengthInputStream = new MaxLengthInputStream(inputStream, 5)) {
final byte[] allBytes = maxLengthInputStream.readAllBytes();
assertEquals(5, allBytes.length);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public enum HttpResponseStatus {

PROXY_AUTHENTICATION_REQUIRED(407),

CONFLICT(409),

INTERNAL_SERVER_ERROR(500),

SERVICE_UNAVAILABLE(503);
Expand All @@ -55,4 +57,12 @@ public enum HttpResponseStatus {
public int getCode() {
return code;
}

public boolean isSuccessful() {
return isSuccessful(code);
}

public static boolean isSuccessful(final int code) {
return code >= 200 && code < 300;
}
}
15 changes: 15 additions & 0 deletions nifi-framework-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,19 @@
<version>2.0.0-SNAPSHOT</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<exclude>src/test/resources/nar/MANIFEST-FULL.MF</exclude>
<exclude>src/test/resources/nar/MANIFEST-MINIMAL.MF</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
Loading

0 comments on commit 2649d18

Please sign in to comment.