Skip to content

Commit

Permalink
KAFKA-12697: Add FencedBrokerCount and ActiveBrokerCount metrics to t…
Browse files Browse the repository at this point in the history
…he QuorumController (apache#10772)

Reviewers: Colin P. McCabe <[email protected]>
  • Loading branch information
dielhennr authored and Ralph Debusmann committed Dec 22, 2021
1 parent 1c39e56 commit e0f5f24
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ boolean check() {
*/
private final TimelineHashMap<Integer, BrokerRegistration> brokerRegistrations;

/**
* A reference to the controller's metrics registry.
*/
private final ControllerMetrics controllerMetrics;

/**
* The broker heartbeat manager, or null if this controller is on standby.
*/
Expand All @@ -131,7 +136,8 @@ boolean check() {
Time time,
SnapshotRegistry snapshotRegistry,
long sessionTimeoutNs,
ReplicaPlacer replicaPlacer) {
ReplicaPlacer replicaPlacer,
ControllerMetrics metrics) {
this.logContext = logContext;
this.log = logContext.logger(ClusterControlManager.class);
this.time = time;
Expand All @@ -140,6 +146,7 @@ boolean check() {
this.brokerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
this.heartbeatManager = null;
this.readyBrokersFuture = Optional.empty();
this.controllerMetrics = metrics;
}

/**
Expand Down Expand Up @@ -249,11 +256,13 @@ public void replay(RegisterBrokerRecord record) {
features.put(feature.name(), new VersionRange(
feature.minSupportedVersion(), feature.maxSupportedVersion()));
}

// Update broker registrations.
BrokerRegistration prevRegistration = brokerRegistrations.put(brokerId,
new BrokerRegistration(brokerId, record.brokerEpoch(),
record.incarnationId(), listeners, features,
Optional.ofNullable(record.rack()), record.fenced()));
updateMetrics(prevRegistration, brokerRegistrations.get(brokerId));
if (prevRegistration == null) {
log.info("Registered new broker: {}", record);
} else if (prevRegistration.incarnationId().equals(record.incarnationId())) {
Expand All @@ -274,6 +283,7 @@ public void replay(UnregisterBrokerRecord record) {
"registration with that epoch found", record.toString()));
} else {
brokerRegistrations.remove(brokerId);
updateMetrics(registration, brokerRegistrations.get(brokerId));
log.info("Unregistered broker: {}", record);
}
}
Expand All @@ -289,6 +299,7 @@ public void replay(FenceBrokerRecord record) {
"registration with that epoch found", record.toString()));
} else {
brokerRegistrations.put(brokerId, registration.cloneWithFencing(true));
updateMetrics(registration, brokerRegistrations.get(brokerId));
log.info("Fenced broker: {}", record);
}
}
Expand All @@ -304,6 +315,7 @@ public void replay(UnfenceBrokerRecord record) {
"registration with that epoch found", record.toString()));
} else {
brokerRegistrations.put(brokerId, registration.cloneWithFencing(false));
updateMetrics(registration, brokerRegistrations.get(brokerId));
log.info("Unfenced broker: {}", record);
}
if (readyBrokersFuture.isPresent()) {
Expand All @@ -314,6 +326,31 @@ public void replay(UnfenceBrokerRecord record) {
}
}

private void updateMetrics(BrokerRegistration prevRegistration, BrokerRegistration registration) {
if (registration == null) {
if (prevRegistration.fenced()) {
controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() - 1);
} else {
controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() - 1);
}
} else if (prevRegistration == null) {
if (registration.fenced()) {
controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + 1);
} else {
controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() + 1);
}
} else {
if (prevRegistration.fenced() && !registration.fenced()) {
controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() - 1);
controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() + 1);
} else if (!prevRegistration.fenced() && registration.fenced()) {
controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + 1);
controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() - 1);
}
}
}


public List<List<Integer>> placeReplicas(int startPartition,
int numPartitions,
short numReplicas) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ public interface ControllerMetrics extends AutoCloseable {

void updateEventQueueProcessingTime(long durationMs);

void setFencedBrokerCount(int brokerCount);

int fencedBrokerCount();

void setActiveBrokerCount(int brokerCount);

int activeBrokerCount();

void setGlobalTopicsCount(int topicCount);

int globalTopicsCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,7 @@ private QuorumController(LogContext logContext,
snapshotRegistry, configDefs, alterConfigPolicy);
this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry);
this.clusterControl = new ClusterControlManager(logContext, time,
snapshotRegistry, sessionTimeoutNs, replicaPlacer);
snapshotRegistry, sessionTimeoutNs, replicaPlacer, controllerMetrics);
this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
this.snapshotMaxNewRecordBytes = snapshotMaxNewRecordBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
"ControllerEventManager", "EventQueueTimeMs");
private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueProcessingTimeMs");
private final static MetricName FENCED_BROKER_COUNT = getMetricName(
"KafkaController", "FencedBrokerCount");
private final static MetricName ACTIVE_BROKER_COUNT = getMetricName(
"KafkaController", "ActiveBrokerCount");
private final static MetricName GLOBAL_TOPIC_COUNT = getMetricName(
"KafkaController", "GlobalTopicCount");
private final static MetricName GLOBAL_PARTITION_COUNT = getMetricName(
Expand All @@ -40,14 +44,18 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
"KafkaController", "OfflinePartitionsCount");
private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = getMetricName(
"KafkaController", "PreferredReplicaImbalanceCount");

private final MetricsRegistry registry;
private volatile boolean active;
private volatile int fencedBrokerCount;
private volatile int activeBrokerCount;
private volatile int globalTopicCount;
private volatile int globalPartitionCount;
private volatile int offlinePartitionCount;
private volatile int preferredReplicaImbalanceCount;
private final Gauge<Integer> activeControllerCount;
private final Gauge<Integer> fencedBrokerCountGauge;
private final Gauge<Integer> activeBrokerCountGauge;
private final Gauge<Integer> globalPartitionCountGauge;
private final Gauge<Integer> globalTopicCountGauge;
private final Gauge<Integer> offlinePartitionCountGauge;
Expand All @@ -58,6 +66,8 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
public QuorumControllerMetrics(MetricsRegistry registry) {
this.registry = Objects.requireNonNull(registry);
this.active = false;
this.fencedBrokerCount = 0;
this.activeBrokerCount = 0;
this.globalTopicCount = 0;
this.globalPartitionCount = 0;
this.offlinePartitionCount = 0;
Expand All @@ -70,6 +80,18 @@ public Integer value() {
});
this.eventQueueTime = registry.newHistogram(EVENT_QUEUE_TIME_MS, true);
this.eventQueueProcessingTime = registry.newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
this.fencedBrokerCountGauge = registry.newGauge(FENCED_BROKER_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return fencedBrokerCount;
}
});
this.activeBrokerCountGauge = registry.newGauge(ACTIVE_BROKER_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return activeBrokerCount;
}
});
this.globalTopicCountGauge = registry.newGauge(GLOBAL_TOPIC_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
Expand Down Expand Up @@ -116,6 +138,25 @@ public void updateEventQueueProcessingTime(long durationMs) {
eventQueueTime.update(durationMs);
}

@Override
public void setFencedBrokerCount(int brokerCount) {
this.fencedBrokerCount = brokerCount;
}

@Override
public int fencedBrokerCount() {
return this.fencedBrokerCount;
}

public void setActiveBrokerCount(int brokerCount) {
this.activeBrokerCount = brokerCount;
}

@Override
public int activeBrokerCount() {
return this.activeBrokerCount;
}

@Override
public void setGlobalTopicsCount(int topicCount) {
this.globalTopicCount = topicCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void testReplay() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ClusterControlManager clusterControl = new ClusterControlManager(
new LogContext(), time, snapshotRegistry, 1000,
new StripedReplicaPlacer(new Random()));
new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
clusterControl.activate();
assertFalse(clusterControl.unfenced(0));

Expand Down Expand Up @@ -99,7 +99,7 @@ public void testUnregister() throws Exception {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ClusterControlManager clusterControl = new ClusterControlManager(
new LogContext(), new MockTime(0, 0, 0), snapshotRegistry, 1000,
new StripedReplicaPlacer(new Random()));
new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
clusterControl.activate();
clusterControl.replay(brokerRecord);
assertEquals(new BrokerRegistration(1, 100,
Expand All @@ -122,7 +122,7 @@ public void testPlaceReplicas(int numUsableBrokers) throws Exception {
MockRandom random = new MockRandom();
ClusterControlManager clusterControl = new ClusterControlManager(
new LogContext(), time, snapshotRegistry, 1000,
new StripedReplicaPlacer(random));
new StripedReplicaPlacer(random), new MockControllerMetrics());
clusterControl.activate();
for (int i = 0; i < numUsableBrokers; i++) {
RegisterBrokerRecord brokerRecord =
Expand Down Expand Up @@ -159,7 +159,7 @@ public void testIterator() throws Exception {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ClusterControlManager clusterControl = new ClusterControlManager(
new LogContext(), time, snapshotRegistry, 1000,
new StripedReplicaPlacer(new Random()));
new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
clusterControl.activate();
assertFalse(clusterControl.unfenced(0));
for (int i = 0; i < 3; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

public final class MockControllerMetrics implements ControllerMetrics {
private volatile boolean active;
private volatile int fencedBrokers;
private volatile int activeBrokers;
private volatile int topics;
private volatile int partitions;
private volatile int offlinePartitions;
Expand All @@ -27,6 +29,8 @@ public final class MockControllerMetrics implements ControllerMetrics {

public MockControllerMetrics() {
this.active = false;
this.fencedBrokers = 0;
this.activeBrokers = 0;
this.topics = 0;
this.partitions = 0;
this.offlinePartitions = 0;
Expand All @@ -53,6 +57,26 @@ public void updateEventQueueProcessingTime(long durationMs) {
// nothing to do
}

@Override
public void setFencedBrokerCount(int brokerCount) {
this.fencedBrokers = brokerCount;
}

@Override
public int fencedBrokerCount() {
return this.fencedBrokers;
}

@Override
public void setActiveBrokerCount(int brokerCount) {
this.activeBrokers = brokerCount;
}

@Override
public int activeBrokerCount() {
return activeBrokers;
}

@Override
public void setGlobalTopicsCount(int topicCount) {
this.topics = topicCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void setUp() {
snapshotRegistry = new SnapshotRegistry(logContext);
clusterControl = new ClusterControlManager(
logContext, time, snapshotRegistry, 1000,
new StripedReplicaPlacer(random));
new StripedReplicaPlacer(random), new MockControllerMetrics());

clusterControl.activate();
for (int i = 0; i < 4; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ private static class ReplicationControlTestContext {
final LogContext logContext = new LogContext();
final MockTime time = new MockTime();
final MockRandom random = new MockRandom();
final ControllerMetrics metrics = new MockControllerMetrics();
final ClusterControlManager clusterControl = new ClusterControlManager(
logContext, time, snapshotRegistry, TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS),
new StripedReplicaPlacer(random));
final ControllerMetrics metrics = new MockControllerMetrics();
new StripedReplicaPlacer(random), metrics);
final ConfigurationControlManager configurationControl = new ConfigurationControlManager(
new LogContext(), snapshotRegistry, Collections.emptyMap(), Optional.empty());
final ReplicationControlManager replicationControl;
Expand Down Expand Up @@ -429,6 +429,41 @@ public void testCreateTopics() throws Exception {
ctx.replicationControl.iterator(Long.MAX_VALUE));
}

@Test
public void testBrokerCountMetrics() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlManager replicationControl = ctx.replicationControl;

ctx.registerBrokers(0);

assertEquals(1, ctx.metrics.fencedBrokerCount());
assertEquals(0, ctx.metrics.activeBrokerCount());

ctx.unfenceBrokers(0);

assertEquals(0, ctx.metrics.fencedBrokerCount());
assertEquals(1, ctx.metrics.activeBrokerCount());

ctx.registerBrokers(1);
ctx.unfenceBrokers(1);

assertEquals(2, ctx.metrics.activeBrokerCount());

ctx.registerBrokers(2);
ctx.unfenceBrokers(2);

assertEquals(0, ctx.metrics.fencedBrokerCount());
assertEquals(3, ctx.metrics.activeBrokerCount());

ControllerResult<Void> result = replicationControl.unregisterBroker(0);
ctx.replay(result.records());
result = replicationControl.unregisterBroker(2);
ctx.replay(result.records());

assertEquals(0, ctx.metrics.fencedBrokerCount());
assertEquals(1, ctx.metrics.activeBrokerCount());
}

@Test
public void testCreateTopicsWithValidateOnlyFlag() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
Expand Down

0 comments on commit e0f5f24

Please sign in to comment.