diff --git a/conf/proxy.conf b/conf/proxy.conf index 2e4265153c5dd..fbb009878e315 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -231,16 +231,15 @@ webSocketServiceEnabled=false clusterName= -### --- Protocol Handlers +### --- Proxy Extensions -# List of messaging protocols to load, which is a list of protocol names -#proxyMessagingProtocols= +# List of proxy extensions to load, which is a list of extension names +#proxyExtensions= -# The directory to locate messaging protocol handlers -#proxyProtocolHandlerDirectory= +# The directory to locate extensions +#proxyExtensionsDirectory= ### --- Deprecated config variables --- ### # Deprecated. Use configurationStoreServers globalZookeeperServers= - diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerDefinitions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java similarity index 82% rename from pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerDefinitions.java rename to pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java index 0269fedcd24ac..844c7ca85abc0 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerDefinitions.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.proxy.protocol; +package org.apache.pulsar.proxy.extensions; import lombok.Data; import lombok.experimental.Accessors; @@ -25,12 +25,12 @@ import java.util.TreeMap; /** - * The collection of protocol handlers. + * The collection of Proxy Extensions. */ @Data @Accessors(fluent = true) -class ProtocolHandlerDefinitions { +class ExtensionsDefinitions { - private final Map handlers = new TreeMap<>(); + private final Map extensions = new TreeMap<>(); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtension.java similarity index 62% rename from pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandler.java rename to pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtension.java index e197886044341..b973e10128a5c 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtension.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.proxy.protocol; +package org.apache.pulsar.proxy.extensions; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; @@ -29,53 +29,52 @@ import java.util.Map; /** - * The protocol handler interface for support additional protocols on Pulsar brokers. + * The extension interface for support additional extensions on Pulsar Proxy. */ @InterfaceAudience.LimitedPrivate @InterfaceStability.Evolving -public interface ProtocolHandler extends AutoCloseable { +public interface ProxyExtension extends AutoCloseable { /** - * Returns the unique protocol name. For example, `kafka-v2` for protocol handler for Kafka v2 protocol. + * Returns the unique extension name. For example, `kafka-v2` for extension for Kafka v2 protocol. */ - String protocolName(); + String extensionName(); /** - * Verify if the protocol can speak the given protocol. + * Verify if the extension can handle the given extension name. * - * @param protocol the protocol to verify - * @return true if the protocol handler can handle the given protocol, otherwise false. + * @param extension the extension to verify + * @return true if the extension can handle the given extension name, otherwise false. */ - boolean accept(String protocol); + boolean accept(String extension); /** - * Initialize the protocol handler when the protocol is constructed from reflection. + * Initialize the extension when the extension is constructed from reflection. * - *

The initialize should initialize all the resources required for serving the protocol - * handler but don't start those resources until {@link #start(ProxyService)} is called. + *

The initialize should initialize all the resources required for serving the extension + * but don't start those resources until {@link #start(ProxyService)} is called. * - * @param conf broker service configuration - * @throws Exception when fail to initialize the protocol handler. + * @param conf proxy service configuration + * @throws Exception when fail to initialize the extension. */ void initialize(ProxyConfiguration conf) throws Exception; /** - * Start the protocol handler with the provided broker service. + * Start the extension with the provided proxy service. * - *

The broker service provides the accesses to the Pulsar components such as load - * manager, namespace service, managed ledger and etc. + *

