Skip to content

Commit

Permalink
PIP-93 Pulsar Proxy Protocol Handlers
Browse files Browse the repository at this point in the history
(cherry picked from commit 42cf81b)
  • Loading branch information
eolivelli authored and nicoloboschi committed Feb 28, 2022
1 parent 896303c commit 997d0c8
Show file tree
Hide file tree
Showing 14 changed files with 1,211 additions and 2 deletions.
10 changes: 10 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,17 @@ webSocketServiceEnabled=false
# Name of the cluster to which this broker belongs to
clusterName=


### --- Protocol Handlers

# List of messaging protocols to load, which is a list of protocol names
#proxyMessagingProtocols=

# The directory to locate messaging protocol handlers
#proxyProtocolHandlerDirectory=

### --- Deprecated config variables --- ###

# Deprecated. Use configurationStoreServers
globalZookeeperServers=

Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.protocol;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;

import java.net.InetSocketAddress;
import java.util.Map;

/**
* The protocol handler interface for support additional protocols on Pulsar brokers.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface ProtocolHandler extends AutoCloseable {

/**
* Returns the unique protocol name. For example, `kafka-v2` for protocol handler for Kafka v2 protocol.
*/
String protocolName();

/**
* Verify if the protocol can speak the given <tt>protocol</tt>.
*
* @param protocol the protocol to verify
* @return true if the protocol handler can handle the given protocol, otherwise false.
*/
boolean accept(String protocol);

/**
* Initialize the protocol handler when the protocol is constructed from reflection.
*
* <p>The initialize should initialize all the resources required for serving the protocol
* handler but don't start those resources until {@link #start(ProxyService)} is called.
*
* @param conf broker service configuration
* @throws Exception when fail to initialize the protocol handler.
*/
void initialize(ProxyConfiguration conf) throws Exception;

/**
* Start the protocol handler with the provided broker service.
*
* <p>The broker service provides the accesses to the Pulsar components such as load
* manager, namespace service, managed ledger and etc.
*
* @param service the broker service to start with.
*/
void start(ProxyService service);

/**
* Create the list of channel initializers for the ports that this protocol handler
* will listen on.
*
* <p>NOTE: this method is called after {@link #start(ProxyService)}.
*
* @return the list of channel initializers for the ports that this protocol handler listens on.
*/
Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers();

@Override
void close();
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.protocol;

