Added timing data and more granular stages to SegmentReplicationState (#4222)

* Added timing data and more granular stages to SegmentReplicationState

This change introduces instrumentation logging that measures the latency of the various stages of segment replication as seen by each replica. Logs have also been added to the source node for checkpoint publishing and checkpoint metadata responses. All logging is currently at the TRACE level.

Signed-off-by: Kartik Ganesh <[email protected]>

* Fixing SegmentReplicationTarget tests

Signed-off-by: Kartik Ganesh <[email protected]>

* Incorporated PR feedback

Signed-off-by: Kartik Ganesh <[email protected]>

* Fixing SegmentReplicationTargetService tests

Signed-off-by: Kartik Ganesh <[email protected]>

Signed-off-by: Kartik Ganesh <[email protected]>
kartg authored Aug 17, 2022
1 parent f65e02d commit a2ba3a8
Showing 8 changed files with 175 additions and 34 deletions.
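
As the commit description notes, the instrumentation follows one pattern throughout: a ReplicationTimer is started before a unit of work, stopped when it finishes or fails, and the elapsed value is emitted at TRACE level. The sketch below illustrates that pattern, assuming only the start(), stop(), and time() methods of ReplicationTimer as they are used in the diff; the class name, logger, and message text are illustrative placeholders, not code from this commit.

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.indices.replication.common.ReplicationTimer;

// Illustrative sketch of the timer-around-a-stage pattern applied in this commit.
class TimedStageSketch {
    private static final Logger logger = LogManager.getLogger(TimedStageSketch.class);

    void runTimedStage(long replicationId, Runnable stage) {
        final ReplicationTimer timer = new ReplicationTimer();
        timer.start();
        try {
            stage.run();
        } finally {
            // stop the timer whether the stage succeeded or failed, then log the elapsed time
            timer.stop();
            logger.trace(
                new ParameterizedMessage("[replication id {}] stage complete, timing: {}", replicationId, timer.time())
            );
        }
    }
}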
@@ -27,6 +27,7 @@
import org.opensearch.indices.recovery.FileChunkWriter;
import org.opensearch.indices.recovery.MultiChunkTransfer;
import org.opensearch.indices.replication.common.CopyState;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Transports;

@@ -104,16 +105,24 @@ class SegmentReplicationSourceHandler {
* @param listener {@link ActionListener} that completes with the list of files sent.
*/
public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
final ReplicationTimer timer = new ReplicationTimer();
if (isReplicating.compareAndSet(false, true) == false) {
throw new OpenSearchException("Replication to {} is already running.", shard.shardId());
}
future.addListener(listener, OpenSearchExecutors.newDirectExecutorService());
final Closeable releaseResources = () -> IOUtils.close(resources);
try {

timer.start();
final Consumer<Exception> onFailure = e -> {
assert Transports.assertNotTransportThread(SegmentReplicationSourceHandler.this + "[onFailure]");
IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));
timer.stop();
logger.trace(
"[replication id {}] Source node failed to send files to target node [{}], timing: {}",
request.getReplicationId(),
request.getTargetNode().getId(),
timer.time()
);
};

RunUnderPrimaryPermit.run(() -> {
@@ -151,6 +160,13 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene
future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata)));
} finally {
IOUtils.close(resources);
timer.stop();
logger.trace(
"[replication id {}] Source node completed sending files to target node [{}], timing: {}",
request.getReplicationId(),
request.getTargetNode().getId(),
timer.time()
);
}
}, onFailure);
} catch (Exception e) {
@@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
@@ -25,6 +26,7 @@
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RetryableTransportClient;
import org.opensearch.indices.replication.common.CopyState;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportChannel;
@@ -86,6 +88,8 @@ public SegmentReplicationSourceService(
private class CheckpointInfoRequestHandler implements TransportRequestHandler<CheckpointInfoRequest> {
@Override
public void messageReceived(CheckpointInfoRequest request, TransportChannel channel, Task task) throws Exception {
final ReplicationTimer timer = new ReplicationTimer();
timer.start();
final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = new RemoteSegmentFileChunkWriter(
request.getReplicationId(),
recoverySettings,
@@ -109,6 +113,16 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan
copyState.getPendingDeleteFiles()
)
);
timer.stop();
logger.trace(
new ParameterizedMessage(
"[replication id {}] Source node sent checkpoint info [{}] to target node [{}], timing: {}",
request.getReplicationId(),
copyState.getCheckpoint(),
request.getTargetNode().getId(),
timer.time()
)
);
}
}

@@ -8,10 +8,14 @@

package org.opensearch.indices.replication;

import org.opensearch.common.collect.Tuple;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.indices.replication.common.ReplicationTimer;

import java.util.ArrayList;
import java.util.List;

/**
* ReplicationState implementation to track Segment Replication events.
*
@@ -26,10 +30,12 @@ public class SegmentReplicationState implements ReplicationState {
*/
public enum Stage {
DONE((byte) 0),

INIT((byte) 1),

REPLICATING((byte) 2);
REPLICATING((byte) 2),
GET_CHECKPOINT_INFO((byte) 3),
FILE_DIFF((byte) 4),
GET_FILES((byte) 5),
FINALIZE_REPLICATION((byte) 6);

private static final Stage[] STAGES = new Stage[Stage.values().length];

@@ -60,23 +66,45 @@ public static Stage fromId(byte id) {

private Stage stage;
private final ReplicationLuceneIndex index;
private final ReplicationTimer timer;
private final ReplicationTimer overallTimer;
private final ReplicationTimer stageTimer;
private final List<Tuple<String, Long>> timingData;
private long replicationId;

public SegmentReplicationState(ReplicationLuceneIndex index) {
stage = Stage.INIT;
this.index = index;
timer = new ReplicationTimer();
timer.start();
// Timing data will have as many entries as stages, plus one
// additional entry for the overall timer
timingData = new ArrayList<>(Stage.values().length + 1);
overallTimer = new ReplicationTimer();
stageTimer = new ReplicationTimer();
stageTimer.start();
// set an invalid value by default
this.replicationId = -1L;
}

public SegmentReplicationState(ReplicationLuceneIndex index, long replicationId) {
this(index);
this.replicationId = replicationId;
}

@Override
public ReplicationLuceneIndex getIndex() {
return index;
}

public long getReplicationId() {
return replicationId;
}

@Override
public ReplicationTimer getTimer() {
return timer;
return overallTimer;
}

public List<Tuple<String, Long>> getTimingData() {
return timingData;
}

public Stage getStage() {
@@ -90,23 +118,42 @@ protected void validateAndSetStage(Stage expected, Stage next) {
"can't move replication to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])"
);
}
// save the timing data for the current step
stageTimer.stop();
timingData.add(new Tuple<>(stage.name(), stageTimer.time()));
// restart the step timer
stageTimer.reset();
stageTimer.start();
stage = next;
}

public void setStage(Stage stage) {
switch (stage) {
case INIT:
this.stage = Stage.INIT;
getIndex().reset();
break;
case REPLICATING:
validateAndSetStage(Stage.INIT, stage);
getIndex().start();
// only start the overall timer once we've started replication
overallTimer.start();
break;
case DONE:
case GET_CHECKPOINT_INFO:
validateAndSetStage(Stage.REPLICATING, stage);
getIndex().stop();
getTimer().stop();
break;
case FILE_DIFF:
validateAndSetStage(Stage.GET_CHECKPOINT_INFO, stage);
break;
case GET_FILES:
validateAndSetStage(Stage.FILE_DIFF, stage);
break;
case FINALIZE_REPLICATION:
validateAndSetStage(Stage.GET_FILES, stage);
break;
case DONE:
validateAndSetStage(Stage.FINALIZE_REPLICATION, stage);
// add the overall timing data
overallTimer.stop();
timingData.add(new Tuple<>("OVERALL", overallTimer.time()));
break;
default:
throw new IllegalArgumentException("unknown SegmentReplicationState.Stage [" + stage + "]");
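
For orientation, here is a sketch (not part of the diff) of how a caller is expected to walk the new stage machine above: each setStage() call records the elapsed time of the stage being exited into getTimingData(), and DONE additionally appends the "OVERALL" entry. The class name and the replication id value are made up for illustration; the constructor, Stage values, and transition order come from the code above.

import org.opensearch.common.collect.Tuple;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationState.Stage;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;

class StageProgressionSketch {
    static void walkStages() {
        // 42L is an arbitrary replication id used only for this example
        SegmentReplicationState state = new SegmentReplicationState(new ReplicationLuceneIndex(), 42L);
        state.setStage(Stage.REPLICATING);          // valid only from INIT; starts the overall timer
        state.setStage(Stage.GET_CHECKPOINT_INFO);
        state.setStage(Stage.FILE_DIFF);
        state.setStage(Stage.GET_FILES);
        state.setStage(Stage.FINALIZE_REPLICATION);
        state.setStage(Stage.DONE);                 // stops the overall timer and adds the "OVERALL" entry
        for (Tuple<String, Long> entry : state.getTimingData()) {
            // v1() is the stage name, v2() the elapsed time reported by ReplicationTimer.time()
            System.out.println(entry.v1() + ": " + entry.v2());
        }
        // Transitions that skip a stage (e.g. INIT straight to GET_FILES) are rejected by
        // validateAndSetStage above.
    }
}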
@@ -64,7 +64,7 @@ public SegmentReplicationTarget(
super("replication_target", indexShard, new ReplicationLuceneIndex(), listener);
this.checkpoint = checkpoint;
this.source = source;
this.state = new SegmentReplicationState(stateIndex);
this.state = new SegmentReplicationState(stateIndex, getId());
this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, getPrefix(), logger, this::ensureRefCount);
}

@@ -139,7 +139,9 @@ public void startReplication(ActionListener<Void> listener) {
final StepListener<GetSegmentFilesResponse> getFilesListener = new StepListener<>();
final StepListener<Void> finalizeListener = new StepListener<>();

logger.trace("[shardId {}] Replica starting replication [id {}]", shardId().getId(), getId());
// Get list of files to copy from this checkpoint.
state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);

checkpointInfoListener.whenComplete(checkpointInfo -> getFiles(checkpointInfo, getFilesListener), listener::onFailure);
@@ -152,14 +154,16 @@ public void startReplication(ActionListener<Void> listener) {

private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSegmentFilesResponse> getFilesListener)
throws IOException {
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
final Store.MetadataSnapshot snapshot = checkpointInfo.getSnapshot();
Store.MetadataSnapshot localMetadata = getMetadataSnapshot();
final Store.RecoveryDiff diff = snapshot.segmentReplicationDiff(localMetadata);
logger.debug("Replication diff {}", diff);
// Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming snapshot
// from
// source that means the local copy of the segment has been corrupted/changed in some way and we throw an IllegalStateException to
// fail the shard
logger.trace("Replication diff {}", diff);
/*
* Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming
* snapshot from source that means the local copy of the segment has been corrupted/changed in some way and we throw an
* IllegalStateException to fail the shard
*/
if (diff.different.isEmpty() == false) {
getFilesListener.onFailure(
new IllegalStateException(
@@ -177,15 +181,18 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSeg
.collect(Collectors.toSet());

filesToFetch.addAll(pendingDeleteFiles);
logger.trace("Files to fetch {}", filesToFetch);

for (StoreFileMetadata file : filesToFetch) {
state.getIndex().addFileDetail(file.name(), file.length(), false);
}
// always send a req even if not fetching files so the primary can clear the copyState for this shard.
state.setStage(SegmentReplicationState.Stage.GET_FILES);
source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, store, getFilesListener);
}

private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, ActionListener<Void> listener) {
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
ActionListener.completeWith(listener, () -> {
multiFileWriter.renameAllTempFiles();
final Store store = store();
@@ -116,7 +116,7 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
* @param replicaShard replica shard on which checkpoint is received
*/
public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) {

logger.trace(() -> new ParameterizedMessage("Replica received new replication checkpoint from primary [{}]", receivedCheckpoint));
// Checks if received checkpoint is already present and ahead then it replaces old received checkpoint
if (latestReceivedCheckpoint.get(replicaShard.shardId()) != null) {
if (receivedCheckpoint.isAheadOf(latestReceivedCheckpoint.get(replicaShard.shardId()))) {
@@ -139,6 +139,14 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication complete, timing data: {}",
replicaShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
// if we received a checkpoint during the copy event that is ahead of this
// try and process it.
if (latestReceivedCheckpoint.get(replicaShard.shardId()).isAheadOf(replicaShard.getLatestReplicationCheckpoint())) {
@@ -154,6 +162,14 @@ public void onReplicationDone(SegmentReplicationState state) {

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
replicaShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
if (sendShardFailure == true) {
logger.error("replication failure", e);
replicaShard.failShard("replication failure", e);
@@ -172,9 +188,9 @@ public void startReplication(
startReplication(new SegmentReplicationTarget(checkpoint, indexShard, sourceFactory.get(indexShard), listener));
}

public void startReplication(final SegmentReplicationTarget target) {
// pkg-private for integration tests
void startReplication(final SegmentReplicationTarget target) {
final long replicationId = onGoingReplications.start(target, recoverySettings.activityTimeout());
logger.trace(() -> new ParameterizedMessage("Starting replication {}", replicationId));
threadPool.generic().execute(new ReplicationRunner(replicationId));
}

@@ -29,6 +29,7 @@
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
@@ -103,7 +104,10 @@ final void publish(IndexShard indexShard) {
// we have to execute under the system context so that if security is enabled the sync is authorized
threadContext.markAsSystemContext();
PublishCheckpointRequest request = new PublishCheckpointRequest(indexShard.getLatestReplicationCheckpoint());
final ReplicationCheckpoint checkpoint = request.getCheckpoint();
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request);
final ReplicationTimer timer = new ReplicationTimer();
timer.start();
transportService.sendChildRequest(
clusterService.localNode(),
transportPrimaryAction,
@@ -123,12 +127,23 @@ public String executor() {

@Override
public void handleResponse(ReplicationResponse response) {
timer.stop();
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] Completed publishing checkpoint [{}], timing: {}",
indexShard.shardId().getId(),
checkpoint,
timer.time()
)
);
task.setPhase("finished");
taskManager.unregister(task);
}

@Override
public void handleException(TransportException e) {
timer.stop();
logger.trace("[shardId {}] Failed to publish checkpoint, timing: {}", indexShard.shardId().getId(), timer.time());
task.setPhase("finished");
taskManager.unregister(task);
if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
@@ -151,6 +166,13 @@ public void handleException(TransportException e) {
}
}
);
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] Publishing replication checkpoint [{}]",
checkpoint.getShardId().getId(),
checkpoint
)
);
}
}

@@ -168,7 +190,7 @@ protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexSh
Objects.requireNonNull(request);
Objects.requireNonNull(replica);
ActionListener.completeWith(listener, () -> {
logger.trace("Checkpoint received on replica {}", request);
logger.trace(() -> new ParameterizedMessage("Checkpoint {} received on replica {}", request, replica.shardId()));
if (request.getCheckpoint().getShardId().equals(replica.shardId())) {
replicationService.onNewCheckpoint(request.getCheckpoint(), replica);
}
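
All of the new logs are emitted at TRACE, so they are invisible at the default logger level. One way to surface them in a local run or test (an illustration, not part of this change) is to raise the segment replication package to TRACE with log4j's Configurator; on a live cluster, the dynamic logger.* cluster setting would be the more usual route. The package name below is assumed from the classes shown in this diff.

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;

class EnableSegrepTraceLogging {
    public static void main(String[] args) {
        // Child packages (e.g. ...replication.checkpoint) inherit this level, so one call
        // should cover the classes touched by this commit.
        Configurator.setLevel("org.opensearch.indices.replication", Level.TRACE);
    }
}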