Skip to content

Commit

Permalink
Introduce async post processing of uploaded NARs
Browse files Browse the repository at this point in the history
- Return NarUpload object from upload
- Add NarManager methods to get an upload and delete an upload
- Add REST APIs for getting an upload and deleting an upload
- Update CLI command for uploading to submit and poll/delete
  • Loading branch information
bbende committed Jun 11, 2024
1 parent 65f0c85 commit 6cad50c
Show file tree
Hide file tree
Showing 25 changed files with 881 additions and 151 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.web.api.dto;

import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.xml.bind.annotation.XmlType;

@XmlType(name = "narCoordinate")
public class NarCoordinateDTO {

private String group;
private String id;
private String version;

@Schema(description = "The group of the NAR.")
public String getGroup() {
return group;
}

public void setGroup(final String group) {
this.group = group;
}

@Schema(description = "The identifier of the NAR.")
public String getId() {
return id;
}

public void setId(final String id) {
this.id = id;
}

@Schema(description = "The version of the NAR.")
public String getVersion() {
return version;
}

public void setVersion(final String version) {
this.version = version;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.web.api.dto;

import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.xml.bind.annotation.XmlType;

@XmlType(name = "narUpload")
public class NarUploadDTO {

private String identifier;
private NarCoordinateDTO coordinate;
private NarCoordinateDTO dependencyCoordinate;

private boolean processingComplete;
private String processingStatus;
private String processingResult;

public NarUploadDTO() {
}

public NarUploadDTO(final String identifier) {
this.identifier = identifier;
}

@Schema(description = "The identifier for the upload.")
public String getIdentifier() {
return identifier;
}

public void setIdentifier(final String identifier) {
this.identifier = identifier;
}

@Schema(description = "The coordinate of the uploaded NAR.")
public NarCoordinateDTO getCoordinate() {
return coordinate;
}

public void setCoordinate(final NarCoordinateDTO coordinate) {
this.coordinate = coordinate;
}

@Schema(description = "The coordinate of another NAR that the uploaded NAR is dependent on, or null if not dependent on another NAR.")
public NarCoordinateDTO getDependencyCoordinate() {
return dependencyCoordinate;
}

public void setDependencyCoordinate(final NarCoordinateDTO dependencyCoordinate) {
this.dependencyCoordinate = dependencyCoordinate;
}

@Schema(description = "Indicates if processing of the uploaded NAR has completed.")
public boolean isProcessingComplete() {
return processingComplete;
}

public void setProcessingComplete(final boolean processingComplete) {
this.processingComplete = processingComplete;
}

@Schema(description = "Status indicating the phase of processing the uploaded NAR.")
public String getProcessingStatus() {
return processingStatus;
}

public void setProcessingStatus(final String processingStatus) {
this.processingStatus = processingStatus;
}

@Schema(description = "Result of processing the uploaded NAR, or null if processing is not complete.")
public String getProcessingResult() {
return processingResult;
}

public void setProcessingResult(final String processingResult) {
this.processingResult = processingResult;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.web.api.entity;

import jakarta.xml.bind.annotation.XmlType;
import org.apache.nifi.web.api.dto.NarUploadDTO;

@XmlType(name = "narUploadEntity")
public class NarUploadEntity extends Entity {

private NarUploadDTO narUpload;

public NarUploadEntity() {
}

public NarUploadEntity(final NarUploadDTO narUpload) {
this.narUpload = narUpload;
}

public NarUploadDTO getNarUpload() {
return narUpload;
}

public void setNarUpload(final NarUploadDTO narUpload) {
this.narUpload = narUpload;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.apache.nifi.cluster.coordination.http.endpoints.UsersEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.VerifyConfigEndpointMerger;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.manager.NarUploadEntityMerger;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.util.FormatUtils;
Expand Down Expand Up @@ -193,6 +194,7 @@ public StandardHttpResponseMapper(final NiFiProperties nifiProperties) {
endpointMergers.add(new FlowRegistryClientEndpointMerger());
endpointMergers.add(new FlowRegistryClientsEndpointMerger());
endpointMergers.add(new FlowRepositoryClientTypesEndpointMerger());
endpointMergers.add(new NarUploadEntityMerger());

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,15 @@ private <T> T replicateRequest(final NodeIdentifier nodeId, final UploadRequest<

final NiFiUser user = uploadRequest.getUser();
final String filename = uploadRequest.getFilename();
final String uploadId = uploadRequest.getIdentifier();

try (final InputStream inputStream = new FileInputStream(contents)) {
final HttpRequestBodySpec request = webClientService.post()
.uri(uri)
.body(inputStream, OptionalLong.of(inputStream.available()))
.header(CONTENT_TYPE_HEADER, UPLOAD_CONTENT_TYPE)
.header(FILENAME_HEADER, filename)
.header(UPLOAD_ID, uploadId)
// Special NiFi-specific headers to indicate that the request should be performed and not replicated to the nodes
.header(RequestReplicator.REQUEST_EXECUTION_HTTP_HEADER, "true")
.header(RequestReplicator.REPLICATION_INDICATOR_HEADER, "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ public class UploadRequest<T> {

private final NiFiUser user;
private final String filename;
private final String identifier;
private final InputStream contents;
private final URI exampleRequestUri;
private final Class<T> responseClass;

private UploadRequest(final Builder<T> builder) {
this.user = Objects.requireNonNull(builder.user);
this.filename = Objects.requireNonNull(builder.filename);
this.identifier = Objects.requireNonNull(builder.identifier);
this.contents = Objects.requireNonNull(builder.contents);
this.exampleRequestUri = Objects.requireNonNull(builder.exampleRequestUri);
this.responseClass = Objects.requireNonNull(builder.responseClass);
Expand All @@ -52,6 +54,10 @@ public String getFilename() {
return filename;
}

public String getIdentifier() {
return identifier;
}

public InputStream getContents() {
return contents;
}
Expand All @@ -67,6 +73,7 @@ public Class<T> getResponseClass() {
public static final class Builder<T> {
private NiFiUser user;
private String filename;
private String identifier;
private InputStream contents;
private URI exampleRequestUri;
private Class<T> responseClass;
Expand All @@ -81,6 +88,11 @@ public Builder<T> filename(String filename) {
return this;
}

public Builder<T> identifier(String identifier) {
this.identifier = identifier;
return this;
}

public Builder<T> contents(InputStream contents) {
this.contents = contents;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
public interface UploadRequestReplicator {

String FILENAME_HEADER = "Filename";
String UPLOAD_ID = "Upload-Id";
String CONTENT_TYPE_HEADER = "Content-Type";
String UPLOAD_CONTENT_TYPE = "application/octet-stream";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.cluster.manager;

import org.apache.nifi.cluster.coordination.http.endpoints.AbstractSingleDTOEndpoint;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.web.api.dto.NarUploadDTO;
import org.apache.nifi.web.api.entity.NarUploadEntity;

import java.net.URI;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

public class NarUploadEntityMerger extends AbstractSingleDTOEndpoint<NarUploadEntity, NarUploadDTO> {

private static final String NAR_UPLOAD_PATH = "/nifi-api/controller/nar-manager/uploads";
public static final Pattern NAR_UPLOAD_URI_PATTERN = Pattern.compile(NAR_UPLOAD_PATH + "/[a-f0-9\\-]{36}");

@Override
public boolean canHandle(final URI uri, final String method) {
if ("POST".equals(method) && NAR_UPLOAD_PATH.equals(uri.getPath())) {
return true;
} else {
return ("GET".equals(method) || "DELETE".equals(method)) && NAR_UPLOAD_URI_PATTERN.matcher(uri.getPath()).matches();
}
}

@Override
protected Class<NarUploadEntity> getEntityClass() {
return NarUploadEntity.class;
}

@Override
protected NarUploadDTO getDto(final NarUploadEntity entity) {
return entity.getNarUpload();
}

@Override
protected void mergeResponses(final NarUploadDTO clientDto, final Map<NodeIdentifier, NarUploadDTO> dtoMap, final Set<NodeResponse> successfulResponses,
final Set<NodeResponse> problematicResponses) {
for (final NarUploadDTO narUploadDTO : dtoMap.values()) {
if (!narUploadDTO.isProcessingComplete()) {
clientDto.setProcessingComplete(narUploadDTO.isProcessingComplete());
clientDto.setProcessingStatus(narUploadDTO.getProcessingStatus());
clientDto.setProcessingResult(narUploadDTO.getProcessingResult());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@

public interface NarManager {

BundleCoordinate addNar(InputStream inputStream) throws IOException;
NarUpload uploadNar(String uploadId, InputStream inputStream) throws IOException;

NarUpload getUpload(String uploadId);

NarUpload deleteUpload(String uploadId);

void verifyDeleteNar(BundleCoordinate coordinate, boolean forceDelete);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.nar;

public enum NarState {
LOADED,
MISSING_DEPENDENCY
}
Loading

0 comments on commit 6cad50c

Please sign in to comment.