Skip to content

Commit

Permalink
add logs in coordinator::publish flow
Browse files Browse the repository at this point in the history
  • Loading branch information
sumitasr committed Jul 24, 2024
1 parent c6d42bc commit f6f3d69
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1292,6 +1292,7 @@ public void publish(
) {
try {
synchronized (mutex) {
long overallStartTimeNS = System.nanoTime();
if (mode != Mode.LEADER || getCurrentTerm() != clusterChangedEvent.state().term()) {
logger.debug(
() -> new ParameterizedMessage(
Expand Down Expand Up @@ -1349,11 +1350,41 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
currentPublication = Optional.of(publication);

final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes();
long startTimeNS = System.nanoTime();
leaderChecker.setCurrentNodes(publishNodes);
logger.info(
"[Custom Log] Coordinator, leaderChecker.setCurrentNodes latency: {} ms",
TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)
);
startTimeNS = System.nanoTime();
followersChecker.setCurrentNodes(publishNodes);
logger.info(
"[Custom Log] Coordinator, followersChecker.setCurrentNodes latency: {} ms",
TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)
);
startTimeNS = System.nanoTime();
lagDetector.setTrackedNodes(publishNodes);
logger.info(
"[Custom Log] Coordinator, lagDetector.setTrackedNodes latency: {} ms",
TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)
);
startTimeNS = System.nanoTime();
coordinationState.get().handlePrePublish(clusterState);
logger.info(
"[Custom Log] Coordinator, coordinationState.get().handlePrePublish latency: {} ms",
TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)
);
startTimeNS = System.nanoTime();
publication.start(followersChecker.getFaultyNodes());
logger.info(
"[Custom Log] Coordinator, publication.start(followersChecker.getFaultyNodes) latency: {} ms",
TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)
);

logger.info(
"[Custom Log] Coordinator::publish latency: {} ms",
TimeValue.nsecToMSec(System.nanoTime() - overallStartTimeNS)
);
}
} catch (Exception e) {
logger.debug(() -> new ParameterizedMessage("[{}] publishing failed", clusterChangedEvent.source()), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,11 +563,29 @@ private void reroute(RoutingAllocation allocation) {
: "auto-expand replicas out of sync with number of nodes in the cluster";
assert assertInitialized();
long rerouteStartTimeNS = System.nanoTime();
long startTimeNS = System.nanoTime();
removeDelayMarkers(allocation);

logger.info(
"[Custom Log] AllocationService, removeDelayMarkers latency: {} ms",
TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)
);
startTimeNS = System.nanoTime();
allocateExistingUnassignedShards(allocation); // try to allocate existing shard copies first
logger.info(
"[Custom Log] AllocationService, allocateExistingUnassignedShards latency: {} ms",
TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)
);
startTimeNS = System.nanoTime();
shardsAllocator.allocate(allocation);
logger.info("[Custom Log] AllocationService, reroute latency: {} ms", TimeValue.nsecToMSec(System.nanoTime() - rerouteStartTimeNS));
logger.info(
"[Custom Log] AllocationService, shardsAllocator.allocate latency: {} ms",
TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)
);

logger.info(
"[Custom Log] AllocationService, Overall reroute latency: {} ms",
TimeValue.nsecToMSec(System.nanoTime() - rerouteStartTimeNS)
);
clusterManagerMetrics.recordLatency(
clusterManagerMetrics.rerouteHistogram,
(double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - rerouteStartTimeNS))
Expand All @@ -576,7 +594,6 @@ private void reroute(RoutingAllocation allocation) {
}

private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
long latencyStartTimeInNs = System.nanoTime();
allocation.routingNodes().unassigned().sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering

for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
Expand All @@ -592,10 +609,6 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
Currently AllocationService will not run any custom Allocator that implements allocateAllUnassignedShards
*/
allocateAllUnassignedShards(allocation);
logger.info(
"[Custom Log] AllocationService, allocateExistingUnassignedShards latency: {} ms",
TimeValue.nsecToMSec(System.nanoTime() - latencyStartTimeInNs)
);
return;
}
logger.warn("Falling back to single shard assignment since batch mode disable or multiple custom allocators set");
Expand All @@ -619,10 +632,6 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
getAllocatorForShard(shardRouting, allocation).allocateUnassigned(shardRouting, allocation, replicaIterator);
}
}
logger.info(
"[Custom Log] AllocationService, allocateExistingUnassignedShards latency: {} ms",
TimeValue.nsecToMSec(System.nanoTime() - latencyStartTimeInNs)
);
}

private void allocateAllUnassignedShards(RoutingAllocation allocation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,9 @@ public ClusterState apply(ClusterState clusterState) {

@Override
public void run() {
long startTimeInNs = System.nanoTime();
runTask(this);
logger.info("[Custom Log] ClusterApplierService, run latency: {} ms", TimeValue.nsecToMSec(System.nanoTime() - startTimeInNs));
}
}

Expand Down

0 comments on commit f6f3d69

Please sign in to comment.