Skip to content

Commit

Permalink
feat(proxy): format code
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoma20082008 committed Nov 4, 2024
1 parent 75e4e06 commit e70d8b2
Show file tree
Hide file tree
Showing 13 changed files with 66 additions and 63 deletions.
13 changes: 6 additions & 7 deletions proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.rocketmq.proxy;

import java.util.Date;
import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
Expand All @@ -35,9 +37,6 @@
import org.apache.rocketmq.proxy.spi.ProxyServerInitializer;
import org.apache.rocketmq.srvutil.ServerUtil;

import java.util.Date;
import java.util.List;

public class ProxyStartup {
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
private static final ProxyStartAndShutdown PROXY_START_AND_SHUTDOWN = new ProxyStartAndShutdown();
Expand All @@ -56,9 +55,9 @@ public static void main(String[] args) {

ProxyServerFactory factory = ServiceProvider.<ProxyServerFactory>loadClass(ProxyServerFactory.class);
ProxyServer server = factory
.withInitializer(new ProxyServerInitializer(commandLineArgument))
.withAccessValidators(loadAccessValidators())
.get();
.withInitializer(new ProxyServerInitializer(commandLineArgument))
.withAccessValidators(loadAccessValidators())
.get();

server.getStartAndShutdowns().forEach(PROXY_START_AND_SHUTDOWN::appendStartAndShutdown);

Expand Down Expand Up @@ -95,7 +94,7 @@ protected static List<AccessValidator> loadAccessValidators() {

protected static CommandLineArgument parseCommandLineArgument(String[] args) {
CommandLine commandLine = ServerUtil.parseCmdLine("mqproxy", args,
buildCommandlineOptions(), new DefaultParser());
buildCommandlineOptions(), new DefaultParser());
if (commandLine == null) {
throw new RuntimeException("parse command line argument failed");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.proxy.grpc;

import io.grpc.Server;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.spi.ProxyServerBase;

import java.util.concurrent.TimeUnit;

public class GrpcServer extends ProxyServerBase {
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
import io.grpc.protobuf.services.ChannelzService;
import io.grpc.protobuf.services.ProtoReflectionService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.common.utils.StartAndShutdown;
Expand All @@ -37,9 +39,6 @@
import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication;
import org.apache.rocketmq.proxy.spi.ProxyServerFactoryBase;

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class GrpcServerBuilder extends ProxyServerFactoryBase {
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);

Expand All @@ -65,12 +64,12 @@ public void start() throws Exception {
//
NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port);
serverBuilder.addService(application)
.addService(ChannelzService.newInstance(100))
.addService(ProtoReflectionService.newInstance())
.intercept(new AuthenticationInterceptor(validators))
.intercept(new GlobalExceptionInterceptor())
.intercept(new ContextInterceptor())
.intercept(new HeaderInterceptor());
.addService(ChannelzService.newInstance(100))
.addService(ProtoReflectionService.newInstance())
.intercept(new AuthenticationInterceptor(validators))
.intercept(new GlobalExceptionInterceptor())
.intercept(new ContextInterceptor())
.intercept(new HeaderInterceptor());

serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator());

Expand All @@ -93,7 +92,7 @@ public void start() throws Exception {
}

serverBuilder.maxInboundMessageSize(maxInboundMessageSize)
.maxConnectionIdle(idleTimeMills, TimeUnit.MILLISECONDS);
.maxConnectionIdle(idleTimeMills, TimeUnit.MILLISECONDS);

log.info(
"grpc server has built. port: {}, tlsKeyPath: {}, tlsCertPath: {}, threadPool: {}, queueCapacity: {}, "
Expand All @@ -108,11 +107,11 @@ private ThreadPoolExecutor createServerExecutor() {
int threadPoolNums = config.getGrpcThreadPoolNums();
int threadPoolQueueCapacity = config.getGrpcThreadPoolQueueCapacity();
return ThreadPoolMonitor.createAndMonitor(
threadPoolNums,
threadPoolNums,
1, TimeUnit.MINUTES,
"GrpcRequestExecutorThread",
threadPoolQueueCapacity
threadPoolNums,
threadPoolNums,
1, TimeUnit.MINUTES,
"GrpcRequestExecutorThread",
threadPoolQueueCapacity
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.proxy.mixed;

import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.proxy.spi.ProxyServer;
import org.apache.rocketmq.proxy.spi.ProxyServerBase;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.proxy.spi.ProxyServer;
import org.apache.rocketmq.proxy.spi.ProxyServerBase;

public class MixedProxyServer extends ProxyServerBase {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.proxy.mixed;

import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.common.utils.ServiceProvider;
import org.apache.rocketmq.proxy.spi.ProxyServer;
import org.apache.rocketmq.proxy.spi.ProxyServerBase;
import org.apache.rocketmq.proxy.spi.ProxyServerFactory;
import org.apache.rocketmq.proxy.spi.ProxyServerFactoryBase;

import java.util.ArrayList;
import java.util.List;

public class MixedProxyServerBuilder extends ProxyServerFactoryBase {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.proxy.remoting;

import org.apache.rocketmq.proxy.spi.ProxyServerBase;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
*/
package org.apache.rocketmq.proxy.spi;

import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;

import java.util.List;

public interface ProxyServer extends StartAndShutdown {

BrokerController getBrokerController();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.proxy.spi;

import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;

import java.util.List;

public abstract class ProxyServerBase implements ProxyServer {

private BrokerController brokerController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.proxy.spi;

import org.apache.rocketmq.acl.AccessValidator;

import java.util.List;
import java.util.function.Supplier;
import org.apache.rocketmq.acl.AccessValidator;

public interface ProxyServerFactory extends Supplier<ProxyServer> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.proxy.spi;

import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.common.utils.StartAndShutdown;

import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.common.utils.StartAndShutdown;

public abstract class ProxyServerFactoryBase implements ProxyServerFactory {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.proxy.spi;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerStartup;
Expand All @@ -34,10 +36,6 @@
import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class ProxyServerInitializer {

private final CommandLineArgument commandLineArgument;
Expand Down Expand Up @@ -118,10 +116,10 @@ private void validateConfiguration() throws Exception {
private void initializeThreadPoolMonitor() {
ProxyConfig config = ConfigurationManager.getProxyConfig();
ThreadPoolMonitor.config(
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME),
LoggerFactory.getLogger(LoggerName.PROXY_WATER_MARK_LOGGER_NAME),
config.isEnablePrintJstack(), config.getPrintJstackInMillis(),
config.getPrintThreadPoolStatusInMillis());
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME),
LoggerFactory.getLogger(LoggerName.PROXY_WATER_MARK_LOGGER_NAME),
config.isEnablePrintJstack(), config.getPrintJstackInMillis(),
config.getPrintThreadPoolStatusInMillis());
ThreadPoolMonitor.init();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
org.apache.rocketmq.proxy.grpc.GrpcServerBuilder
org.apache.rocketmq.proxy.remoting.RemotingProtocolServerBuilder
org.apache.rocketmq.proxy.mixed.MixedProxyServerBuilder
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.UUID;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.proxy.config.Configuration;
import org.junit.After;
Expand All @@ -28,13 +38,6 @@
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.UUID;

import static org.apache.rocketmq.proxy.config.ConfigurationManager.RMQ_PROXY_HOME;
import static org.junit.Assert.assertEquals;

Expand Down

0 comments on commit e70d8b2

Please sign in to comment.