Skip to content

Commit

Permalink
Avoid unnecessary persistence of retention leases (elastic#42299)
Browse files Browse the repository at this point in the history
Today we are persisting the retention leases at least every thirty
seconds by a scheduled background sync. This sync causes an fsync to
disk and when there are a large number of shards allocated to slow
disks, these fsyncs can pile up and can severely impact the system. This
commit addresses this by only persisting and fsyncing the retention
leases if they have changed since the last time that we persisted and
fsynced the retention leases.
  • Loading branch information
jasontedor authored and Gurkan Kaymak committed May 27, 2019
1 parent 8350210 commit 4b42db9
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
Expand Down Expand Up @@ -181,6 +180,18 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/
private RetentionLeases retentionLeases = RetentionLeases.EMPTY;

/**
* The primary term of the most-recently persisted retention leases. This is used to check if we need to persist the current retention
* leases.
*/
private long persistedRetentionLeasesPrimaryTerm;

/**
* The version of the most-recently persisted retention leases. This is used to check if we need to persist the current retention
* leases.
*/
private long persistedRetentionLeasesVersion;

/**
* Get all retention leases tracked on this shard.
*
Expand Down Expand Up @@ -343,7 +354,8 @@ public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
private final Object retentionLeasePersistenceLock = new Object();

/**
* Persists the current retention leases to their dedicated state file.
* Persists the current retention leases to their dedicated state file. If this version of the retention leases are already persisted
* then persistence is skipped.
*
* @param path the path to the directory containing the state file
* @throws WriteStateException if an exception occurs writing the state file
Expand All @@ -352,10 +364,16 @@ public void persistRetentionLeases(final Path path) throws WriteStateException {
synchronized (retentionLeasePersistenceLock) {
final RetentionLeases currentRetentionLeases;
synchronized (this) {
if (retentionLeases.supersedes(persistedRetentionLeasesPrimaryTerm, persistedRetentionLeasesVersion) == false) {
logger.trace("skipping persisting retention leases [{}], already persisted", retentionLeases);
return;
}
currentRetentionLeases = retentionLeases;
}
logger.trace("persisting retention leases [{}]", currentRetentionLeases);
RetentionLeases.FORMAT.writeAndCleanup(currentRetentionLeases, path);
persistedRetentionLeasesPrimaryTerm = currentRetentionLeases.primaryTerm();
persistedRetentionLeasesVersion = currentRetentionLeases.version();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,27 @@ public long version() {

/**
* Checks if this retention leases collection supersedes the specified retention leases collection. A retention leases collection
* supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher
* supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher.
*
* @param that the retention leases collection to test against
* @return true if this retention leases collection supercedes the specified retention lease collection, otherwise false
*/
public boolean supersedes(final RetentionLeases that) {
return primaryTerm > that.primaryTerm || primaryTerm == that.primaryTerm && version > that.version;
boolean supersedes(final RetentionLeases that) {
return supersedes(that.primaryTerm, that.version);
}

/**
* Checks if this retention leases collection would supersede a retention leases collection with the specified primary term and version.
* A retention leases collection supersedes another retention leases collection if its primary term is higher, or if for equal primary
* terms its version is higher.
*
* @param primaryTerm the primary term
* @param version the version
* @return true if this retention leases collection would supercedes a retention lease collection with the specified primary term and
* version
*/
boolean supersedes(final long primaryTerm, final long version) {
return this.primaryTerm > primaryTerm || this.primaryTerm == primaryTerm && this.version > version;
}

private final Map<String, RetentionLease> leases;
Expand Down Expand Up @@ -203,7 +217,7 @@ public static RetentionLeases fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null);
}

static final MetaDataStateFormat<RetentionLeases> FORMAT = new MetaDataStateFormat<RetentionLeases>("retention-leases-") {
static final MetaDataStateFormat<RetentionLeases> FORMAT = new MetaDataStateFormat<>("retention-leases-") {

@Override
public void toXContent(final XContentBuilder builder, final RetentionLeases retentionLeases) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -489,6 +490,48 @@ public void testLoadAndPersistRetentionLeases() throws IOException {
assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases()));
}

public void testUnnecessaryPersistenceOfRetentionLeases() throws IOException {
final AllocationId allocationId = AllocationId.newInitializing();
long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
final ReplicationTracker replicationTracker = new ReplicationTracker(
new ShardId("test", "_na", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
primaryTerm,
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> {});
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId));
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final int length = randomIntBetween(0, 8);
for (int i = 0; i < length; i++) {
if (rarely() && primaryTerm < Long.MAX_VALUE) {
primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
replicationTracker.setOperationPrimaryTerm(primaryTerm);
}
final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
replicationTracker.addRetentionLease(
Integer.toString(i), retainingSequenceNumber, "test-" + i, ActionListener.wrap(() -> {}));
}

final Path path = createTempDir();
replicationTracker.persistRetentionLeases(path);

final Tuple<RetentionLeases, Long> retentionLeasesWithGeneration =
RetentionLeases.FORMAT.loadLatestStateWithGeneration(logger, NamedXContentRegistry.EMPTY, path);

replicationTracker.persistRetentionLeases(path);
final Tuple<RetentionLeases, Long> retentionLeasesWithGenerationAfterUnnecessaryPersistence =
RetentionLeases.FORMAT.loadLatestStateWithGeneration(logger, NamedXContentRegistry.EMPTY, path);

assertThat(retentionLeasesWithGenerationAfterUnnecessaryPersistence.v1(), equalTo(retentionLeasesWithGeneration.v1()));
assertThat(retentionLeasesWithGenerationAfterUnnecessaryPersistence.v2(), equalTo(retentionLeasesWithGeneration.v2()));
}

/**
* Test that we correctly synchronize writing the retention lease state file in {@link ReplicationTracker#persistRetentionLeases(Path)}.
* This test can fail without the synchronization block in that method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ public void testSupersedesByPrimaryTerm() {
final long higherPrimaryTerm = randomLongBetween(lowerPrimaryTerm + 1, Long.MAX_VALUE);
final RetentionLeases right = new RetentionLeases(higherPrimaryTerm, randomLongBetween(1, Long.MAX_VALUE), Collections.emptyList());
assertTrue(right.supersedes(left));
assertTrue(right.supersedes(left.primaryTerm(), left.version()));
assertFalse(left.supersedes(right));
assertFalse(left.supersedes(right.primaryTerm(), right.version()));
}

public void testSupersedesByVersion() {
Expand All @@ -70,7 +72,9 @@ public void testSupersedesByVersion() {
final RetentionLeases left = new RetentionLeases(primaryTerm, lowerVersion, Collections.emptyList());
final RetentionLeases right = new RetentionLeases(primaryTerm, higherVersion, Collections.emptyList());
assertTrue(right.supersedes(left));
assertTrue(right.supersedes(left.primaryTerm(), left.version()));
assertFalse(left.supersedes(right));
assertFalse(left.supersedes(right.primaryTerm(), right.version()));
}

public void testRetentionLeasesRejectsDuplicates() {
Expand Down

0 comments on commit 4b42db9

Please sign in to comment.