Add simple logic locality-awareness reassignments #116
@kabochya could you please squash the commits?
    }
  }

  // Second pass to fill in replacements without target brokers in locality
is this still work-in-progress?
This PR is ready. It implements only the trivial logic and does not handle cases such as failing over to other localities when no instances are available in the current locality, or spreading the reassignments across different localities when there are multiple replicas of the same topic partition on the dead broker that is in the same locality. These issues will be addressed in subsequent PRs.
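To make the scope concrete, here is a minimal sketch of the trivial same-locality selection, with no failover to other localities. The class `LocalityAwareSketch`, the method `pickReplacements`, and the map-based broker model are all hypothetical and are not taken from DoctorKafka's code; they only illustrate the behavior described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not DoctorKafka's actual classes or API.
class LocalityAwareSketch {

  /**
   * For each dead broker, picks an unused live broker in the same locality.
   * Returns null as soon as one dead broker has no candidate left in its own
   * locality: there is deliberately no failover to other localities.
   *
   * @param deadBrokerLocality dead broker id -> locality (e.g. rack id)
   * @param liveBrokerLocality live broker id -> locality
   */
  static Map<Integer, Integer> pickReplacements(
      Map<Integer, String> deadBrokerLocality,
      Map<Integer, String> liveBrokerLocality) {
    // Group the live brokers by locality.
    Map<String, List<Integer>> liveByLocality = new HashMap<>();
    for (Map.Entry<Integer, String> e : liveBrokerLocality.entrySet()) {
      liveByLocality.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
    }

    Map<Integer, Integer> replacements = new HashMap<>();
    for (Map.Entry<Integer, String> e : deadBrokerLocality.entrySet()) {
      List<Integer> candidates = liveByLocality.get(e.getValue());
      if (candidates == null || candidates.isEmpty()) {
        return null; // nothing available in this locality
      }
      // Take one candidate and remove it so it is not handed out twice.
      replacements.put(e.getKey(), candidates.remove(candidates.size() - 1));
    }
    return replacements;
  }

  public static void main(String[] args) {
    Map<Integer, String> dead = new HashMap<>();
    dead.put(1, "rack-a");
    Map<Integer, String> live = new HashMap<>();
    live.put(2, "rack-a");
    live.put(3, "rack-b");
    System.out.println(pickReplacements(dead, live));
  }
}
```

A real implementation would also weigh broker load and per-partition traffic; this sketch ignores capacity entirely.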
@@ -12,34 +12,37 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.AfterAll;
Remove this unused import?
import org.junit.jupiter.api.Test;

import java.sql.Time;
Remove this unused import?
    ReplicaStatsManager.bytesInStats = oldBytesInStats;
    ReplicaStatsManager.bytesOutStats = oldBytesOutStats;
  @Test
  void testLocalityAwareReassignments() throws Exception {
Could we add another test for the scenario where there is not enough capacity in the rack to assign the partition?
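One way to picture the requested scenario, as a rough sketch: a rack whose only broker lacks spare capacity must yield no candidate, even though another rack has room. The `Broker` model, the `spareBytesIn` field, and `findInRack` are hypothetical names invented for this illustration and do not come from DoctorKafka or its tests.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the scenario; not DoctorKafka's test code.
class RackCapacityScenario {

  // Illustrative broker model: id, rack, and spare inbound capacity (bytes/sec).
  static final class Broker {
    final int id;
    final String rack;
    final long spareBytesIn;

    Broker(int id, String rack, long spareBytesIn) {
      this.id = id;
      this.rack = rack;
      this.spareBytesIn = spareBytesIn;
    }
  }

  /**
   * Returns the id of the first broker in the given rack with enough spare
   * capacity for the partition, or -1 if the rack cannot host it.
   */
  static int findInRack(List<Broker> brokers, String rack, long requiredBytesIn) {
    for (Broker b : brokers) {
      if (rack.equals(b.rack) && b.spareBytesIn >= requiredBytesIn) {
        return b.id;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    List<Broker> brokers = Arrays.asList(
        new Broker(1, "rack-a", 10L),
        new Broker(2, "rack-b", 1000L));

    // rack-a has only 10 bytes/sec to spare, so a partition needing 100 must
    // be rejected instead of spilling over into rack-b.
    if (findInRack(brokers, "rack-a", 100L) != -1) {
      throw new AssertionError("rack-a should not have enough capacity");
    }
    // The same partition fits in rack-b.
    if (findInRack(brokers, "rack-b", 100L) != 2) {
      throw new AssertionError("rack-b should accept the partition");
    }
  }
}
```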
    }
    // push the brokers back to brokerQueue to keep invariant true
    brokerQueue.addAll(unusableBrokers);
    return success ? result : null;
  }

  /**
   * Similar to getAlternativeBrokers, but locality aware
A general question: when `broker.rack` is not set on the Kafka cluster side, kafkastats will have `null` for `rackId`. In that case, if the user sets `locality_awareness.enabled` to `true` in the settings, what is the expected behavior? Can we add an explanation of this in the comment?
10a8edf to f7a67ce
@yuyang08
f7a67ce to 2aa0ff4
2aa0ff4 to 308725c
if (replacedNodes == null) {
  if (isLocalityAware) {
    continue;
With this, will we exit the loop with `success == true`?
In that case, will we get an email alert if DoctorKafka fails to find a locality-aware assignment?
@yuyang08 Yes, we will get a different email alert if the assignment partially failed, and these failures will be captured in the hashmap `reassignmentToLocalityFailures`. L672-680 will then check whether there were partial failures during locality-aware reassignments and alert by email.
Added a guard to set `success` to `false` if no locality-aware reassignments were made.
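The control flow discussed in this thread can be sketched roughly as follows. Only the map name `reassignmentToLocalityFailures` and the `success` guard come from the conversation; the class, the method signatures, the value types, and the `finder` callback are assumptions made for illustration, and the email alert itself is reduced to a boolean check.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration of the flow discussed above; types are assumed.
class LocalityFailureTrackingSketch {

  // Name taken from the discussion; key/value types are an assumption
  // (partition name -> locality in which no replacement was found).
  final Map<String, String> reassignmentToLocalityFailures = new HashMap<>();

  // Successful reassignments: partition name -> replacement broker id.
  final Map<String, Integer> reassignments = new HashMap<>();

  /**
   * @param partitionLocality partition name -> locality of its out-of-sync replica
   * @param finder            locality -> replacement broker id, or null if none
   * @return success flag: false if no locality-aware reassignment was made
   */
  boolean reassign(Map<String, String> partitionLocality,
                   Function<String, Integer> finder) {
    for (Map.Entry<String, String> e : partitionLocality.entrySet()) {
      Integer replacement = finder.apply(e.getValue());
      if (replacement == null) {
        // Record the failure and move on to the next partition.
        reassignmentToLocalityFailures.put(e.getKey(), e.getValue());
        continue;
      }
      reassignments.put(e.getKey(), replacement);
    }
    // Guard: skipping failures must not leave success == true when
    // nothing at all was reassigned.
    return !reassignments.isEmpty();
  }

  /** True when at least one failure was recorded and an alert should be sent. */
  boolean shouldAlert() {
    return !reassignmentToLocalityFailures.isEmpty();
  }
}
```

With this shape, a run that reassigns some partitions but not others returns `true` while still reporting through `shouldAlert()`, and a run that reassigns nothing returns `false`.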
308725c to 941b142
Unit test for testing SINGLE oosReplica
941b142 to 34a4665
thanks for making the fix!!
Removed partial reassignments
This is the first part of locality-aware reassignments using rackId/availability zone info. Currently it will neither fail over to other localities nor balance replicas across the localities. These issues will be addressed afterwards.