Skip to content

Commit

Permalink
Update PIP-99 rename Proxy Protocol Handler to ProxyExtension
Browse files Browse the repository at this point in the history
(cherry picked from commit 9ffe5f8)
  • Loading branch information
nicoloboschi committed Feb 28, 2022
1 parent 997d0c8 commit 95471cc
Show file tree
Hide file tree
Showing 17 changed files with 454 additions and 456 deletions.
11 changes: 5 additions & 6 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -231,16 +231,15 @@ webSocketServiceEnabled=false
clusterName=


### --- Protocol Handlers
### --- Proxy Extensions

# List of messaging protocols to load, which is a list of protocol names
#proxyMessagingProtocols=
# List of proxy extensions to load, which is a list of extension names
#proxyExtensions=

# The directory to locate messaging protocol handlers
#proxyProtocolHandlerDirectory=
# The directory to locate extensions
#proxyExtensionsDirectory=

### --- Deprecated config variables --- ###

# Deprecated. Use configurationStoreServers
globalZookeeperServers=

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.protocol;
package org.apache.pulsar.proxy.extensions;

import lombok.Data;
import lombok.experimental.Accessors;
Expand All @@ -25,12 +25,12 @@
import java.util.TreeMap;

/**
* The collection of protocol handlers.
* The collection of Proxy Extensions.
*/
@Data
@Accessors(fluent = true)
class ProtocolHandlerDefinitions {
class ExtensionsDefinitions {

private final Map<String, ProtocolHandlerMetadata> handlers = new TreeMap<>();
private final Map<String, ProxyExtensionMetadata> extensions = new TreeMap<>();

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.protocol;
package org.apache.pulsar.proxy.extensions;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
Expand All @@ -29,53 +29,52 @@
import java.util.Map;

/**
* The protocol handler interface for support additional protocols on Pulsar brokers.
* The extension interface for support additional extensions on Pulsar Proxy.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface ProtocolHandler extends AutoCloseable {
public interface ProxyExtension extends AutoCloseable {

/**
* Returns the unique protocol name. For example, `kafka-v2` for protocol handler for Kafka v2 protocol.
* Returns the unique extension name. For example, `kafka-v2` for extension for Kafka v2 protocol.
*/
String protocolName();
String extensionName();

/**
* Verify if the protocol can speak the given <tt>protocol</tt>.
* Verify if the extension can handle the given <tt>extension name</tt>.
*
* @param protocol the protocol to verify
* @return true if the protocol handler can handle the given protocol, otherwise false.
* @param extension the extension to verify
* @return true if the extension can handle the given extension name, otherwise false.
*/
boolean accept(String protocol);
boolean accept(String extension);

/**
* Initialize the protocol handler when the protocol is constructed from reflection.
* Initialize the extension when the extension is constructed from reflection.
*
* <p>The initialize should initialize all the resources required for serving the protocol
* handler but don't start those resources until {@link #start(ProxyService)} is called.
* <p>The initialize should initialize all the resources required for serving the extension
* but don't start those resources until {@link #start(ProxyService)} is called.
*
* @param conf broker service configuration
* @throws Exception when fail to initialize the protocol handler.
* @param conf proxy service configuration
* @throws Exception when fail to initialize the extension.
*/
void initialize(ProxyConfiguration conf) throws Exception;

/**
* Start the protocol handler with the provided broker service.
* Start the extension with the provided proxy service.
*
* <p>The broker service provides the accesses to the Pulsar components such as load
* manager, namespace service, managed ledger and etc.
* <p>The proxy service provides the accesses to the Pulsar Proxy components.
*
* @param service the broker service to start with.
*/
void start(ProxyService service);

/**
* Create the list of channel initializers for the ports that this protocol handler
* Create the list of channel initializers for the ports that this extension
* will listen on.
*
* <p>NOTE: this method is called after {@link #start(ProxyService)}.
*
* @return the list of channel initializers for the ports that this protocol handler listens on.
* @return the list of channel initializers for the ports that this extension listens on.
*/
Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,31 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.protocol;
package org.apache.pulsar.proxy.extensions;

import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Metadata information about a Pulsar protocol handler.
* Metadata information about a Proxy Extension.
*/
@Data
@NoArgsConstructor
public class ProtocolHandlerDefinition {
public class ProxyExtensionDefinition {

/**
* The name of the protocol.
* The name of the extension.
*/
private String name;

/**
* The description of the protocol handler to be used for user help.
* The description of the extension to be used for user help.
*/
private String description;

/**
* The class name for the protocol handler.
* The class name for the extension.
*/
private String handlerClass;
private String extensionClass;

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.protocol;
package org.apache.pulsar.proxy.extensions;

import lombok.Data;
import lombok.NoArgsConstructor;

import java.nio.file.Path;

/**
* The metadata of protocol handler.
* The metadata of Proxy Extension.
*/
@Data
@NoArgsConstructor
class ProtocolHandlerMetadata {
class ProxyExtensionMetadata {

/**
* The definition of the protocol handler.
* The definition of the extension.
*/
private ProtocolHandlerDefinition definition;
private ProxyExtensionDefinition definition;

/**
* The path to the handler package.
* The path to the extension package.
*/
private Path archivePath;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.protocol;
package org.apache.pulsar.proxy.extensions;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
Expand All @@ -32,61 +32,61 @@
import java.util.Map;

/**
* A protocol handler with its classloader.
* A extension with its classloader.
*/
@Slf4j
@Data
@RequiredArgsConstructor
class ProtocolHandlerWithClassLoader implements ProtocolHandler {
class ProxyExtensionWithClassLoader implements ProxyExtension {

private final ProtocolHandler handler;
private final ProxyExtension extension;
private final NarClassLoader classLoader;

@Override
public String protocolName() {
public String extensionName() {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
return handler.protocolName();
return extension.extensionName();
}
}

@Override
public boolean accept(String protocol) {
public boolean accept(String extensionName) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
return handler.accept(protocol);
return extension.accept(extensionName);
}
}

@Override
public void initialize(ProxyConfiguration conf) throws Exception {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
handler.initialize(conf);
extension.initialize(conf);
}
}

@Override
public void start(ProxyService service) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
handler.start(service);
extension.start(service);
}
}

@Override
public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
return handler.newChannelInitializers();
return extension.newChannelInitializers();
}
}

@Override
public void close() {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
handler.close();
extension.close();
}

try {
classLoader.close();
} catch (IOException e) {
log.warn("Failed to close the protocol handler class loader", e);
log.warn("Failed to close the extension class loader", e);
}
}

Expand Down
Loading

0 comments on commit 95471cc

Please sign in to comment.