import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Metadata information about a Pulsar protocol handler.
*/
@Data
@NoArgsConstructor
public class ProtocolHandlerDefinition {

/**
* The name of the protocol.
*/
private String name;

/**
* The description of the protocol handler to be used for user help.
*/
private String description;

/**
* The class name for the protocol handler.
*/
private String handlerClass;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.protocol;

import lombok.Data;
import lombok.experimental.Accessors;

import java.util.Map;
import java.util.TreeMap;

/**
* The collection of protocol handlers.
*/
@Data
@Accessors(fluent = true)
class ProtocolHandlerDefinitions {

private final Map<String, ProtocolHandlerMetadata> handlers = new TreeMap<>();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.protocol;

import lombok.Data;
import lombok.NoArgsConstructor;

import java.nio.file.Path;

/**
* The metadata of protocol handler.
*/
@Data
@NoArgsConstructor
class ProtocolHandlerMetadata {

/**
* The definition of the protocol handler.
*/
private ProtocolHandlerDefinition definition;

/**
* The path to the handler package.
*/
private Path archivePath;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.protocol;

import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;

import static com.google.common.base.Preconditions.checkArgument;

/**
* Util class to search and load {@link ProtocolHandler}s.
*/
@UtilityClass
@Slf4j
class ProtocolHandlerUtils {

static final String PULSAR_PROTOCOL_HANDLER_DEFINITION_FILE = "pulsar-proxy-protocol-handler.yml";

/**
* Retrieve the protocol handler definition from the provided handler nar package.
*
* @param narPath the path to the protocol handler NAR package
* @return the protocol handler definition
* @throws IOException when fail to load the protocol handler or get the definition
*/
public static ProtocolHandlerDefinition getProtocolHandlerDefinition(String narPath, String narExtractionDirectory)
throws IOException {
try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(),
narExtractionDirectory)) {
return getProtocolHandlerDefinition(ncl);
}
}

private static ProtocolHandlerDefinition getProtocolHandlerDefinition(NarClassLoader ncl) throws IOException {
String configStr = ncl.getServiceDefinition(PULSAR_PROTOCOL_HANDLER_DEFINITION_FILE);

return ObjectMapperFactory.getThreadLocalYaml().readValue(
configStr, ProtocolHandlerDefinition.class
);
}

/**
* Search and load the available protocol handlers.
*
* @param handlersDirectory the directory where all the protocol handlers are stored
* @return a collection of protocol handlers
* @throws IOException when fail to load the available protocol handlers from the provided directory.
*/
public static ProtocolHandlerDefinitions searchForHandlers(String handlersDirectory,
String narExtractionDirectory) throws IOException {
Path path = Paths.get(handlersDirectory).toAbsolutePath();
log.info("Searching for protocol handlers in {}", path);

ProtocolHandlerDefinitions handlers = new ProtocolHandlerDefinitions();
if (!path.toFile().exists()) {
log.warn("Protocol handler directory not found");
return handlers;
}

try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
try {
ProtocolHandlerDefinition phDef =
ProtocolHandlerUtils.getProtocolHandlerDefinition(archive.toString(), narExtractionDirectory);
log.info("Found protocol handler from {} : {}", archive, phDef);

checkArgument(StringUtils.isNotBlank(phDef.getName()));
checkArgument(StringUtils.isNotBlank(phDef.getHandlerClass()));

ProtocolHandlerMetadata metadata = new ProtocolHandlerMetadata();
metadata.setDefinition(phDef);
metadata.setArchivePath(archive);

handlers.handlers().put(phDef.getName(), metadata);
} catch (Throwable t) {
log.warn("Failed to load connector from {}."
+ " It is OK however if you want to use this protocol handler,"
+ " please make sure you put the correct protocol handler NAR"
+ " package in the handlers directory.", archive, t);
}
}
}

return handlers;
}

/**
* Load the protocol handler according to the handler definition.
*
* @param metadata the protocol handler definition.
* @return
*/
static ProtocolHandlerWithClassLoader load(ProtocolHandlerMetadata metadata,
String narExtractionDirectory) throws IOException {
NarClassLoader ncl = NarClassLoader.getFromArchive(
metadata.getArchivePath().toAbsolutePath().toFile(),
Collections.emptySet(),
ProtocolHandler.class.getClassLoader(), narExtractionDirectory);

ProtocolHandlerDefinition phDef = getProtocolHandlerDefinition(ncl);
if (StringUtils.isBlank(phDef.getHandlerClass())) {
throw new IOException("Protocol handler `" + phDef.getName() + "` does NOT provide a protocol"
+ " handler implementation");
}

try {
Class handlerClass = ncl.loadClass(phDef.getHandlerClass());
Object handler = handlerClass.newInstance();
if (!(handler instanceof ProtocolHandler)) {
throw new IOException("Class " + phDef.getHandlerClass()
+ " does not implement protocol handler interface");
}
ProtocolHandler ph = (ProtocolHandler) handler;
return new ProtocolHandlerWithClassLoader(ph, ncl);
} catch (Throwable t) {
rethrowIOException(t);
return null;
}
}

private static void rethrowIOException(Throwable cause)
throws IOException {
if (cause instanceof IOException) {
throw (IOException) cause;
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else if (cause instanceof Error) {
throw (Error) cause;
} else {
throw new IOException(cause.getMessage(), cause);
}
}

}
Loading

0 comments on commit 997d0c8

Please sign in to comment.