Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use index for peer recovery instead of translog #45137

Merged
merged 38 commits into from
Aug 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
24941f2
Create peer-recovery retention leases (#43190)
DaveCTurner Jun 19, 2019
f57ec7b
Fix compilation
DaveCTurner Jun 19, 2019
f923fad
Merge branch '7.x' into peer-recovery-retention-leases-7.x
DaveCTurner Jun 21, 2019
d0f889d
Treat UNASSIGNED_SEQUENCE_NUMBER as NO_OPS_PERFORMED
DaveCTurner Jun 24, 2019
465ea7b
Merge branch '7.x' into peer-recovery-retention-leases-7.x
DaveCTurner Jun 26, 2019
1d0930f
Add missing GCP update (#43632)
DaveCTurner Jun 26, 2019
4c97e8a
Merge branch '7.x' into peer-recovery-retention-leases-7.x
DaveCTurner Jul 1, 2019
f4b0742
Less sync
DaveCTurner Jul 1, 2019
f8158fc
Relax condition, we may have renewed some other leases too
DaveCTurner Jul 1, 2019
14ec424
Better test fix
DaveCTurner Jul 1, 2019
a455b47
Checkstyle
DaveCTurner Jul 1, 2019
c4f042b
Advance PRRLs to match GCP of tracked shards (#43751)
DaveCTurner Jul 1, 2019
be80b60
Merge branch '7.x' into peer-recovery-retention-leases-7.x
DaveCTurner Jul 4, 2019
570b4b9
Update BWC version for PRRLs (#43959)
DaveCTurner Jul 4, 2019
50e9b75
Remove PRRLs before performing file-based recovery (#43928)
DaveCTurner Jul 4, 2019
a4d5cf1
Merge branch '7.x' into peer-recovery-retention-leases-7.x
DaveCTurner Jul 5, 2019
5dd6c68
Return recovery to generic thread post-PRRL action (#44000)
DaveCTurner Jul 5, 2019
b9959e3
Skip PRRL renewal on UNASSIGNED_SEQ_NO (#44019)
DaveCTurner Jul 5, 2019
fb39bb0
Only call assertNotTransportThread if asserts on (#44028)
DaveCTurner Jul 8, 2019
59a6830
Merge branch '7.x' into peer-recovery-retention-leases-7.x
DaveCTurner Jul 8, 2019
4b19a4b
Create missing PRRLs after primary activation (#44009)
DaveCTurner Jul 8, 2019
18a0e53
Reduce number of replicas in cluster restart test
DaveCTurner Jul 8, 2019
f1626e9
Enable soft deletes in PRRL restart test
DaveCTurner Jul 8, 2019
aaeb1aa
Can only guarantee PRRLs if soft deletes enabled
DaveCTurner Jul 9, 2019
15a719e
Merge branch '7.x' into peer-recovery-retention-leases-7.x
DaveCTurner Jul 11, 2019
66583fd
Merge branch '7.x' into peer-recovery-retention-leases-7.x
DaveCTurner Jul 15, 2019
062bc8d
Merge branch '7.x' into peer-recovery-retention-leases-7.x
DaveCTurner Jul 20, 2019
85b701a
Merge branch '7.x' into peer-recovery-retention-leases-7.x
DaveCTurner Jul 23, 2019
8fe0cda
Use global checkpoint as starting seq in ops-based recovery (#43463)
dnhatn Jul 23, 2019
96d5ee7
Do not load global checkpoint to ReplicationTracker in local recovery…
dnhatn Jul 24, 2019
6f6aaca
Merge branch '7.x' into peer-recovery-retention-leases-7.x
DaveCTurner Jul 29, 2019
f8bfcb3
Skip local recovery for closed or frozen indices (#44887)
dnhatn Jul 30, 2019
d225b78
Merge branch '7.x' into peer-recovery-retention-leases-7.x
DaveCTurner Jul 31, 2019
e1b059b
Merge branch '7.x' into peer-recovery-retention-leases-7.x
DaveCTurner Aug 1, 2019
6d73b9f
Recover peers using history from Lucene (#44853)
DaveCTurner Aug 1, 2019
513d155
Reset starting seqno if fail to read last commit (#45106)
dnhatn Aug 1, 2019
14bba51
Add frozen indices plugin to transport client
DaveCTurner Aug 2, 2019
a4c9d56
Merge branch '7.x' into peer-recovery-retention-leases-7.x
DaveCTurner Aug 2, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.junit.Before;

Expand Down Expand Up @@ -260,7 +261,9 @@ public void testForgetFollower() throws IOException {
final Map<?, ?> shardStatsAsMap = (Map<?, ?>) shardStats.get(0);
final Map<?, ?> retentionLeasesStats = (Map<?, ?>) shardStatsAsMap.get("retention_leases");
final List<?> leases = (List<?>) retentionLeasesStats.get("leases");
assertThat(leases, empty());
for (final Object lease : leases) {
assertThat(((Map<?, ?>) lease).get("source"), equalTo(ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.seqno.RetentionLeaseUtils;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.rest.action.document.RestGetAction;
import org.elasticsearch.rest.action.document.RestIndexAction;
Expand Down Expand Up @@ -89,7 +90,7 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {
private String type;

@Before
public void setIndex() throws IOException {
public void setIndex() {
index = getTestName().toLowerCase(Locale.ROOT);
}

Expand Down Expand Up @@ -1338,4 +1339,31 @@ protected void ensureGreenLongWait(String index) throws IOException {
assertEquals("green", healthRsp.get("status"));
assertFalse((Boolean) healthRsp.get("timed_out"));
}

public void testPeerRecoveryRetentionLeases() throws IOException {
assumeTrue(getOldClusterVersion() + " does not support soft deletes", getOldClusterVersion().onOrAfter(Version.V_6_5_0));
if (isRunningAgainstOldCluster()) {
XContentBuilder settings = jsonBuilder();
settings.startObject();
{
settings.startObject("settings");
settings.field("number_of_shards", between(1, 5));
settings.field("number_of_replicas", between(0, 1));
if (randomBoolean() || getOldClusterVersion().before(Version.V_7_0_0)) {
// this is the default after v7.0.0, but is required before that
settings.field("soft_deletes.enabled", true);
}
settings.endObject();
}
settings.endObject();

Request createIndex = new Request("PUT", "/" + index);
createIndex.setJsonEntity(Strings.toString(settings));
client().performRequest(createIndex);
ensureGreen(index);
} else {
ensureGreen(index);
RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.RetentionLeaseUtils;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.document.RestIndexAction;
import org.elasticsearch.rest.action.document.RestUpdateAction;
Expand Down Expand Up @@ -384,6 +385,80 @@ public void testRecoveryWithSoftDeletes() throws Exception {
ensureGreen(index);
}

public void testRetentionLeasesEstablishedWhenPromotingPrimary() throws Exception {
final String index = "recover_and_create_leases_in_promotion";
if (CLUSTER_TYPE == ClusterType.OLD) {
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(1, 5))
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), between(1, 2)) // triggers nontrivial promotion
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
createIndex(index, settings.build());
int numDocs = randomInt(10);
indexDocs(index, 0, numDocs);
if (randomBoolean()) {
client().performRequest(new Request("POST", "/" + index + "/_flush"));
}
}
ensureGreen(index);
if (CLUSTER_TYPE == ClusterType.UPGRADED) {
assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index));
}
}

public void testRetentionLeasesEstablishedWhenRelocatingPrimary() throws Exception {
final String index = "recover_and_create_leases_in_relocation";
switch (CLUSTER_TYPE) {
case OLD:
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(1, 5))
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), between(0, 1))
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
createIndex(index, settings.build());
int numDocs = randomInt(10);
indexDocs(index, 0, numDocs);
if (randomBoolean()) {
client().performRequest(new Request("POST", "/" + index + "/_flush"));
}
ensureGreen(index);
break;

case MIXED:
// trigger a primary relocation by excluding the last old node with a shard filter
final Map<?, ?> nodesMap
= ObjectPath.createFromResponse(client().performRequest(new Request("GET", "/_nodes"))).evaluate("nodes");
final List<String> oldNodeNames = new ArrayList<>();
for (Object nodeDetails : nodesMap.values()) {
final Map<?, ?> nodeDetailsMap = (Map<?, ?>) nodeDetails;
final String versionString = (String) nodeDetailsMap.get("version");
if (versionString.equals(Version.CURRENT.toString()) == false) {
oldNodeNames.add((String) nodeDetailsMap.get("name"));
}
}

if (oldNodeNames.size() == 1) {
final String oldNodeName = oldNodeNames.get(0);
logger.info("--> excluding index [{}] from node [{}]", index, oldNodeName);
final Request putSettingsRequest = new Request("PUT", "/" + index + "/_settings");
putSettingsRequest.setJsonEntity("{\"index.routing.allocation.exclude._name\":\"" + oldNodeName + "\"}");
assertOK(client().performRequest(putSettingsRequest));
ensureGreen(index);
assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index));
} else {
ensureGreen(index);
}
break;

case UPGRADED:
ensureGreen(index);
assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index));
break;
}
}

/**
* This test creates an index in the non upgraded cluster and closes it. It then checks that the index
* is effectively closed and potentially replicated (if the version the index was created on supports
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ public abstract int estimateNumberOfHistoryOperations(String source,
MapperService mapperService, long startingSeqNo) throws IOException;

/**
* Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its translog
* Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog)
*/
public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,18 +518,30 @@ public void syncTranslog() throws IOException {
}

/**
* Creates a new history snapshot for reading operations since the provided seqno from the translog.
* Creates a new history snapshot for reading operations since the provided seqno.
* The returned snapshot can be retrieved from either Lucene index or translog files.
*/
@Override
public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
return newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
}

return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
}

/**
* Returns the estimated number of history operations whose seq# at least the provided seq# in this engine.
*/
@Override
public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) {
public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
try (Translog.Snapshot snapshot = newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo),
Long.MAX_VALUE, false)) {
return snapshot.totalOperations();
}
}

return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo);
}

Expand Down Expand Up @@ -2579,6 +2591,10 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS

@Override
public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
return getMinRetainedSeqNo() <= startingSeqNo;
}

final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
// avoid scanning translog if not necessary
if (startingSeqNo > currentLocalCheckpoint) {
Expand Down Expand Up @@ -2608,15 +2624,7 @@ public final long getMinRetainedSeqNo() {
@Override
public Closeable acquireRetentionLock() {
if (softDeleteEnabled) {
final Releasable softDeletesRetentionLock = softDeletesPolicy.acquireRetentionLock();
final Closeable translogRetentionLock;
try {
translogRetentionLock = translog.acquireRetentionLock();
} catch (Exception e) {
softDeletesRetentionLock.close();
throw e;
}
return () -> IOUtils.close(translogRetentionLock, softDeletesRetentionLock);
return softDeletesPolicy.acquireRetentionLock();
} else {
return translog.acquireRetentionLock();
}
Expand Down
Loading