Skip to content

Commit

Permalink
Update tests and rebase onto latest main
Browse files Browse the repository at this point in the history
Signed-off-by: Bhumika Saini <[email protected]>
  • Loading branch information
Bhumika Saini committed Aug 14, 2023
1 parent a8f3bce commit 9adab8b
Show file tree
Hide file tree
Showing 17 changed files with 583 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,16 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

builder.startObject(Fields.TRANSLOG);
builder.startObject(SubFields.UPLOAD);
buildTranslogUploadStats(builder);
// Ensuring that we are not showing 0 metrics to the user
if (remoteTranslogShardStats.totalUploadsStarted > 0) {
buildTranslogUploadStats(builder);
}
builder.endObject(); // translog.upload
builder.startObject(SubFields.DOWNLOAD);
buildTranslogDownloadStats(builder);
// Ensuring that we are not showing 0 metrics to the user
if (remoteTranslogShardStats.totalDownloadsSucceeded > 0) {
buildTranslogDownloadStats(builder);
}
builder.endObject(); // translog.download
builder.endObject(); // translog

Expand Down Expand Up @@ -291,7 +297,7 @@ static final class UploadStatsFields {
static final String LAST_SUCCESSFUL_UPLOAD_TIMESTAMP = "last_successful_upload_timestamp";

/**
* Number of total uploads to remote store
* Count of files uploaded to remote store
*/
static final String TOTAL_UPLOADS = "total_uploads";

Expand Down Expand Up @@ -323,7 +329,7 @@ static final class UploadStatsFields {

static final class DownloadStatsFields {
/**
* Epoch timestamp of the last successful download session
* Epoch timestamp of the last successful download
*/
public static final String LAST_SUCCESSFUL_DOWNLOAD_TIMESTAMP = "last_successful_download_timestamp";

Expand All @@ -333,7 +339,7 @@ static final class DownloadStatsFields {
static final String LAST_SYNC_TIMESTAMP = "last_sync_timestamp";

/**
* Count of total remote store download sessions
* Count of files downloaded from remote store
*/
public static final String TOTAL_DOWNLOADS = "total_downloads";

Expand All @@ -353,18 +359,18 @@ static final class DownloadStatsFields {
static final String DOWNLOAD_SIZE_IN_BYTES = "download_size_in_bytes";

/**
* Average speed (in bytes/sec) of a remote store download session
* Average speed (in bytes/sec) of a remote store download
*/
static final String DOWNLOAD_SPEED_IN_BYTES_PER_SEC = "download_speed_in_bytes_per_sec";

/**
* Average time spent on a remote store download session
* Average time spent on a remote store download
*/
public static final String DOWNLOAD_TIME_IN_MILLIS = "download_time_in_millis";
}

/**
* Reusable sub fields for {@link Fields}
* Reusable sub fields for {@link UploadStatsFields} and {@link DownloadStatsFields}
*/
static final class SubFields {
static final String STARTED = "started";
Expand All @@ -375,12 +381,12 @@ static final class SubFields {
static final String UPLOAD = "upload";

/**
* Moving avg over last N values stat for a {@link Fields}
* Moving avg over last N values stat
*/
static final String MOVING_AVG = "moving_avg";

/**
* Most recent successful attempt stat for a {@link Fields}
* Most recent successful attempt stat
*/
static final String LAST_SUCCESSFUL = "last_successful";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ protected ShardsIterator shards(ClusterState clusterState, RemoteStoreStatsReque
|| (shardRouting.currentNodeId() == null
|| shardRouting.currentNodeId().equals(clusterState.getNodes().getLocalNodeId()))
)
.filter(ShardRouting::primary)
.filter(
shardRouting -> Boolean.parseBoolean(
clusterState.getMetadata().index(shardRouting.index()).getSettings().get(IndexMetadata.SETTING_REMOTE_STORE_ENABLED)
Expand Down Expand Up @@ -158,7 +157,6 @@ protected RemoteStoreStats shardOperation(RemoteStoreStatsRequest request, Shard
indexShard.shardId()
);
assert Objects.nonNull(remoteSegmentTransferTracker);

RemoteTranslogTracker remoteTranslogTracker = remoteStorePressureService.getRemoteTranslogTracker(indexShard.shardId());
assert Objects.nonNull(remoteTranslogTracker);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ public RemoteSegmentTransferTracker getRemoteRefreshSegmentTracker(ShardId shard
}

/**
* Get {@link RemoteTranslogTracker} only if the underlying Index has remote translog store enabled.
* Get {@link RemoteTranslogTracker} for the shard
*
* @param shardId shard id
* @return the tracker if index is remote translog store-backed, else null.
* @return The tracker if index is Remote Translog Store-backed, else null.
*/
public RemoteTranslogTracker getRemoteTranslogTracker(ShardId shardId) {
return trackerMapRemoteTranslogStore.get(shardId);
Expand Down Expand Up @@ -135,16 +135,6 @@ public boolean isSegmentsUploadBackpressureEnabled() {
return pressureSettings.isRemoteRefreshSegmentPressureEnabled();
}

/**
* Check if remote translog backpressure is enabled.
*
* @return true if enabled, else false.
*/
public boolean isTranslogUploadBackpressureEnabled() {
// Note: This is not yet implemented.
return false;
}

/**
* Validates if segments are lagging more than the limits. If yes, it would lead to rejections of the requests.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public long getTotalDownloadsSucceeded() {
return totalDownloadsSucceeded.get();
}

public void addTotalDownloadsSucceeded(long count) {
public void addDownloadsSucceeded(long count) {
totalDownloadsSucceeded.addAndGet(count);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ public RemoteFsTranslog(
}
}

// visible for testing
public RemoteTranslogTracker getRemoteTranslogTracker() {
return remoteTranslogTracker;
}

public static void download(Repository repository, ShardId shardId, ThreadPool threadPool, Path location, Logger logger)
throws IOException {
assert repository instanceof BlobStoreRepository : String.format(
Expand All @@ -156,12 +161,11 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t

public static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
logger.trace("Downloading translog files from remote");
RemoteTranslogTracker statsTracker = translogTransferManager.getRemoteTranslogTracker();
long bytesBefore = statsTracker.getDownloadBytesSucceeded();
long downloadStartTime = RemoteStoreUtils.getCurrentSystemNanoTime();
TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata();
if (translogMetadata != null) {
RemoteTranslogTracker statsTracker = translogTransferManager.getRemoteTranslogTracker();
statsTracker.addTotalDownloadsSucceeded(1);
long bytesBefore = statsTracker.getDownloadBytesSucceeded();
long downloadStartTime = RemoteStoreUtils.getCurrentSystemNanoTime();
if (Files.notExists(location)) {
Files.createDirectories(location);
}
Expand All @@ -181,7 +185,11 @@ public static void download(TranslogTransferManager translogTransferManager, Pat
long downloadEndTimeMs = System.currentTimeMillis();
long durationInMillis = (downloadEndTime - downloadStartTime) / 1_000_000L;
long bytesDownloaded = statsTracker.getDownloadBytesSucceeded() - bytesBefore;

statsTracker.setLastSuccessfulDownloadTimestamp(downloadEndTimeMs);
// We update the duration at the end of successfully downloading all of metadata, .tlog, .ckp
// files because this is not a file-level metric but a sync-level metric.
// This also ensures the bytes per sec moving average can be correlated.
statsTracker.addDownloadTimeInMillis(durationInMillis);
statsTracker.updateDownloadBytesMovingAverage(bytesDownloaded);
statsTracker.updateDownloadTimeMovingAverage(durationInMillis);
Expand Down Expand Up @@ -525,17 +533,20 @@ public RemoteFsTranslogTransferListener(Releasable transferReleasable, Long gene
@Override
public void beforeUpload(TransferSnapshot transferSnapshot) throws IOException {
toUpload = RemoteStoreUtils.getUploadBlobsFromSnapshot(transferSnapshot, fileTransferTracker);
uploadBytes = RemoteStoreUtils.getTotalBytes(toUpload);
uploadStartTime = RemoteStoreUtils.getCurrentSystemNanoTime();

captureStatsBeforeUpload();
if (toUpload.size() > 0) {
uploadBytes = RemoteStoreUtils.getTotalBytes(toUpload);
captureStatsBeforeUpload();
uploadStartTime = RemoteStoreUtils.getCurrentSystemNanoTime();
}
}

@Override
public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException {
uploadEndTime = RemoteStoreUtils.getCurrentSystemNanoTime();
uploadEndTimeMs = System.currentTimeMillis();
captureStatsOnUploadSuccess();
if (toUpload != null && toUpload.size() > 0) {
uploadEndTime = RemoteStoreUtils.getCurrentSystemNanoTime();
uploadEndTimeMs = System.currentTimeMillis();
captureStatsOnUploadSuccess();
}

transferReleasable.close();
closeFilesIfNoPendingRetentionLocks();
Expand All @@ -546,13 +557,13 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOExcepti

@Override
public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException {
uploadEndTime = RemoteStoreUtils.getCurrentSystemNanoTime();
uploadEndTimeMs = System.currentTimeMillis();
captureStatsOnUploadFailure();
if (toUpload != null && toUpload.size() > 0) {
uploadEndTime = RemoteStoreUtils.getCurrentSystemNanoTime();
captureStatsOnUploadFailure();
}

transferReleasable.close();
closeFilesIfNoPendingRetentionLocks();

if (ex instanceof IOException) {
throw (IOException) ex;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
try {
if (toUpload.isEmpty()) {
logger.trace("Nothing to upload for transfer");
translogTransferListener.onUploadComplete(transferSnapshot);
return true;
}

Expand Down Expand Up @@ -200,7 +201,7 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th
// Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync
fileTransferTracker.add(fileName, true);
remoteTranslogTrackerSetOnce.get().addDownloadBytesSucceeded(bytesToRead);
remoteTranslogTrackerSetOnce.get().addTotalDownloadsSucceeded(1);
remoteTranslogTrackerSetOnce.get().addDownloadsSucceeded(1);
}

public TranslogTransferMetadata readMetadata() throws IOException {
Expand All @@ -212,9 +213,11 @@ public TranslogTransferMetadata readMetadata() throws IOException {
if (blobMetadataList.isEmpty()) return;
String filename = blobMetadataList.get(0).name();
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) {
long bytesToRead = inputStream.available();
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
metadataSetOnce.set(metadataStreamWrapper.readStream(indexInput));
remoteTranslogTrackerSetOnce.get().addDownloadBytesSucceeded(indexInput.length());
remoteTranslogTrackerSetOnce.get().addDownloadBytesSucceeded(bytesToRead);
remoteTranslogTrackerSetOnce.get().addDownloadsSucceeded(1);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e);
exceptionSetOnce.set(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
import org.opensearch.common.util.iterable.Iterables;
import org.opensearch.common.util.set.Sets;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.common.lease.Releasable;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
Expand Down Expand Up @@ -953,7 +952,7 @@ public IndexShard createShard(
.indices()
.preparePutMapping()
.setConcreteIndex(shardRouting.index()) // concrete index - no name clash, it uses uuid
.setSource(mapping.source().string(), XContentType.JSON)
.setSource(mapping.source().string(), MediaTypeRegistry.JSON)
.get();
}, this);
return indexShard;
Expand Down Expand Up @@ -1866,4 +1865,9 @@ public boolean allPendingDanglingIndicesWritten() {
public void setPressureService(RemoteStorePressureService pressureService) {
pressureServiceSetOnce.trySet(pressureService);
}

// visible for testing
public RemoteStorePressureService getPressureService() {
return pressureServiceSetOnce.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.Map;

import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.compareStatsResponse;
import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createPressureTrackerTranslogStats;
import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createTranslogStats;
import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createShardRouting;
import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createStatsForNewPrimary;
import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createStatsForNewReplica;
Expand All @@ -51,7 +51,7 @@ public void tearDown() throws Exception {

public void testSerializationForPrimary() throws Exception {
RemoteSegmentTransferTracker.Stats mockPrimaryTrackerStats = createStatsForNewPrimary(shardId);
RemoteTranslogTracker.Stats mockTranslogTrackerStats = createPressureTrackerTranslogStats(shardId);
RemoteTranslogTracker.Stats mockTranslogTrackerStats = createTranslogStats(shardId);
ShardRouting primaryShardRouting = createShardRouting(shardId, true);
RemoteStoreStats primaryShardStats = new RemoteStoreStats(mockPrimaryTrackerStats, mockTranslogTrackerStats, primaryShardRouting);
RemoteStoreStatsResponse statsResponse = new RemoteStoreStatsResponse(
Expand Down Expand Up @@ -82,7 +82,7 @@ public void testSerializationForPrimary() throws Exception {
public void testSerializationForBothPrimaryAndReplica() throws Exception {
RemoteSegmentTransferTracker.Stats mockPrimaryTrackerStats = createStatsForNewPrimary(shardId);
RemoteSegmentTransferTracker.Stats mockReplicaTrackerStats = createStatsForNewReplica(shardId);
RemoteTranslogTracker.Stats mockTranslogTrackerStats = createPressureTrackerTranslogStats(shardId);
RemoteTranslogTracker.Stats mockTranslogTrackerStats = createTranslogStats(shardId);
ShardRouting primaryShardRouting = createShardRouting(shardId, true);
ShardRouting replicaShardRouting = createShardRouting(shardId, false);
RemoteStoreStats primaryShardStats = new RemoteStoreStats(mockPrimaryTrackerStats, mockTranslogTrackerStats, primaryShardRouting);
Expand Down Expand Up @@ -123,7 +123,7 @@ public void testSerializationForBothPrimaryAndReplica() throws Exception {
public void testSerializationForBothRemoteStoreRestoredPrimaryAndReplica() throws Exception {
RemoteSegmentTransferTracker.Stats mockPrimaryTrackerStats = createStatsForRemoteStoreRestoredPrimary(shardId);
RemoteSegmentTransferTracker.Stats mockReplicaTrackerStats = createStatsForNewReplica(shardId);
RemoteTranslogTracker.Stats mockTranslogTrackerStats = createPressureTrackerTranslogStats(shardId);
RemoteTranslogTracker.Stats mockTranslogTrackerStats = createTranslogStats(shardId);
ShardRouting primaryShardRouting = createShardRouting(shardId, true);
ShardRouting replicaShardRouting = createShardRouting(shardId, false);
RemoteStoreStats primaryShardStats = new RemoteStoreStats(mockPrimaryTrackerStats, mockTranslogTrackerStats, primaryShardRouting);
Expand Down
Loading

0 comments on commit 9adab8b

Please sign in to comment.