Skip to content

Commit

Permalink
Merge pull request #1160 from chelma/MIGRATIONS-2128
Browse files Browse the repository at this point in the history
Adding subshard work item on lease expiry
  • Loading branch information
AndreKurait authored Dec 5, 2024
2 parents 3d82d1f + 178fe55 commit f2e93a4
Show file tree
Hide file tree
Showing 32 changed files with 1,219 additions and 383 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ private List<CompletableFuture<?>> generateDocs(String indexName, Workload workl
log.atTrace().setMessage("Created doc for index {}: {}")
.addArgument(indexName)
.addArgument(doc::toString).log();
return new BulkDocSection(indexName + "_" + docIdCounter.incrementAndGet(), indexName, null, doc.toString());
var docId = docIdCounter.incrementAndGet();
return new BulkDocSection(indexName + "_" + docId, indexName, null, doc.toString(), null);
})
.collect(Collectors.toList());

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package org.opensearch.migrations;

import java.time.Duration;
import java.time.Instant;

import org.opensearch.migrations.bulkload.workcoordination.WorkItemTimeProvider;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;


class RfsMigrateDocumentsTest {


private static class TestClass extends RfsMigrateDocuments {
public static int getSuccessorNextAcquisitionLeaseExponent(WorkItemTimeProvider workItemTimeProvider, Duration initialLeaseDuration,
Instant leaseExpirationTime) {
return RfsMigrateDocuments.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);
}
}

@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_LessThanLowerThreshold() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

// Lease at 40 minutes, shard prep 59 seconds, successor lease should be decreased since shard prep is < 2.5%
// and exponent is > 0
var existingLeaseExponent = 2;
var shardPrepTime = Duration.ofSeconds(59);
Duration initialLeaseDuration = Duration.ofMinutes(10);
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(existingLeaseExponent - 1, successorNextAcquisitionLeaseExponent, "Should decrease successorExponent");
}


@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_LessThanLowerThresholdWith0Exponent() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

var shardPrepTime = Duration.ofSeconds(1);
var existingLeaseExponent = 0;
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Duration initialLeaseDuration = Duration.ofMinutes(10);
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(0, successorNextAcquisitionLeaseExponent, "Should return 0 for successorExponent");
}


@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_LessThanUpperThreshold() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

var shardPrepTime = Duration.ofSeconds(59);
var existingLeaseExponent = 0;
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Duration initialLeaseDuration = Duration.ofMinutes(10);
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(existingLeaseExponent, successorNextAcquisitionLeaseExponent, "Should return existingLeaseExponent + 1 when shard prep time is less than 10% of lease duration");
}

@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_EqualToUpperThreshold() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

var shardPrepTime = Duration.ofSeconds(60);
var existingLeaseExponent = 0;
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Duration initialLeaseDuration = Duration.ofMinutes(10);
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(existingLeaseExponent, successorNextAcquisitionLeaseExponent, "Should return existingLeaseExponent when shard prep time is equal to 10% of lease duration");
}

@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_ExceedsUpperThreshold() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

var shardPrepTime = Duration.ofSeconds(61);
var existingLeaseExponent = 0;
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Duration initialLeaseDuration = Duration.ofMinutes(10);
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(existingLeaseExponent + 1, successorNextAcquisitionLeaseExponent, "Should return existingLeaseExponent + 1 when shard prep time is greater than to 10% of lease duration");
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opensearch.migrations.bulkload;

import java.io.File;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -126,18 +127,21 @@ private void migrationDocumentsWithClusters(
final var clockJitter = new Random(1);

// ExpectedMigrationWorkTerminationException is thrown on completion.
var expectedTerminationException = Assertions.assertThrows(
ExpectedMigrationWorkTerminationException.class,
() -> migrateDocumentsSequentially(
sourceRepo,
snapshotName,
List.of(),
targetCluster.getUrl(),
runCounter,
clockJitter,
testDocMigrationContext,
sourceCluster.getContainerVersion().getVersion(),
false
var expectedTerminationException = Assertions.assertTimeout(
Duration.ofSeconds(30),
() -> Assertions.assertThrows(
ExpectedMigrationWorkTerminationException.class,
() -> migrateDocumentsSequentially(
sourceRepo,
snapshotName,
List.of(),
targetCluster.getUrl(),
runCounter,
clockJitter,
testDocMigrationContext,
sourceCluster.getContainerVersion().getVersion(),
false
)
)
);

Expand Down
Loading

0 comments on commit f2e93a4

Please sign in to comment.