Skip to content

Commit

Permalink
NIFI-13713 Make loading of extension UIs asynchronous in JettyServer
Browse files Browse the repository at this point in the history
  • Loading branch information
bbende committed Sep 6, 2024
1 parent 6ac5a96 commit e1ccaf1
Showing 1 changed file with 57 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -44,7 +45,11 @@
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
Expand Down Expand Up @@ -182,6 +187,8 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {

private static final String SPRING_SECURITY_FILTER_CHAIN = "springSecurityFilterChain";

private static final Duration EXTENSION_UI_POLL_INTERVAL = Duration.ofSeconds(5);

private final DeploymentManager deploymentManager = new DeploymentManager();

private Server server;
Expand Down Expand Up @@ -211,6 +218,9 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {

private final Map<BundleCoordinate, List<App>> appsByBundleCoordinate = new ConcurrentHashMap<>();

private ExtensionUiLoadTask extensionUiLoadTask;
private final BlockingQueue<Bundle> extensionUisToLoad = new LinkedBlockingQueue<>();

/**
* Default no-arg constructor for ServiceLoader
*/
Expand Down Expand Up @@ -328,8 +338,12 @@ private Handler loadInitialWars(final Set<Bundle> bundles) {

@Override
public synchronized void loadExtensionUis(final Set<Bundle> bundles) {
extensionUisToLoad.addAll(bundles);
}

private void processExtensionUiBundle(final Bundle bundle) {
// Find and load any WARs contained within the set of bundles...
final Map<File, Bundle> warToBundleLookup = findWars(bundles);
final Map<File, Bundle> warToBundleLookup = findWars(Set.of(bundle));
final ExtensionUiInfo extensionUiInfo = loadWars(warToBundleLookup);
final Map<BundleCoordinate, List<WebAppContext>> webappContextsByBundleCoordinate = extensionUiInfo.webAppContextsByBundleCoordinate();

Expand Down Expand Up @@ -929,6 +943,11 @@ public void start() {
webDocsServletContext.setAttribute("nifi-python-extension-mapping", pythonExtensionMapping);
}

// Start background task to process bundles that are submitted for loading extension UIs, this needs to be
// started after Jetty has been started to ensure the Spring WebApplicationContext is available
extensionUiLoadTask = new ExtensionUiLoadTask(extensionUisToLoad, this::processExtensionUiBundle);
Thread.ofVirtual().name("Extension UI Loader").start(extensionUiLoadTask);

// if this nifi is a node in a cluster, start the flow service and load the flow - the
// flow service is loaded here for clustered nodes because the loading of the flow will
// initialize the connection between the node and the coordinator. if the node connects (starts
Expand Down Expand Up @@ -1112,6 +1131,7 @@ public void stop() {
logger.warn("Failed to stop NAR provider", e);
}

extensionUiLoadTask.stop();
}

private ErrorPageErrorHandler getErrorHandler() {
Expand Down Expand Up @@ -1170,5 +1190,41 @@ public String getContextPath() {
}
}

/**
* Task that asynchronously processes any bundles that were submitted to have extension UIs loaded.
*/
private static class ExtensionUiLoadTask implements Runnable {

private final BlockingQueue<Bundle> extensionUiBundlesToLoad;
private final Consumer<Bundle> extensionUiLoadFunction;

private volatile boolean stopped = false;

public ExtensionUiLoadTask(final BlockingQueue<Bundle> extensionUiBundlesToLoad, final Consumer<Bundle> extensionUiLoadFunction) {
this.extensionUiBundlesToLoad = extensionUiBundlesToLoad;
this.extensionUiLoadFunction = extensionUiLoadFunction;
}

@Override
public void run() {
while (!stopped) {
try {
final Bundle bundle = extensionUiBundlesToLoad.poll(EXTENSION_UI_POLL_INTERVAL.getSeconds(), TimeUnit.SECONDS);
if (bundle != null) {
extensionUiLoadFunction.accept(bundle);
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
} catch (final Exception e) {
logger.error("Failed to load extension UI", e);
}
}
}

public void stop() {
stopped = true;
}
}

}

0 comments on commit e1ccaf1

Please sign in to comment.