Skip to content

Commit

Permalink
Introduce async post processing of uploaded NARs
Browse files Browse the repository at this point in the history
  • Loading branch information
bbende committed Jun 7, 2024
1 parent 056260a commit 2dca805
Show file tree
Hide file tree
Showing 12 changed files with 269 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.web.api.dto;

import jakarta.xml.bind.annotation.XmlType;

@XmlType(name = "narUpload")
public class NarUploadDTO {


}
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,15 @@ private <T> T replicateRequest(final NodeIdentifier nodeId, final UploadRequest<

final NiFiUser user = uploadRequest.getUser();
final String filename = uploadRequest.getFilename();
final String uploadId = uploadRequest.getIdentifier();

try (final InputStream inputStream = new FileInputStream(contents)) {
final HttpRequestBodySpec request = webClientService.post()
.uri(uri)
.body(inputStream, OptionalLong.of(inputStream.available()))
.header(CONTENT_TYPE_HEADER, UPLOAD_CONTENT_TYPE)
.header(FILENAME_HEADER, filename)
.header(UPLOAD_ID, uploadId)
// Special NiFi-specific headers to indicate that the request should be performed and not replicated to the nodes
.header(RequestReplicator.REQUEST_EXECUTION_HTTP_HEADER, "true")
.header(RequestReplicator.REPLICATION_INDICATOR_HEADER, "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ public class UploadRequest<T> {

private final NiFiUser user;
private final String filename;
private final String identifier;
private final InputStream contents;
private final URI exampleRequestUri;
private final Class<T> responseClass;

private UploadRequest(final Builder<T> builder) {
this.user = Objects.requireNonNull(builder.user);
this.filename = Objects.requireNonNull(builder.filename);
this.identifier = Objects.requireNonNull(builder.identifier);
this.contents = Objects.requireNonNull(builder.contents);
this.exampleRequestUri = Objects.requireNonNull(builder.exampleRequestUri);
this.responseClass = Objects.requireNonNull(builder.responseClass);
Expand All @@ -52,6 +54,10 @@ public String getFilename() {
return filename;
}

public String getIdentifier() {
return identifier;
}

public InputStream getContents() {
return contents;
}
Expand All @@ -67,6 +73,7 @@ public Class<T> getResponseClass() {
public static final class Builder<T> {
private NiFiUser user;
private String filename;
private String identifier;
private InputStream contents;
private URI exampleRequestUri;
private Class<T> responseClass;
Expand All @@ -81,6 +88,11 @@ public Builder<T> filename(String filename) {
return this;
}

public Builder<T> identifier(String identifier) {
this.identifier = identifier;
return this;
}

public Builder<T> contents(InputStream contents) {
this.contents = contents;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
public interface UploadRequestReplicator {

String FILENAME_HEADER = "Filename";
String UPLOAD_ID = "Upload-Id";
String CONTENT_TYPE_HEADER = "Content-Type";
String UPLOAD_CONTENT_TYPE = "application/octet-stream";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@

public interface NarManager {

BundleCoordinate addNar(InputStream inputStream) throws IOException;
NarUpload uploadNar(String uploadId, InputStream inputStream) throws IOException;

NarUpload getUpload(String uploadId);

NarUpload deleteUpload(String uploadId);

void verifyDeleteNar(BundleCoordinate coordinate, boolean forceDelete);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.nar;

public enum NarState {
LOADED,
MISSING_DEPENDENCY
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.nar;

public class NarUpload {

private final String identifier;
private final NarManifest narManifest;

private NarUploadStatus uploadStatus;
private NarState narState;

public NarUpload(final String identifier, final NarManifest narManifest) {
this.identifier = identifier;
this.narManifest = narManifest;
this.uploadStatus = NarUploadStatus.UPLOADING;
}

public NarUpload(final NarUpload other) {
this.identifier = other.identifier;
this.narManifest = other.narManifest;
this.uploadStatus = other.uploadStatus;
this.narState = other.narState;
}

public String getIdentifier() {
return identifier;
}

public NarManifest getNarManifest() {
return narManifest;
}

public NarUploadStatus getUploadStatus() {
return uploadStatus;
}

public void setUploadStatus(final NarUploadStatus uploadStatus) {
this.uploadStatus = uploadStatus;
}

public NarState getNarState() {
return narState;
}

public void setNarState(final NarState narState) {
this.narState = narState;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.nar;

public enum NarUploadStatus {
UPLOADING,
LOADING_NAR,
RELOADING_COMPONENTS,
COMPLETED
}
Loading

0 comments on commit 2dca805

Please sign in to comment.