Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address some comments #6

Open
wants to merge 1 commit into
base: unclean_config
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;


class ElectionStrategizer {
private static final Logger log = LoggerFactory.getLogger(ElectionStrategizer.class);

private final int nodeId;
private Boolean nodeUncleanConfig = null;
private Boolean clusterUncleanConfig = null;
private Optional<Boolean> nodeUncleanConfig = null;
private Optional<Boolean> clusterUncleanConfig = null;
private Function<String, String> topicUncleanConfigAccessor = __ -> "false";
private Map<String, String> topicUncleanOverrides = new HashMap<>();

Expand Down Expand Up @@ -62,24 +63,24 @@ ElectionStrategizer setTopicUncleanOverride(String topicName, String value) {
}

boolean shouldBeUnclean(String topicName) {
Boolean topicConfig = (topicUncleanOverrides.containsKey(topicName)) ?
Optional<Boolean> topicConfig = (topicUncleanOverrides.containsKey(topicName)) ?
parseBoolean("topic", topicUncleanOverrides.get(topicName)) :
parseBoolean("topic", topicUncleanConfigAccessor.apply(topicName));
if (topicConfig != null) return topicConfig.booleanValue();
if (nodeUncleanConfig != null) return nodeUncleanConfig.booleanValue();
if (clusterUncleanConfig != null) return clusterUncleanConfig.booleanValue();
if (topicConfig.isPresent()) return topicConfig.get();
if (nodeUncleanConfig.isPresent()) return nodeUncleanConfig.get();
if (clusterUncleanConfig.isPresent()) return clusterUncleanConfig.get();
return false;
}

// VisibleForTesting
Boolean parseBoolean(String what, String value) {
if (value == null) return null;
if (value.equalsIgnoreCase("true")) return true;
if (value.equalsIgnoreCase("false")) return false;
if (value.trim().isEmpty()) return null;
Optional<Boolean> parseBoolean(String what, String value) {
if (value == null) return Optional.empty();
if (value.equalsIgnoreCase("true")) return Optional.of(true);
if (value.equalsIgnoreCase("false")) return Optional.of(false);
if (value.trim().isEmpty()) return Optional.empty();
log.warn("Invalid value for {} config {} on node {}: '{}'. Expected true or false.",
what, TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, nodeId, value);
return null;
return Optional.empty();
}

@Override
Expand Down
12 changes: 0 additions & 12 deletions metadata/src/main/java/org/apache/kafka/controller/Replicas.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,6 @@ public static List<Integer> toList(int[] array) {
return list;
}

/**
* Convert an array of integers to a list of ints and append a final element.
*
* @param array The input array.
* @return The output list.
*/
public static List<Integer> toList(int[] array, int last) {
List<Integer> list = toList(array);
list.add(last);
return list;
}

/**
* Convert a list of integers to an array of ints.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,9 +727,9 @@ void handleBrokerFenced(int brokerId, List<ApiMessageAndVersion> records) {
if (brokerRegistration == null) {
throw new RuntimeException("Can't find broker registration for broker " + brokerId);
}
handleNodeDeactivated(brokerId, records);
records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
setId(brokerId).setEpoch(brokerRegistration.epoch()), (short) 0));
handleNodeDeactivated(brokerId, records);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.slf4j.LoggerFactory;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;


Expand Down Expand Up @@ -68,13 +67,13 @@ public void testTopicUncleanOverride() {
@Test
public void testParseBoolean() {
ElectionStrategizer strategizer = createElectionStrategizer();
assertFalse(strategizer.parseBoolean("testParseBoolean", "false"));
assertFalse(strategizer.parseBoolean("testParseBoolean", "FALSE"));
assertTrue(strategizer.parseBoolean("testParseBoolean", "true"));
assertTrue(strategizer.parseBoolean("testParseBoolean", "TRUE"));
assertNull(strategizer.parseBoolean("testParseBoolean", ""));
assertNull(strategizer.parseBoolean("testParseBoolean", " "));
assertNull(strategizer.parseBoolean("testParseBoolean", null));
assertNull(strategizer.parseBoolean("testParseBoolean", "foo"));
assertFalse(strategizer.parseBoolean("testParseBoolean", "false").get());
assertFalse(strategizer.parseBoolean("testParseBoolean", "FALSE").get());
assertTrue(strategizer.parseBoolean("testParseBoolean", "true").get());
assertTrue(strategizer.parseBoolean("testParseBoolean", "TRUE").get());
assertFalse(strategizer.parseBoolean("testParseBoolean", "").isPresent());
assertFalse(strategizer.parseBoolean("testParseBoolean", " ").isPresent());
assertFalse(strategizer.parseBoolean("testParseBoolean", null).isPresent());
assertFalse(strategizer.parseBoolean("testParseBoolean", "foo").isPresent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ public void testToList() {
assertEquals(Arrays.asList(1, 2, 3, 4), Replicas.toList(new int[] {1, 2, 3, 4}));
assertEquals(Arrays.asList(), Replicas.toList(Replicas.NONE));
assertEquals(Arrays.asList(2), Replicas.toList(new int[] {2}));
assertEquals(Arrays.asList(2, 3), Replicas.toList(new int[] {2}, 3));
assertEquals(Arrays.asList(3), Replicas.toList(new int[] {}, 3));
}

@Test
Expand Down