Skip to content

Commit

Permalink
Enforce translog access via engine (#29542)
Browse files Browse the repository at this point in the history
Today the translog of an engine is exposed and can be accessed directly.
While this exposure offers much flexibility, it also causes these troubles:

- Inconsistent behavior between translog method and engine method.
For example, rolling a translog generation via an engine also trims
unreferenced files, but translog's method does not.

- An engine does not get notified when critical errors happen in translog
as the access is direct.

This change isolates translog of an engine and enforces all accesses to
translog via the engine.
  • Loading branch information
dnhatn authored Apr 17, 2018
1 parent 1dd0fd4 commit 45c6c20
Show file tree
Hide file tree
Showing 21 changed files with 170 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -697,8 +697,7 @@ private void maybeFSyncTranslogs() {
if (indexSettings.getTranslogDurability() == Translog.Durability.ASYNC) {
for (IndexShard shard : this.shards.values()) {
try {
Translog translog = shard.getTranslog();
if (translog.syncNeeded()) {
if (shard.isSyncNeeded()) {
shard.sync();
}
} catch (AlreadyClosedException ex) {
Expand Down
62 changes: 60 additions & 2 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStats;

import java.io.Closeable;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -510,8 +511,18 @@ public enum SearcherScope {
EXTERNAL, INTERNAL
}

/** returns the translog for this engine */
public abstract Translog getTranslog();
/**
* Returns the translog associated with this engine.
* Prefer to keep the translog package-private, so that an engine can control all accesses to the translog.
*/
abstract Translog getTranslog();

/**
* Checks if the underlying storage sync is required.
*/
public boolean isTranslogSyncNeeded() {
return getTranslog().syncNeeded();
}

/**
* Ensures that all locations in the given stream have been written to the underlying storage.
Expand All @@ -520,6 +531,36 @@ public enum SearcherScope {

public abstract void syncTranslog() throws IOException;

public Closeable acquireTranslogRetentionLock() {
return getTranslog().acquireRetentionLock();
}

/**
* Creates a new translog snapshot from this engine for reading translog operations whose seq# at least the provided seq#.
* The caller has to close the returned snapshot after finishing the reading.
*/
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
return getTranslog().newSnapshotFromMinSeqNo(minSeqNo);
}

/**
* Returns the estimated number of translog operations in this engine whose seq# at least the provided seq#.
*/
public int estimateTranslogOperationsFromMinSeq(long minSeqNo) {
return getTranslog().estimateTotalOperationsFromMinSeq(minSeqNo);
}

public TranslogStats getTranslogStats() {
return getTranslog().stats();
}

/**
* Returns the last location that the translog of this engine has written into.
*/
public Translog.Location getTranslogLastWriteLocation() {
return getTranslog().getLastWriteLocation();
}

protected final void ensureOpen(Exception suppressed) {
if (isClosed.get()) {
AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
Expand All @@ -546,6 +587,13 @@ public CommitStats commitStats() {
*/
public abstract LocalCheckpointTracker getLocalCheckpointTracker();

/**
* Returns the latest global checkpoint value that has been persisted in the underlying storage (i.e. translog's checkpoint)
*/
public long getLastSyncedGlobalCheckpoint() {
return getTranslog().getLastSyncedGlobalCheckpoint();
}

/**
* Global stats on segments.
*/
Expand Down Expand Up @@ -810,6 +858,16 @@ public final boolean refreshNeeded() {
*/
public abstract void trimTranslog() throws EngineException;

/**
* Tests whether or not the translog generation should be rolled to a new generation.
* This test is based on the size of the current generation compared to the configured generation threshold size.
*
* @return {@code true} if the current generation should be rolled to a new generation
*/
public boolean shouldRollTranslogGeneration() {
return getTranslog().shouldRollGeneration();
}

/**
* Rolls the translog generation and cleans unneeded.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy
}

@Override
public Translog getTranslog() {
Translog getTranslog() {
ensureOpen();
return translog;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ protected ReplicaResult shardOperationOnReplica(final Request request, final Ind

private void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST &&
indexShard.getTranslog().getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) {
indexShard.getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) {
indexShard.sync();
}
}
Expand Down
48 changes: 37 additions & 11 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,7 @@ public FieldDataStats fieldDataStats(String... fields) {
}

public TranslogStats translogStats() {
return getEngine().getTranslog().stats();
return getEngine().getTranslogStats();
}

public CompletionStats completionStats(String... fields) {
Expand Down Expand Up @@ -1331,7 +1331,7 @@ private boolean assertMaxUnsafeAutoIdInCommit() throws IOException {
}

protected void onNewEngine(Engine newEngine) {
refreshListeners.setTranslog(newEngine.getTranslog());
refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
}

/**
Expand Down Expand Up @@ -1563,8 +1563,7 @@ boolean shouldRollTranslogGeneration() {
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
final Translog translog = engine.getTranslog();
return translog.shouldRollGeneration();
return engine.shouldRollTranslogGeneration();
} catch (final AlreadyClosedException e) {
// we are already closed, no need to flush or roll
}
Expand All @@ -1579,9 +1578,26 @@ public void onSettingsChanged() {
}
}

/**
* Acquires a lock on the translog files, preventing them from being trimmed.
*/
public Closeable acquireTranslogRetentionLock() {
Engine engine = getEngine();
return engine.getTranslog().acquireRetentionLock();
return getEngine().acquireTranslogRetentionLock();
}

/**
* Creates a new translog snapshot for reading translog operations whose seq# at least the provided seq#.
* The caller has to close the returned snapshot after finishing the reading.
*/
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
return getEngine().newTranslogSnapshotFromMinSeqNo(minSeqNo);
}

/**
* Returns the estimated number of operations in translog whose seq# at least the provided seq#.
*/
public int estimateTranslogOperationsFromMinSeq(long minSeqNo) {
return getEngine().estimateTranslogOperationsFromMinSeq(minSeqNo);
}

public List<Segment> segments(boolean verbose) {
Expand All @@ -1592,10 +1608,6 @@ public void flushAndCloseEngine() throws IOException {
getEngine().flushAndClose();
}

public Translog getTranslog() {
return getEngine().getTranslog();
}

public String getHistoryUUID() {
return getEngine().getHistoryUUID();
}
Expand Down Expand Up @@ -1733,6 +1745,13 @@ public long getGlobalCheckpoint() {
return replicationTracker.getGlobalCheckpoint();
}

/**
* Returns the latest global checkpoint value that has been persisted in the underlying storage (i.e. translog's checkpoint)
*/
public long getLastSyncedGlobalCheckpoint() {
return getEngine().getLastSyncedGlobalCheckpoint();
}

/**
* Get the local knowledge of the global checkpoints for all in-sync allocation IDs.
*
Expand Down Expand Up @@ -2308,6 +2327,13 @@ public void sync() throws IOException {
getEngine().syncTranslog();
}

/**
* Checks if the underlying storage sync is required.
*/
public boolean isSyncNeeded() {
return getEngine().isTranslogSyncNeeded();
}

/**
* Returns the current translog durability mode
*/
Expand Down Expand Up @@ -2467,7 +2493,7 @@ final long getLastSearcherAccess() {
}

private void setRefreshPending(Engine engine) {
Translog.Location lastWriteLocation = engine.getTranslog().getLastWriteLocation();
Translog.Location lastWriteLocation = engine.getTranslogLastWriteLocation();
Translog.Location location;
do {
location = this.pendingRefreshLocation.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void resync(final IndexShard indexShard, final ActionListener<ResyncTask>
ActionListener<ResyncTask> resyncListener = null;
try {
final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
Translog.Snapshot snapshot = indexShard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
Translog.Snapshot snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo);
resyncListener = new ActionListener<ResyncTask>() {
@Override
public void onResponse(final ResyncTask resyncTask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
import java.util.function.Supplier;

import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -153,21 +154,20 @@ public int pendingCount() {
/**
* Setup the translog used to find the last refreshed location.
*/
public void setTranslog(Translog translog) {
this.translog = translog;
public void setCurrentRefreshLocationSupplier(Supplier<Translog.Location> currentRefreshLocationSupplier) {
this.currentRefreshLocationSupplier = currentRefreshLocationSupplier;
}

// Implementation of ReferenceManager.RefreshListener that adapts Lucene's RefreshListener into Elasticsearch's refresh listeners.
private Translog translog;
/**
* Snapshot of the translog location before the current refresh if there is a refresh going on or null. Doesn't have to be volatile
* because when it is used by the refreshing thread.
*/
private Translog.Location currentRefreshLocation;
private Supplier<Translog.Location> currentRefreshLocationSupplier;

@Override
public void beforeRefresh() throws IOException {
currentRefreshLocation = translog.getLastWriteLocation();
currentRefreshLocation = currentRefreshLocationSupplier.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,6 @@ public RecoveryResponse recoverToTarget() throws IOException {
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ");

try (Closeable ignored = shard.acquireTranslogRetentionLock()) {

final Translog translog = shard.getTranslog();

final long startingSeqNo;
final long requiredSeqNoRangeStart;
final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
Expand All @@ -170,7 +167,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
requiredSeqNoRangeStart =
Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
try {
phase1(phase1Snapshot.getIndexCommit(), translog::totalOperations);
phase1(phase1Snapshot.getIndexCommit(), () -> shard.estimateTranslogOperationsFromMinSeq(startingSeqNo));
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
} finally {
Expand All @@ -187,7 +184,7 @@ public RecoveryResponse recoverToTarget() throws IOException {

try {
// For a sequence based recovery, the target can keep its local translog
prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, shard.estimateTranslogOperationsFromMinSeq(startingSeqNo));
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
}
Expand All @@ -210,9 +207,9 @@ public RecoveryResponse recoverToTarget() throws IOException {

logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);

logger.trace("snapshot translog for recovery; current size is [{}]", translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
logger.trace("snapshot translog for recovery; current size is [{}]", shard.estimateTranslogOperationsFromMinSeq(startingSeqNo));
final long targetLocalCheckpoint;
try(Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(startingSeqNo)) {
try(Translog.Snapshot snapshot = shard.newTranslogSnapshotFromMinSeqNo(startingSeqNo)) {
targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
Expand Down Expand Up @@ -261,7 +258,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException {
// the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one
if (startingSeqNo - 1 <= localCheckpoint) {
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
try (Translog.Snapshot snapshot = shard.newTranslogSnapshotFromMinSeqNo(startingSeqNo)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public void testAsyncFsyncActuallyWorks() throws Exception {
client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get();
IndexShard shard = indexService.getShard(0);
assertBusy(() -> {
assertFalse(shard.getTranslog().syncNeeded());
assertFalse(shard.isSyncNeeded());
});
}

Expand All @@ -275,7 +275,7 @@ public void testRescheduleAsyncFsync() throws Exception {
client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get();
assertNotNull(indexService.getFsyncTask());
final IndexShard shard = indexService.getShard(0);
assertBusy(() -> assertFalse(shard.getTranslog().syncNeeded()));
assertBusy(() -> assertFalse(shard.isSyncNeeded()));

client()
.admin()
Expand Down Expand Up @@ -311,7 +311,7 @@ public void testAsyncTranslogTrimActuallyWorks() throws Exception {
indexService.updateMetaData(metaData);

IndexShard shard = indexService.getShard(0);
assertBusy(() -> assertThat(shard.getTranslog().totalOperations(), equalTo(0)));
assertBusy(() -> assertThat(shard.estimateTranslogOperationsFromMinSeq(0L), equalTo(0)));
}

public void testIllegalFsyncInterval() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ class GlobalCheckpointSync extends ReplicationAction<
@Override
protected PrimaryResult performOnPrimary(
final IndexShard primary, final GlobalCheckpointSyncAction.Request request) throws Exception {
primary.getTranslog().sync();
primary.sync();
return new PrimaryResult(request, new ReplicationResponse());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ public void testSeqNoCollision() throws Exception {

final Translog.Operation op1;
final List<Translog.Operation> initOperations = new ArrayList<>(initDocs);
try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) {
try (Translog.Snapshot snapshot = getTranslog(replica2).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
for (int i = 0; i < initDocs; i++) {
Translog.Operation op = snapshot.next();
Expand All @@ -347,7 +347,7 @@ public void testSeqNoCollision() throws Exception {
shards.promoteReplicaToPrimary(replica1).get(); // wait until resync completed.
shards.index(new IndexRequest(index.getName(), "type", "d2").source("{}", XContentType.JSON));
final Translog.Operation op2;
try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) {
try (Translog.Snapshot snapshot = getTranslog(replica2).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(initDocs + 2));
op2 = snapshot.next();
assertThat(op2.seqNo(), equalTo(op1.seqNo()));
Expand All @@ -362,7 +362,7 @@ public void testSeqNoCollision() throws Exception {
shards.promoteReplicaToPrimary(replica2);
logger.info("--> Recover replica3 from replica2");
recoverReplica(replica3, replica2);
try (Translog.Snapshot snapshot = replica3.getTranslog().newSnapshot()) {
try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
assertThat(snapshot.next(), equalTo(op2));
assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations));
Expand Down Expand Up @@ -468,7 +468,7 @@ private static void assertNoOpTranslogOperationForDocumentFailure(
long expectedPrimaryTerm,
String failureMessage) throws IOException {
for (IndexShard indexShard : replicationGroup) {
try(Translog.Snapshot snapshot = indexShard.getTranslog().newSnapshot()) {
try(Translog.Snapshot snapshot = getTranslog(indexShard).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(expectedOperation));
long expectedSeqNo = 0L;
Translog.Operation op = snapshot.next();
Expand Down
Loading

0 comments on commit 45c6c20

Please sign in to comment.