Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Fix AdminManager memory leak #438

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ class AdminManager {
this.admin = admin;
}

public void shutdown() {
topicPurgatory.shutdown();
}

CompletableFuture<Map<String, ApiError>> createTopicsAsync(Map<String, TopicDetails> createInfo, int timeoutMs) {
final Map<String, CompletableFuture<ApiError>> futureMap = new ConcurrentHashMap<>();
final AtomicInteger numTopics = new AtomicInteger(createInfo.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
private final GroupCoordinator groupCoordinator;
@Getter
private final TransactionCoordinator transactionCoordinator;
private final AdminManager adminManager;
@Getter
private final boolean enableTls;
@Getter
Expand All @@ -56,6 +57,7 @@ public KafkaChannelInitializer(PulsarService pulsarService,
KafkaServiceConfiguration kafkaConfig,
GroupCoordinator groupCoordinator,
TransactionCoordinator transactionCoordinator,
AdminManager adminManager,
boolean enableTLS,
EndPoint advertisedEndPoint,
StatsLogger statsLogger) {
Expand All @@ -64,6 +66,7 @@ public KafkaChannelInitializer(PulsarService pulsarService,
this.kafkaConfig = kafkaConfig;
this.groupCoordinator = groupCoordinator;
this.transactionCoordinator = transactionCoordinator;
this.adminManager = adminManager;
this.enableTls = enableTLS;
this.advertisedEndPoint = advertisedEndPoint;
this.statsLogger = statsLogger;
Expand All @@ -85,7 +88,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
ch.pipeline().addLast("handler",
new KafkaRequestHandler(pulsarService, kafkaConfig,
groupCoordinator, transactionCoordinator, enableTls, advertisedEndPoint, statsLogger));
groupCoordinator, transactionCoordinator, adminManager,
enableTls, advertisedEndPoint, statsLogger));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class KafkaProtocolHandler implements ProtocolHandler {
private StatsLogger rootStatsLogger;
private PrometheusMetricsProvider statsProvider;
private KopBrokerLookupManager kopBrokerLookupManager;
private AdminManager adminManager = null;

/**
* Listener for the changing of topic that stores offsets of consumer group.
Expand Down Expand Up @@ -258,6 +259,13 @@ public void start(BrokerService service) {
KopVersion.getBuildHost(),
KopVersion.getBuildTime());

try {
adminManager = new AdminManager(brokerService.getPulsar().getAdminClient());
} catch (PulsarServerException e) {
log.error("Failed to create PulsarAdmin: {}", e.getMessage());
throw new IllegalStateException(e);
}

// init and start group coordinator
if (kafkaConfig.isEnableGroupCoordinator()) {
try {
Expand Down Expand Up @@ -313,13 +321,13 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
case PLAINTEXT:
case SASL_PLAINTEXT:
builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(),
kafkaConfig, groupCoordinator, transactionCoordinator, false,
kafkaConfig, groupCoordinator, transactionCoordinator, adminManager, false,
advertisedEndPoint, rootStatsLogger.scope(SERVER_SCOPE)));
break;
case SSL:
case SASL_SSL:
builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(),
kafkaConfig, groupCoordinator, transactionCoordinator, true,
kafkaConfig, groupCoordinator, transactionCoordinator, adminManager, true,
advertisedEndPoint, rootStatsLogger.scope(SERVER_SCOPE)));
break;
}
Expand All @@ -334,6 +342,7 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti

@Override
public void close() {
adminManager.shutdown();
if (groupCoordinator != null) {
groupCoordinator.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
KafkaServiceConfiguration kafkaConfig,
GroupCoordinator groupCoordinator,
TransactionCoordinator transactionCoordinator,
AdminManager adminManager,
Boolean tlsEnabled,
EndPoint advertisedEndPoint,
StatsLogger statsLogger) throws Exception {
Expand All @@ -218,7 +219,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
this.authenticator = authenticationEnabled
? new SaslAuthenticator(pulsarService, kafkaConfig.getSaslAllowedMechanisms(), kafkaConfig)
: null;
this.adminManager = new AdminManager(admin);
this.adminManager = adminManager;
this.tlsEnabled = tlsEnabled;
this.advertisedEndPoint = advertisedEndPoint;
this.advertisedListeners = kafkaConfig.getKafkaAdvertisedListeners();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class EntryPublishTimeTest extends KopProtocolHandlerTestBase {

KafkaRequestHandler kafkaRequestHandler;
SocketAddress serviceAddress;
private AdminManager adminManager;

public EntryPublishTimeTest(String format) {
super(format);
Expand Down Expand Up @@ -85,11 +86,13 @@ protected void setup() throws Exception {
GroupCoordinator groupCoordinator = ((KafkaProtocolHandler) handler).getGroupCoordinator();
TransactionCoordinator transactionCoordinator = ((KafkaProtocolHandler) handler).getTransactionCoordinator();

adminManager = new AdminManager(pulsar.getAdminClient());
kafkaRequestHandler = new KafkaRequestHandler(
pulsar,
(KafkaServiceConfiguration) conf,
groupCoordinator,
transactionCoordinator,
adminManager,
false,
getPlainEndPoint(),
NullStatsLogger.INSTANCE);
Expand All @@ -104,6 +107,7 @@ protected void setup() throws Exception {
@AfterMethod
@Override
protected void cleanup() throws Exception {
adminManager.shutdown();
super.internalCleanup();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public class KafkaApisTest extends KopProtocolHandlerTestBase {

KafkaRequestHandler kafkaRequestHandler;
SocketAddress serviceAddress;
private AdminManager adminManager;

@Override
protected void resetConfig() {
Expand Down Expand Up @@ -136,11 +137,13 @@ protected void setup() throws Exception {
GroupCoordinator groupCoordinator = ((KafkaProtocolHandler) handler).getGroupCoordinator();
TransactionCoordinator transactionCoordinator = ((KafkaProtocolHandler) handler).getTransactionCoordinator();

adminManager = new AdminManager(pulsar.getAdminClient());
kafkaRequestHandler = new KafkaRequestHandler(
pulsar,
(KafkaServiceConfiguration) conf,
groupCoordinator,
transactionCoordinator,
adminManager,
false,
getPlainEndPoint(),
NullStatsLogger.INSTANCE);
Expand All @@ -155,6 +158,7 @@ protected void setup() throws Exception {
@AfterMethod
@Override
protected void cleanup() throws Exception {
adminManager.shutdown();
super.internalCleanup();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
public class KafkaRequestHandlerTest extends KopProtocolHandlerTestBase {

private KafkaRequestHandler handler;
private AdminManager adminManager;

@BeforeMethod
@Override
Expand Down Expand Up @@ -141,11 +142,13 @@ protected void setup() throws Exception {
GroupCoordinator groupCoordinator = ((KafkaProtocolHandler) handler1).getGroupCoordinator();
TransactionCoordinator transactionCoordinator = ((KafkaProtocolHandler) handler1).getTransactionCoordinator();

adminManager = new AdminManager(pulsar.getAdminClient());
handler = new KafkaRequestHandler(
pulsar,
(KafkaServiceConfiguration) conf,
groupCoordinator,
transactionCoordinator,
adminManager,
false,
getPlainEndPoint(),
NullStatsLogger.INSTANCE);
Expand All @@ -154,6 +157,7 @@ protected void setup() throws Exception {
@AfterMethod
@Override
protected void cleanup() throws Exception {
adminManager.shutdown();
super.internalCleanup();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class KafkaTopicConsumerManagerTest extends KopProtocolHandlerTestBase {
private KafkaTopicManager kafkaTopicManager;
private KafkaRequestHandler kafkaRequestHandler;
private SocketAddress serviceAddress;
private AdminManager adminManager;

@BeforeMethod
@Override
Expand All @@ -64,11 +65,13 @@ protected void setup() throws Exception {
GroupCoordinator groupCoordinator = ((KafkaProtocolHandler) handler).getGroupCoordinator();
TransactionCoordinator transactionCoordinator = ((KafkaProtocolHandler) handler).getTransactionCoordinator();

adminManager = new AdminManager(pulsar.getAdminClient());
kafkaRequestHandler = new KafkaRequestHandler(
pulsar,
(KafkaServiceConfiguration) conf,
groupCoordinator,
transactionCoordinator,
adminManager,
false,
getPlainEndPoint(),
NullStatsLogger.INSTANCE);
Expand All @@ -86,6 +89,7 @@ protected void setup() throws Exception {
@AfterMethod
@Override
protected void cleanup() throws Exception {
adminManager.shutdown();
super.internalCleanup();
}

Expand Down