The proxy service provides the accesses to the Pulsar Proxy components. * * @param service the broker service to start with. */ void start(ProxyService service); /** - * Create the list of channel initializers for the ports that this protocol handler + * Create the list of channel initializers for the ports that this extension * will listen on. * *

NOTE: this method is called after {@link #start(ProxyService)}. * - * @return the list of channel initializers for the ports that this protocol handler listens on. + * @return the list of channel initializers for the ports that this extension listens on. */ Map> newChannelInitializers(); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerDefinition.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionDefinition.java similarity index 75% rename from pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerDefinition.java rename to pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionDefinition.java index b33c7ab14da60..65560260c2478 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerDefinition.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionDefinition.java @@ -16,31 +16,31 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.proxy.protocol; +package org.apache.pulsar.proxy.extensions; import lombok.Data; import lombok.NoArgsConstructor; /** - * Metadata information about a Pulsar protocol handler. + * Metadata information about a Proxy Extension. */ @Data @NoArgsConstructor -public class ProtocolHandlerDefinition { +public class ProxyExtensionDefinition { /** - * The name of the protocol. + * The name of the extension. */ private String name; /** - * The description of the protocol handler to be used for user help. + * The description of the extension to be used for user help. */ private String description; /** - * The class name for the protocol handler. + * The class name for the extension. */ - private String handlerClass; + private String extensionClass; } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerMetadata.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionMetadata.java similarity index 80% rename from pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerMetadata.java rename to pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionMetadata.java index f9a9646f1823d..632c841e5afea 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerMetadata.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionMetadata.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.proxy.protocol; +package org.apache.pulsar.proxy.extensions; import lombok.Data; import lombok.NoArgsConstructor; @@ -24,19 +24,19 @@ import java.nio.file.Path; /** - * The metadata of protocol handler. + * The metadata of Proxy Extension. */ @Data @NoArgsConstructor -class ProtocolHandlerMetadata { +class ProxyExtensionMetadata { /** - * The definition of the protocol handler. + * The definition of the extension. */ - private ProtocolHandlerDefinition definition; + private ProxyExtensionDefinition definition; /** - * The path to the handler package. + * The path to the extension package. */ private Path archivePath; diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerWithClassLoader.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoader.java similarity index 83% rename from pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerWithClassLoader.java rename to pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoader.java index 88538246cc874..1f6924a747166 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerWithClassLoader.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoader.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.proxy.protocol; +package org.apache.pulsar.proxy.extensions; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; @@ -32,61 +32,61 @@ import java.util.Map; /** - * A protocol handler with its classloader. + * A extension with its classloader. */ @Slf4j @Data @RequiredArgsConstructor -class ProtocolHandlerWithClassLoader implements ProtocolHandler { +class ProxyExtensionWithClassLoader implements ProxyExtension { - private final ProtocolHandler handler; + private final ProxyExtension extension; private final NarClassLoader classLoader; @Override - public String protocolName() { + public String extensionName() { try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { - return handler.protocolName(); + return extension.extensionName(); } } @Override - public boolean accept(String protocol) { + public boolean accept(String extensionName) { try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { - return handler.accept(protocol); + return extension.accept(extensionName); } } @Override public void initialize(ProxyConfiguration conf) throws Exception { try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { - handler.initialize(conf); + extension.initialize(conf); } } @Override public void start(ProxyService service) { try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { - handler.start(service); + extension.start(service); } } @Override public Map> newChannelInitializers() { try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { - return handler.newChannelInitializers(); + return extension.newChannelInitializers(); } } @Override public void close() { try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { - handler.close(); + extension.close(); } try { classLoader.close(); } catch (IOException e) { - log.warn("Failed to close the protocol handler class loader", e); + log.warn("Failed to close the extension class loader", e); } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java new file mode 100644 index 0000000000000..8f58a0938a58c --- /dev/null +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.extensions; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.proxy.server.ProxyConfiguration; +import org.apache.pulsar.proxy.server.ProxyService; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.Set; + +/** + * A collection of loaded extensions. + */ +@Slf4j +public class ProxyExtensions implements AutoCloseable { + + /** + * Load the extensions for the given extensions list. + * + * @param conf the pulsar broker service configuration + * @return the collection of extensions + */ + public static ProxyExtensions load(ProxyConfiguration conf) throws IOException { + ExtensionsDefinitions definitions = + ProxyExtensionsUtils.searchForExtensions( + conf.getProxyExtensionsDirectory(), conf.getNarExtractionDirectory()); + + ImmutableMap.Builder extensionsBuilder = ImmutableMap.builder(); + + conf.getProxyExtensions().forEach(extensionName -> { + + ProxyExtensionMetadata definition = definitions.extensions().get(extensionName); + if (null == definition) { + throw new RuntimeException("No extension is found for extension name `" + extensionName + + "`. Available extensions are : " + definitions.extensions()); + } + + ProxyExtensionWithClassLoader extension; + try { + extension = ProxyExtensionsUtils.load(definition, conf.getNarExtractionDirectory()); + } catch (IOException e) { + log.error("Failed to load the extension for extension `" + extensionName + "`", e); + throw new RuntimeException("Failed to load the extension for extension name `" + extensionName + "`"); + } + + if (!extension.accept(extensionName)) { + extension.close(); + log.error("Malformed extension found for extensionName `" + extensionName + "`"); + throw new RuntimeException("Malformed extension found for extension name `" + extensionName + "`"); + } + + extensionsBuilder.put(extensionName, extension); + log.info("Successfully loaded extension for extension name `{}`", extensionName); + }); + + return new ProxyExtensions(extensionsBuilder.build()); + } + + private final Map extensions; + + ProxyExtensions(Map extensions) { + this.extensions = extensions; + } + + /** + * Return the handler for the provided extension. + * + * @param extension the extension to use + * @return the extension to handle the provided extension + */ + public ProxyExtension extension(String extension) { + ProxyExtensionWithClassLoader h = extensions.get(extension); + if (null == h) { + return null; + } else { + return h.getExtension(); + } + } + + public void initialize(ProxyConfiguration conf) throws Exception { + for (ProxyExtension extension : extensions.values()) { + extension.initialize(conf); + } + } + + public Map>> newChannelInitializers() { + Map>> channelInitializers = Maps.newHashMap(); + Set addresses = Sets.newHashSet(); + + for (Map.Entry extension : extensions.entrySet()) { + Map> initializers = + extension.getValue().newChannelInitializers(); + initializers.forEach((address, initializer) -> { + if (!addresses.add(address)) { + log.error("extension for `{}` attempts to use {} for its listening port." + + " But it is already occupied by other extensions.", + extension.getKey(), address); + throw new RuntimeException("extension for `" + extension.getKey() + + "` attempts to use " + address + " for its listening port. But it is" + + " already occupied by other messaging extensions"); + } + channelInitializers.put(extension.getKey(), initializers); + }); + } + + return channelInitializers; + } + + public void start(ProxyService service) { + extensions.values().forEach(extension -> extension.start(service)); + } + + @Override + public void close() { + extensions.values().forEach(ProxyExtension::close); + } +} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsUtils.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsUtils.java new file mode 100644 index 0000000000000..2f02827519c11 --- /dev/null +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsUtils.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.extensions; + +import lombok.experimental.UtilityClass; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.util.ObjectMapperFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * Util class to search and load {@link ProxyExtension}s. + */ +@UtilityClass +@Slf4j +class ProxyExtensionsUtils { + + static final String PROXY_EXTENSION_DEFINITION_FILE = "pulsar-proxy-extension.yml"; + + /** + * Retrieve the extension definition from the provided handler nar package. + * + * @param narPath the path to the extension NAR package + * @return the extension definition + * @throws IOException when fail to load the extension or get the definition + */ + public static ProxyExtensionDefinition getProxyExtensionDefinition(String narPath, String narExtractionDirectory) + throws IOException { + try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), + narExtractionDirectory)) { + return getProxyExtensionDefinition(ncl); + } + } + + private static ProxyExtensionDefinition getProxyExtensionDefinition(NarClassLoader ncl) throws IOException { + String configStr = ncl.getServiceDefinition(PROXY_EXTENSION_DEFINITION_FILE); + + return ObjectMapperFactory.getThreadLocalYaml().readValue( + configStr, ProxyExtensionDefinition.class + ); + } + + /** + * Search and load the available extensions. + * + * @param extensionsDirectory the directory where all the extensions are stored + * @return a collection of extensions + * @throws IOException when fail to load the available extensions from the provided directory. + */ + public static ExtensionsDefinitions searchForExtensions(String extensionsDirectory, + String narExtractionDirectory) throws IOException { + Path path = Paths.get(extensionsDirectory).toAbsolutePath(); + log.info("Searching for extensions in {}", path); + + ExtensionsDefinitions extensions = new ExtensionsDefinitions(); + if (!path.toFile().exists()) { + log.warn("extension directory not found"); + return extensions; + } + + try (DirectoryStream stream = Files.newDirectoryStream(path, "*.nar")) { + for (Path archive : stream) { + try { + ProxyExtensionDefinition phDef = + ProxyExtensionsUtils.getProxyExtensionDefinition(archive.toString(), narExtractionDirectory); + log.info("Found extension from {} : {}", archive, phDef); + + checkArgument(StringUtils.isNotBlank(phDef.getName())); + checkArgument(StringUtils.isNotBlank(phDef.getExtensionClass())); + + ProxyExtensionMetadata metadata = new ProxyExtensionMetadata(); + metadata.setDefinition(phDef); + metadata.setArchivePath(archive); + + extensions.extensions().put(phDef.getName(), metadata); + } catch (Throwable t) { + log.warn("Failed to load connector from {}." + + " It is OK however if you want to use this extension," + + " please make sure you put the correct extension NAR" + + " package in the extensions directory.", archive, t); + } + } + } + + return extensions; + } + + /** + * Load the extension according to the handler definition. + * + * @param metadata the extension definition. + * @return + */ + static ProxyExtensionWithClassLoader load(ProxyExtensionMetadata metadata, + String narExtractionDirectory) throws IOException { + NarClassLoader ncl = NarClassLoader.getFromArchive( + metadata.getArchivePath().toAbsolutePath().toFile(), + Collections.emptySet(), + ProxyExtension.class.getClassLoader(), narExtractionDirectory); + + ProxyExtensionDefinition phDef = getProxyExtensionDefinition(ncl); + if (StringUtils.isBlank(phDef.getExtensionClass())) { + throw new IOException("extension `" + phDef.getName() + "` does NOT provide a protocol" + + " handler implementation"); + } + + try { + Class extensionClass = ncl.loadClass(phDef.getExtensionClass()); + Object extension = extensionClass.newInstance(); + if (!(extension instanceof ProxyExtension)) { + throw new IOException("Class " + phDef.getExtensionClass() + + " does not implement extension interface"); + } + ProxyExtension ph = (ProxyExtension) extension; + return new ProxyExtensionWithClassLoader(ph, ncl); + } catch (Throwable t) { + rethrowIOException(t); + return null; + } + } + + private static void rethrowIOException(Throwable cause) + throws IOException { + if (cause instanceof IOException) { + throw (IOException) cause; + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else if (cause instanceof Error) { + throw (Error) cause; + } else { + throw new IOException(cause.getMessage(), cause); + } + } + +} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerUtils.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerUtils.java deleted file mode 100644 index a7f196621e6ef..0000000000000 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerUtils.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.proxy.protocol; - -import lombok.experimental.UtilityClass; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.common.nar.NarClassLoader; -import org.apache.pulsar.common.util.ObjectMapperFactory; - -import java.io.File; -import java.io.IOException; -import java.nio.file.DirectoryStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Collections; - -import static com.google.common.base.Preconditions.checkArgument; - -/** - * Util class to search and load {@link ProtocolHandler}s. - */ -@UtilityClass -@Slf4j -class ProtocolHandlerUtils { - - static final String PULSAR_PROTOCOL_HANDLER_DEFINITION_FILE = "pulsar-proxy-protocol-handler.yml"; - - /** - * Retrieve the protocol handler definition from the provided handler nar package. - * - * @param narPath the path to the protocol handler NAR package - * @return the protocol handler definition - * @throws IOException when fail to load the protocol handler or get the definition - */ - public static ProtocolHandlerDefinition getProtocolHandlerDefinition(String narPath, String narExtractionDirectory) - throws IOException { - try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), - narExtractionDirectory)) { - return getProtocolHandlerDefinition(ncl); - } - } - - private static ProtocolHandlerDefinition getProtocolHandlerDefinition(NarClassLoader ncl) throws IOException { - String configStr = ncl.getServiceDefinition(PULSAR_PROTOCOL_HANDLER_DEFINITION_FILE); - - return ObjectMapperFactory.getThreadLocalYaml().readValue( - configStr, ProtocolHandlerDefinition.class - ); - } - - /** - * Search and load the available protocol handlers. - * - * @param handlersDirectory the directory where all the protocol handlers are stored - * @return a collection of protocol handlers - * @throws IOException when fail to load the available protocol handlers from the provided directory. - */ - public static ProtocolHandlerDefinitions searchForHandlers(String handlersDirectory, - String narExtractionDirectory) throws IOException { - Path path = Paths.get(handlersDirectory).toAbsolutePath(); - log.info("Searching for protocol handlers in {}", path); - - ProtocolHandlerDefinitions handlers = new ProtocolHandlerDefinitions(); - if (!path.toFile().exists()) { - log.warn("Protocol handler directory not found"); - return handlers; - } - - try (DirectoryStream stream = Files.newDirectoryStream(path, "*.nar")) { - for (Path archive : stream) { - try { - ProtocolHandlerDefinition phDef = - ProtocolHandlerUtils.getProtocolHandlerDefinition(archive.toString(), narExtractionDirectory); - log.info("Found protocol handler from {} : {}", archive, phDef); - - checkArgument(StringUtils.isNotBlank(phDef.getName())); - checkArgument(StringUtils.isNotBlank(phDef.getHandlerClass())); - - ProtocolHandlerMetadata metadata = new ProtocolHandlerMetadata(); - metadata.setDefinition(phDef); - metadata.setArchivePath(archive); - - handlers.handlers().put(phDef.getName(), metadata); - } catch (Throwable t) { - log.warn("Failed to load connector from {}." - + " It is OK however if you want to use this protocol handler," - + " please make sure you put the correct protocol handler NAR" - + " package in the handlers directory.", archive, t); - } - } - } - - return handlers; - } - - /** - * Load the protocol handler according to the handler definition. - * - * @param metadata the protocol handler definition. - * @return - */ - static ProtocolHandlerWithClassLoader load(ProtocolHandlerMetadata metadata, - String narExtractionDirectory) throws IOException { - NarClassLoader ncl = NarClassLoader.getFromArchive( - metadata.getArchivePath().toAbsolutePath().toFile(), - Collections.emptySet(), - ProtocolHandler.class.getClassLoader(), narExtractionDirectory); - - ProtocolHandlerDefinition phDef = getProtocolHandlerDefinition(ncl); - if (StringUtils.isBlank(phDef.getHandlerClass())) { - throw new IOException("Protocol handler `" + phDef.getName() + "` does NOT provide a protocol" - + " handler implementation"); - } - - try { - Class handlerClass = ncl.loadClass(phDef.getHandlerClass()); - Object handler = handlerClass.newInstance(); - if (!(handler instanceof ProtocolHandler)) { - throw new IOException("Class " + phDef.getHandlerClass() - + " does not implement protocol handler interface"); - } - ProtocolHandler ph = (ProtocolHandler) handler; - return new ProtocolHandlerWithClassLoader(ph, ncl); - } catch (Throwable t) { - rethrowIOException(t); - return null; - } - } - - private static void rethrowIOException(Throwable cause) - throws IOException { - if (cause instanceof IOException) { - throw (IOException) cause; - } else if (cause instanceof RuntimeException) { - throw (RuntimeException) cause; - } else if (cause instanceof Error) { - throw (Error) cause; - } else { - throw new IOException(cause.getMessage(), cause); - } - } - -} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandlers.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandlers.java deleted file mode 100644 index 83aedc391cb5a..0000000000000 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/protocol/ProtocolHandlers.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.proxy.protocol; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.proxy.server.ProxyConfiguration; -import org.apache.pulsar.proxy.server.ProxyService; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.Set; - -/** - * A collection of loaded handlers. - */ -@Slf4j -public class ProtocolHandlers implements AutoCloseable { - - /** - * Load the protocol handlers for the given protocol list. - * - * @param conf the pulsar broker service configuration - * @return the collection of protocol handlers - */ - public static ProtocolHandlers load(ProxyConfiguration conf) throws IOException { - ProtocolHandlerDefinitions definitions = - ProtocolHandlerUtils.searchForHandlers( - conf.getProxyProtocolHandlerDirectory(), conf.getNarExtractionDirectory()); - - ImmutableMap.Builder handlersBuilder = ImmutableMap.builder(); - - conf.getProxyMessagingProtocols().forEach(protocol -> { - - ProtocolHandlerMetadata definition = definitions.handlers().get(protocol); - if (null == definition) { - throw new RuntimeException("No protocol handler is found for protocol `" + protocol - + "`. Available protocols are : " + definitions.handlers()); - } - - ProtocolHandlerWithClassLoader handler; - try { - handler = ProtocolHandlerUtils.load(definition, conf.getNarExtractionDirectory()); - } catch (IOException e) { - log.error("Failed to load the protocol handler for protocol `" + protocol + "`", e); - throw new RuntimeException("Failed to load the protocol handler for protocol `" + protocol + "`"); - } - - if (!handler.accept(protocol)) { - handler.close(); - log.error("Malformed protocol handler found for protocol `" + protocol + "`"); - throw new RuntimeException("Malformed protocol handler found for protocol `" + protocol + "`"); - } - - handlersBuilder.put(protocol, handler); - log.info("Successfully loaded protocol handler for protocol `{}`", protocol); - }); - - return new ProtocolHandlers(handlersBuilder.build()); - } - - private final Map handlers; - - ProtocolHandlers(Map handlers) { - this.handlers = handlers; - } - - /** - * Return the handler for the provided protocol. - * - * @param protocol the protocol to use - * @return the protocol handler to handle the provided protocol - */ - public ProtocolHandler protocol(String protocol) { - ProtocolHandlerWithClassLoader h = handlers.get(protocol); - if (null == h) { - return null; - } else { - return h.getHandler(); - } - } - - public void initialize(ProxyConfiguration conf) throws Exception { - for (ProtocolHandler handler : handlers.values()) { - handler.initialize(conf); - } - } - - public Map>> newChannelInitializers() { - Map>> channelInitializers = Maps.newHashMap(); - Set addresses = Sets.newHashSet(); - - for (Map.Entry handler : handlers.entrySet()) { - Map> initializers = - handler.getValue().newChannelInitializers(); - initializers.forEach((address, initializer) -> { - if (!addresses.add(address)) { - log.error("Protocol handler for `{}` attempts to use {} for its listening port." - + " But it is already occupied by other message protocols.", - handler.getKey(), address); - throw new RuntimeException("Protocol handler for `" + handler.getKey() - + "` attempts to use " + address + " for its listening port. But it is" - + " already occupied by other messaging protocols"); - } - channelInitializers.put(handler.getKey(), initializers); - }); - } - - return channelInitializers; - } - - public void start(ProxyService service) { - handlers.values().forEach(handler -> handler.start(service)); - } - - @Override - public void close() { - handlers.values().forEach(ProtocolHandler::close); - } -} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java index c549b45b79fe1..62f8a0281923c 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java @@ -50,6 +50,7 @@ /** * Maintains available active broker list and returns next active broker in round-robin for discovery service. * + * This is an API used by Proxy Extensions. */ public class BrokerDiscoveryProvider implements Closeable { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index 8305e3408e1b0..90af46cd8da6d 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -624,15 +624,15 @@ public class ProxyConfiguration implements PulsarConfiguration { /***** --- Protocol Handlers --- ****/ @FieldContext( category = CATEGORY_PLUGIN, - doc = "The directory to locate messaging protocol handlers" + doc = "The directory to locate proxy extensions" ) - private String proxyProtocolHandlerDirectory = "./proxyprotocols"; + private String proxyExtensionsDirectory = "./proxyextensions"; @FieldContext( category = CATEGORY_PLUGIN, - doc = "List of messaging protocols to load, which is a list of protocol names" + doc = "List of messaging protocols to load, which is a list of extension names" ) - private Set proxyMessagingProtocols = Sets.newTreeSet(); + private Set proxyExtensions = Sets.newTreeSet(); /***** --- WebSocket --- ****/ @FieldContext( diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index b279cf6499cae..699bebf88b51a 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -63,7 +63,7 @@ import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; -import org.apache.pulsar.proxy.protocol.ProtocolHandlers; +import org.apache.pulsar.proxy.extensions.ProxyExtensions; import org.apache.pulsar.proxy.stats.TopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,7 +90,7 @@ public class ProxyService implements Closeable { private MetadataStoreExtended localMetadataStore; private MetadataStoreExtended configMetadataStore; private PulsarResources pulsarResources; - private ProtocolHandlers protocolHandlers = null; + private ProxyExtensions proxyExtensions = null; private final EventLoopGroup acceptorGroup; private final EventLoopGroup workerGroup; @@ -167,8 +167,8 @@ public ProxyService(ProxyConfiguration proxyConfig, proxyConfig.getBrokerProxyAllowedTargetPorts()); // Initialize the message protocol handlers - protocolHandlers = ProtocolHandlers.load(proxyConfig); - protocolHandlers.initialize(proxyConfig); + proxyExtensions = ProxyExtensions.load(proxyConfig); + proxyExtensions.initialize(proxyConfig); statsExecutor = Executors .newSingleThreadScheduledExecutor(new DefaultThreadFactory("proxy-stats-executor")); @@ -254,20 +254,20 @@ public void start() throws Exception { // Initialize the message protocol handlers. // start the protocol handlers only after the broker is ready, // so that the protocol handlers can access broker service properly. - this.protocolHandlers.start(this); + this.proxyExtensions.start(this); Map>> protocolHandlerChannelInitializers = - this.protocolHandlers.newChannelInitializers(); - startProtocolHandlers(protocolHandlerChannelInitializers, bootstrap); + this.proxyExtensions.newChannelInitializers(); + startProxyExtensions(protocolHandlerChannelInitializers, bootstrap); } // This call is used for starting additional protocol handlers - public void startProtocolHandlers( + public void startProxyExtensions( Map>> protocolHandlers, ServerBootstrap serverBootstrap) { - protocolHandlers.forEach((protocol, initializers) -> { + protocolHandlers.forEach((extensionName, initializers) -> { initializers.forEach((address, initializer) -> { try { - startProtocolHandler(protocol, address, initializer, serverBootstrap); + startProxyExtension(extensionName, address, initializer, serverBootstrap); } catch (IOException e) { LOG.error("{}", e.getMessage(), e.getCause()); throw new RuntimeException(e.getMessage(), e.getCause()); @@ -276,18 +276,18 @@ public void startProtocolHandlers( }); } - private void startProtocolHandler(String protocol, - SocketAddress address, - ChannelInitializer initializer, - ServerBootstrap serverBootstrap) throws IOException { + private void startProxyExtension(String extensionName, + SocketAddress address, + ChannelInitializer initializer, + ServerBootstrap serverBootstrap) throws IOException { ServerBootstrap bootstrap = serverBootstrap.clone(); bootstrap.childHandler(initializer); try { bootstrap.bind(address).sync(); } catch (Exception e) { - throw new IOException("Failed to bind protocol `" + protocol + "` on " + address, e); + throw new IOException("Failed to bind extension `" + extensionName + "` on " + address, e); } - LOG.info("Successfully bind protocol `{}` on {}", protocol, address); + LOG.info("Successfully bound extension `{}` on {}", extensionName, address); } public BrokerDiscoveryProvider getDiscoveryProvider() { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/protocol/MockProtocolHandler.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/MockProxyExtension.java similarity index 92% rename from pulsar-proxy/src/test/java/org/apache/pulsar/proxy/protocol/MockProtocolHandler.java rename to pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/MockProxyExtension.java index b593909fc1354..94dd3b55e2c62 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/protocol/MockProtocolHandler.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/MockProxyExtension.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.proxy.protocol; +package org.apache.pulsar.proxy.extensions; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; @@ -27,12 +27,12 @@ import java.util.Collections; import java.util.Map; -class MockProtocolHandler implements ProtocolHandler { +class MockProxyExtension implements ProxyExtension { public static final String NAME = "mock"; @Override - public String protocolName() { + public String extensionName() { return NAME; } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerUtilsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionUtilsTest.java similarity index 68% rename from pulsar-proxy/src/test/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerUtilsTest.java rename to pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionUtilsTest.java index f0290042124c8..86ae7fffa775e 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerUtilsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionUtilsTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.proxy.protocol; +package org.apache.pulsar.proxy.extensions; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -32,7 +32,7 @@ import java.nio.file.Paths; import java.util.Set; -import static org.apache.pulsar.proxy.protocol.ProtocolHandlerUtils.PULSAR_PROTOCOL_HANDLER_DEFINITION_FILE; +import static org.apache.pulsar.proxy.extensions.ProxyExtensionsUtils.PROXY_EXTENSION_DEFINITION_FILE; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -42,11 +42,11 @@ import static org.testng.AssertJUnit.fail; @PrepareForTest({ - ProtocolHandlerUtils.class, NarClassLoader.class + ProxyExtensionsUtils.class, NarClassLoader.class }) @PowerMockIgnore({"org.apache.logging.log4j.*"}) @Test(groups = "broker") -public class ProtocolHandlerUtilsTest { +public class ProxyExtensionUtilsTest { // Necessary to make PowerMockito.mockStatic work with TestNG. @ObjectFactory @@ -56,21 +56,21 @@ public IObjectFactory getObjectFactory() { @Test public void testLoadProtocolHandler() throws Exception { - ProtocolHandlerDefinition def = new ProtocolHandlerDefinition(); - def.setHandlerClass(MockProtocolHandler.class.getName()); - def.setDescription("test-protocol-handler"); + ProxyExtensionDefinition def = new ProxyExtensionDefinition(); + def.setExtensionClass(MockProxyExtension.class.getName()); + def.setDescription("test-ext"); - String archivePath = "/path/to/protocol/handler/nar"; + String archivePath = "/path/to/ext/nar"; - ProtocolHandlerMetadata metadata = new ProtocolHandlerMetadata(); + ProxyExtensionMetadata metadata = new ProxyExtensionMetadata(); metadata.setDefinition(def); metadata.setArchivePath(Paths.get(archivePath)); NarClassLoader mockLoader = mock(NarClassLoader.class); - when(mockLoader.getServiceDefinition(eq(PULSAR_PROTOCOL_HANDLER_DEFINITION_FILE))) + when(mockLoader.getServiceDefinition(eq(PROXY_EXTENSION_DEFINITION_FILE))) .thenReturn(ObjectMapperFactory.getThreadLocalYaml().writeValueAsString(def)); - Class handlerClass = MockProtocolHandler.class; - when(mockLoader.loadClass(eq(MockProtocolHandler.class.getName()))) + Class handlerClass = MockProxyExtension.class; + when(mockLoader.loadClass(eq(MockProxyExtension.class.getName()))) .thenReturn(handlerClass); PowerMockito.mockStatic(NarClassLoader.class); @@ -81,29 +81,29 @@ public void testLoadProtocolHandler() throws Exception { any(String.class) )).thenReturn(mockLoader); - ProtocolHandlerWithClassLoader returnedPhWithCL = ProtocolHandlerUtils.load(metadata, ""); - ProtocolHandler returnedPh = returnedPhWithCL.getHandler(); + ProxyExtensionWithClassLoader returnedPhWithCL = ProxyExtensionsUtils.load(metadata, ""); + ProxyExtension returnedPh = returnedPhWithCL.getExtension(); assertSame(mockLoader, returnedPhWithCL.getClassLoader()); - assertTrue(returnedPh instanceof MockProtocolHandler); + assertTrue(returnedPh instanceof MockProxyExtension); } @Test public void testLoadProtocolHandlerBlankHandlerClass() throws Exception { - ProtocolHandlerDefinition def = new ProtocolHandlerDefinition(); - def.setDescription("test-protocol-handler"); + ProxyExtensionDefinition def = new ProxyExtensionDefinition(); + def.setDescription("test-ext"); - String archivePath = "/path/to/protocol/handler/nar"; + String archivePath = "/path/to/ext/nar"; - ProtocolHandlerMetadata metadata = new ProtocolHandlerMetadata(); + ProxyExtensionMetadata metadata = new ProxyExtensionMetadata(); metadata.setDefinition(def); metadata.setArchivePath(Paths.get(archivePath)); NarClassLoader mockLoader = mock(NarClassLoader.class); - when(mockLoader.getServiceDefinition(eq(PULSAR_PROTOCOL_HANDLER_DEFINITION_FILE))) + when(mockLoader.getServiceDefinition(eq(PROXY_EXTENSION_DEFINITION_FILE))) .thenReturn(ObjectMapperFactory.getThreadLocalYaml().writeValueAsString(def)); - Class handlerClass = MockProtocolHandler.class; - when(mockLoader.loadClass(eq(MockProtocolHandler.class.getName()))) + Class handlerClass = MockProxyExtension.class; + when(mockLoader.loadClass(eq(MockProxyExtension.class.getName()))) .thenReturn(handlerClass); PowerMockito.mockStatic(NarClassLoader.class); @@ -115,7 +115,7 @@ public void testLoadProtocolHandlerBlankHandlerClass() throws Exception { )).thenReturn(mockLoader); try { - ProtocolHandlerUtils.load(metadata, ""); + ProxyExtensionsUtils.load(metadata, ""); fail("Should not reach here"); } catch (IOException ioe) { // expected @@ -124,18 +124,18 @@ public void testLoadProtocolHandlerBlankHandlerClass() throws Exception { @Test public void testLoadProtocolHandlerWrongHandlerClass() throws Exception { - ProtocolHandlerDefinition def = new ProtocolHandlerDefinition(); - def.setHandlerClass(Runnable.class.getName()); - def.setDescription("test-protocol-handler"); + ProxyExtensionDefinition def = new ProxyExtensionDefinition(); + def.setExtensionClass(Runnable.class.getName()); + def.setDescription("test-ext"); - String archivePath = "/path/to/protocol/handler/nar"; + String archivePath = "/path/to/ext/nar"; - ProtocolHandlerMetadata metadata = new ProtocolHandlerMetadata(); + ProxyExtensionMetadata metadata = new ProxyExtensionMetadata(); metadata.setDefinition(def); metadata.setArchivePath(Paths.get(archivePath)); NarClassLoader mockLoader = mock(NarClassLoader.class); - when(mockLoader.getServiceDefinition(eq(PULSAR_PROTOCOL_HANDLER_DEFINITION_FILE))) + when(mockLoader.getServiceDefinition(eq(PROXY_EXTENSION_DEFINITION_FILE))) .thenReturn(ObjectMapperFactory.getThreadLocalYaml().writeValueAsString(def)); Class handlerClass = Runnable.class; when(mockLoader.loadClass(eq(Runnable.class.getName()))) @@ -150,7 +150,7 @@ public void testLoadProtocolHandlerWrongHandlerClass() throws Exception { )).thenReturn(mockLoader); try { - ProtocolHandlerUtils.load(metadata, ""); + ProxyExtensionsUtils.load(metadata, ""); fail("Should not reach here"); } catch (IOException ioe) { // expected diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerWithClassLoaderTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoaderTest.java similarity index 86% rename from pulsar-proxy/src/test/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerWithClassLoaderTest.java rename to pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoaderTest.java index b5961528f70ac..b43eb22ab8952 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/protocol/ProtocolHandlerWithClassLoaderTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoaderTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.proxy.protocol; +package org.apache.pulsar.proxy.extensions; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; @@ -40,22 +40,22 @@ import static org.testng.Assert.expectThrows; /** - * Unit test {@link ProtocolHandlerWithClassLoader}. + * Unit test {@link ProxyExtensionWithClassLoader}. */ @Test(groups = "broker") -public class ProtocolHandlerWithClassLoaderTest { +public class ProxyExtensionWithClassLoaderTest { @Test public void testWrapper() throws Exception { - ProtocolHandler h = mock(ProtocolHandler.class); + ProxyExtension h = mock(ProxyExtension.class); NarClassLoader loader = mock(NarClassLoader.class); - ProtocolHandlerWithClassLoader wrapper = new ProtocolHandlerWithClassLoader(h, loader); + ProxyExtensionWithClassLoader wrapper = new ProxyExtensionWithClassLoader(h, loader); String protocol = "kafka"; - when(h.protocolName()).thenReturn(protocol); - assertEquals(protocol, wrapper.protocolName()); - verify(h, times(1)).protocolName(); + when(h.extensionName()).thenReturn(protocol); + assertEquals(protocol, wrapper.extensionName()); + verify(h, times(1)).extensionName(); when(h.accept(eq(protocol))).thenReturn(true); assertTrue(wrapper.accept(protocol)); @@ -75,9 +75,9 @@ public void testClassLoaderSwitcher() throws Exception { String protocol = "test-protocol"; - ProtocolHandler h = new ProtocolHandler() { + ProxyExtension h = new ProxyExtension() { @Override - public String protocolName() { + public String extensionName() { assertEquals(Thread.currentThread().getContextClassLoader(), loader); return protocol; } @@ -110,11 +110,11 @@ public void close() { assertEquals(Thread.currentThread().getContextClassLoader(), loader); } }; - ProtocolHandlerWithClassLoader wrapper = new ProtocolHandlerWithClassLoader(h, loader); + ProxyExtensionWithClassLoader wrapper = new ProxyExtensionWithClassLoader(h, loader); ClassLoader curClassLoader = Thread.currentThread().getContextClassLoader(); - assertEquals(wrapper.protocolName(), protocol); + assertEquals(wrapper.extensionName(), protocol); assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); assertTrue(wrapper.accept(protocol)); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/protocol/ProtocolHandlersTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsTest.java similarity index 72% rename from pulsar-proxy/src/test/java/org/apache/pulsar/proxy/protocol/ProtocolHandlersTest.java rename to pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsTest.java index ce43d4dbacac1..00fb3be50c3ac 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/protocol/ProtocolHandlersTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsTest.java @@ -16,11 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.proxy.protocol; +package org.apache.pulsar.proxy.extensions; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; -import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.proxy.server.ProxyConfiguration; import org.apache.pulsar.proxy.server.ProxyService; @@ -42,70 +41,70 @@ import static org.testng.Assert.assertSame; /** - * Unit test {@link ProtocolHandlers}. + * Unit test {@link ProxyExtensions}. */ @Test(groups = "proxy") -public class ProtocolHandlersTest { +public class ProxyExtensionsTest { private static final String protocol1 = "protocol1"; - private ProtocolHandler handler1; + private ProxyExtension extension1; private NarClassLoader ncl1; private static final String protocol2 = "protocol2"; - private ProtocolHandler handler2; + private ProxyExtension extension2; private NarClassLoader ncl2; private static final String protocol3 = "protocol3"; - private Map handlerMap; - private ProtocolHandlers handlers; + private Map extensionsMap; + private ProxyExtensions extensions; @BeforeMethod public void setup() { - this.handler1 = mock(ProtocolHandler.class); + this.extension1 = mock(ProxyExtension.class); this.ncl1 = mock(NarClassLoader.class); - this.handler2 = mock(ProtocolHandler.class); + this.extension2 = mock(ProxyExtension.class); this.ncl2 = mock(NarClassLoader.class); - this.handlerMap = new HashMap<>(); - this.handlerMap.put( + this.extensionsMap = new HashMap<>(); + this.extensionsMap.put( protocol1, - new ProtocolHandlerWithClassLoader(handler1, ncl1)); - this.handlerMap.put( + new ProxyExtensionWithClassLoader(extension1, ncl1)); + this.extensionsMap.put( protocol2, - new ProtocolHandlerWithClassLoader(handler2, ncl2)); - this.handlers = new ProtocolHandlers(this.handlerMap); + new ProxyExtensionWithClassLoader(extension2, ncl2)); + this.extensions = new ProxyExtensions(this.extensionsMap); } @AfterMethod(alwaysRun = true) public void teardown() throws Exception { - this.handlers.close(); + this.extensions.close(); - verify(handler1, times(1)).close(); - verify(handler2, times(1)).close(); + verify(extension1, times(1)).close(); + verify(extension2, times(1)).close(); verify(ncl1, times(1)).close(); verify(ncl2, times(1)).close(); } @Test public void testGetProtocol() { - assertSame(handler1, handlers.protocol(protocol1)); - assertSame(handler2, handlers.protocol(protocol2)); - assertNull(handlers.protocol(protocol3)); + assertSame(extension1, extensions.extension(protocol1)); + assertSame(extension2, extensions.extension(protocol2)); + assertNull(extensions.extension(protocol3)); } @Test public void testInitialize() throws Exception { ProxyConfiguration conf = new ProxyConfiguration(); - handlers.initialize(conf); - verify(handler1, times(1)).initialize(same(conf)); - verify(handler2, times(1)).initialize(same(conf)); + extensions.initialize(conf); + verify(extension1, times(1)).initialize(same(conf)); + verify(extension2, times(1)).initialize(same(conf)); } @Test public void testStart() { ProxyService service = mock(ProxyService.class); - handlers.start(service); - verify(handler1, times(1)).start(same(service)); - verify(handler2, times(1)).start(same(service)); + extensions.start(service); + verify(extension1, times(1)).start(same(service)); + verify(extension2, times(1)).start(same(service)); } @Test @@ -122,11 +121,11 @@ public void testNewChannelInitializersSuccess() { p2Initializers.put(new InetSocketAddress("127.0.0.3", 6650), i3); p2Initializers.put(new InetSocketAddress("127.0.0.4", 6651), i4); - when(handler1.newChannelInitializers()).thenReturn(p1Initializers); - when(handler2.newChannelInitializers()).thenReturn(p2Initializers); + when(extension1.newChannelInitializers()).thenReturn(p1Initializers); + when(extension2.newChannelInitializers()).thenReturn(p2Initializers); Map>> initializers = - handlers.newChannelInitializers(); + extensions.newChannelInitializers(); assertEquals(2, initializers.size()); assertSame(p1Initializers, initializers.get(protocol1)); @@ -147,10 +146,10 @@ public void testNewChannelInitializersOverlapped() { p2Initializers.put(new InetSocketAddress("127.0.0.1", 6650), i3); p2Initializers.put(new InetSocketAddress("127.0.0.4", 6651), i4); - when(handler1.newChannelInitializers()).thenReturn(p1Initializers); - when(handler2.newChannelInitializers()).thenReturn(p2Initializers); + when(extension1.newChannelInitializers()).thenReturn(p1Initializers); + when(extension2.newChannelInitializers()).thenReturn(p2Initializers); - handlers.newChannelInitializers(); + extensions.newChannelInitializers(); } }