diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index aa282296f844f..cdecb35ae253b 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -46,6 +46,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.lucene.LoggerInfoStream; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; @@ -83,6 +84,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import java.util.function.LongSupplier; public class InternalEngine extends Engine { @@ -116,7 +118,6 @@ public class InternalEngine extends Engine { private final SequenceNumbersService seqNoService; static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint"; - static final String GLOBAL_CHECKPOINT_KEY = "global_checkpoint"; static final String MAX_SEQ_NO = "max_seq_no"; // How many callers are currently requesting index throttling. 
Currently there are only two situations where we do this: when merges @@ -153,7 +154,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig); try { writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG); - final SeqNoStats seqNoStats = loadSeqNoStatsFromCommit(writer); + final SeqNoStats seqNoStats = loadSeqNoStats(engineConfig, writer); if (logger.isTraceEnabled()) { logger.trace( "recovering max sequence number: [{}], local checkpoint: [{}], global checkpoint: [{}]", @@ -169,7 +170,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { seqNoStats.getLocalCheckpoint(), seqNoStats.getGlobalCheckpoint()); indexWriter = writer; - translog = openTranslog(engineConfig, writer); + translog = openTranslog(engineConfig, writer, seqNoService::getGlobalCheckpoint); assert translog.getGeneration() != null; } catch (IOException | TranslogCorruptedException e) { throw new EngineCreationFailureException(shardId, "failed to create engine", e); @@ -257,7 +258,8 @@ private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOExc } } - private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) throws IOException { + private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, LongSupplier globalCheckpointSupplier) throws IOException { + assert openMode != null; final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); Translog.TranslogGeneration generation = null; if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { @@ -266,11 +268,11 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) thr if (generation == null) { throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist"); } - if (generation != null && generation.translogUUID == null) { + if (generation.translogUUID == null) { 
throw new IndexFormatTooOldException("trasnlog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first"); } } - final Translog translog = new Translog(translogConfig, generation); + final Translog translog = new Translog(translogConfig, generation, globalCheckpointSupplier); if (generation == null || generation.translogUUID == null) { assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be " + EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; @@ -322,21 +324,37 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer) return null; } - private SeqNoStats loadSeqNoStatsFromCommit(IndexWriter writer) throws IOException { + /** + * Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and the Translog + * checkpoint (global checkpoint). + * + * @param engineConfig the engine configuration (for the open mode and the translog path) + * @param writer the index writer (for the Lucene commit point) + * @return the sequence number stats + * @throws IOException if an I/O exception occurred reading the Lucene commit point or the translog checkpoint + */ + private static SeqNoStats loadSeqNoStats(final EngineConfig engineConfig, final IndexWriter writer) throws IOException { long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; - long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; for (Map.Entry entry : writer.getLiveCommitData()) { final String key = entry.getKey(); if (key.equals(LOCAL_CHECKPOINT_KEY)) { + assert localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED; localCheckpoint = Long.parseLong(entry.getValue()); - } else if (key.equals(GLOBAL_CHECKPOINT_KEY)) { - globalCheckpoint = Long.parseLong(entry.getValue()); } else if (key.equals(MAX_SEQ_NO)) { + assert maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : 
localCheckpoint; maxSeqNo = Long.parseLong(entry.getValue()); } } + // nocommit: reading this should be part of recovery from the translog + final long globalCheckpoint; + if (engineConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { + globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath()); + } else { + globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; + } + return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint); } @@ -1312,25 +1330,21 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn final String translogFileGen = Long.toString(translogGeneration.translogFileGeneration); final String translogUUID = translogGeneration.translogUUID; final String localCheckpoint = Long.toString(seqNoService().getLocalCheckpoint()); - final String globalCheckpoint = Long.toString(seqNoService().getGlobalCheckpoint()); writer.setLiveCommitData(() -> { - /** - * The user data captured above (e.g. local/global checkpoints) contains data that must be evaluated - * *before* Lucene flushes segments, including the local and global checkpoints amongst other values. - * The maximum sequence number is different - we never want the maximum sequence number to be - * less than the last sequence number to go into a Lucene commit, otherwise we run the risk - * of re-using a sequence number for two different documents when restoring from this commit - * point and subsequently writing new documents to the index. Since we only know which Lucene - * documents made it into the final commit after the {@link IndexWriter#commit()} call flushes - * all documents, we defer computation of the max_seq_no to the time of invocation of the commit - * data iterator (which occurs after all documents have been flushed to Lucene). + /* + * The user data captured above (e.g. 
local checkpoint) contains data that must be evaluated *before* Lucene flushes + * segments, including the local checkpoint amongst other values. The maximum sequence number is different - we never want + * the maximum sequence number to be less than the last sequence number to go into a Lucene commit, otherwise we run the + * risk of re-using a sequence number for two different documents when restoring from this commit point and subsequently + * writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the + * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the max_seq_no to the time of invocation + * of the commit data iterator (which occurs after all documents have been flushed to Lucene). */ final Map commitData = new HashMap<>(6); commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen); commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID); commitData.put(LOCAL_CHECKPOINT_KEY, localCheckpoint); - commitData.put(GLOBAL_CHECKPOINT_KEY, globalCheckpoint); if (syncId != null) { commitData.put(Engine.SYNC_COMMIT_ID, syncId); } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 9b555040751f6..ffd4fd8504ee9 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -41,6 +41,8 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.io.UncheckedIOException; +import java.io.UnsupportedEncodingException; public class GlobalCheckpointSyncAction extends TransportReplicationAction { @@ -64,10 +66,11 @@ protected ReplicationResponse newResponseInstance() { } @Override - protected PrimaryResult shardOperationOnPrimary(PrimaryRequest request) { + protected PrimaryResult 
shardOperationOnPrimary(PrimaryRequest request) throws Exception { IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); IndexShard indexShard = indexService.getShard(request.shardId().id()); long checkpoint = indexShard.getGlobalCheckpoint(); + syncTranslog(indexShard); return new PrimaryResult(new ReplicaRequest(request, checkpoint), new ReplicationResponse()); } @@ -76,9 +79,19 @@ protected ReplicaResult shardOperationOnReplica(ReplicaRequest request) { IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); IndexShard indexShard = indexService.getShard(request.shardId().id()); indexShard.updateGlobalCheckpointOnReplica(request.checkpoint); + syncTranslog(indexShard); return new ReplicaResult(); } + private void syncTranslog(final IndexShard indexShard) { + try { + indexShard.getTranslog().sync(); + } catch (final IOException e) { + // nocommit: no need to wrap this exception after integrating master into feature/seq_no + throw new UncheckedIOException("failed to sync translog after updating global checkpoint for shard " + indexShard.shardId(), e); + } + } + public void updateCheckpointForShard(ShardId shardId) { execute(new PrimaryRequest(shardId), new ActionListener() { @Override @@ -135,4 +148,5 @@ public long getCheckpoint() { return checkpoint; } } + } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java b/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java index ac657c1768315..157fea98d91a5 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ + package org.elasticsearch.index.seqno; import org.elasticsearch.common.io.stream.StreamInput; @@ -88,4 +89,5 @@ public String toString() { ", globalCheckpoint=" + globalCheckpoint + '}'; } + } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 9d6e54cc7c1a2..09ed72fd92a61 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -40,26 +40,16 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { final GlobalCheckpointService globalCheckpointService; /** - * Initialize the sequence number service. The {@code maxSeqNo} - * should be set to the last sequence number assigned by this - * shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}, - * {@code localCheckpoint} should be set to the last known local - * checkpoint for this shard, or - * {@link SequenceNumbersService#NO_OPS_PERFORMED}, and - * {@code globalCheckpoint} should be set to the last known global - * checkpoint for this shard, or - * {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}. + * Initialize the sequence number service. The {@code maxSeqNo} should be set to the last sequence number assigned by this shard, or + * {@link SequenceNumbersService#NO_OPS_PERFORMED}, {@code localCheckpoint} should be set to the last known local checkpoint for this + * shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}, and {@code globalCheckpoint} should be set to the last known global + * checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}. 
* - * @param shardId the shard this service is providing tracking - * local checkpoints for - * @param indexSettings the index settings - * @param maxSeqNo the last sequence number assigned by this - * shard, or - * {@link SequenceNumbersService#NO_OPS_PERFORMED} - * @param localCheckpoint the last known local checkpoint for this shard, - * or {@link SequenceNumbersService#NO_OPS_PERFORMED} - * @param globalCheckpoint the last known global checkpoint for this shard, - * or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} + * @param shardId the shard this service is providing tracking local checkpoints for + * @param indexSettings the index settings + * @param maxSeqNo the last sequence number assigned by this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED} + * @param localCheckpoint the last known local checkpoint for this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED} + * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} */ public SequenceNumbersService( final ShardId shardId, @@ -100,8 +90,7 @@ public void markSeqNoAsCompleted(long seqNo) { * Gets sequence number related stats */ public SeqNoStats stats() { - return new SeqNoStats(localCheckpointService.getMaxSeqNo(), localCheckpointService.getCheckpoint(), - globalCheckpointService.getCheckpoint()); + return new SeqNoStats(getMaxSeqNo(), getLocalCheckpoint(), getGlobalCheckpoint()); } /** @@ -130,6 +119,16 @@ public long getGlobalCheckpoint() { return globalCheckpointService.getCheckpoint(); } + /** + * Scans through the currently known local checkpoint and updates the global checkpoint accordingly. + * + * @return true if the checkpoint has been updated or if it can not be updated since one of the local checkpoints + * of one of the active allocations is not known. 
+ */ + public boolean updateGlobalCheckpointOnPrimary() { + return globalCheckpointService.updateCheckpointOnPrimary(); + } + /** * updates the global checkpoint on a replica shard (after it has been updated by the primary). */ @@ -148,13 +147,4 @@ public void updateAllocationIdsFromMaster(Set activeAllocationIds, Setnull a new translog is created. If non-null - * the translog tries to open the given translog generation. The generation is treated as the last generation referenced - * form already committed data. This means all operations that have not yet been committed should be in the translog - * file referenced by this generation. The translog creation will fail if this generation can't be opened. - * - * @see TranslogConfig#getTranslogPath() + * Creates a new Translog instance. This method will create a new transaction log if the given {@link TranslogGeneration} is + * {@code null}. If the generation is {@code null} this method is destructive and will delete all files in the translog path given. If + * the generation is not {@code null}, this method tries to open the given translog generation. The generation is treated as the last + * generation referenced from already committed data. This means all operations that have not yet been committed should be in the + * translog file referenced by this generation. The translog creation will fail if this generation can't be opened. 
* + * @param config the configuration of this translog + * @param translogGeneration the translog generation to open + * @param globalCheckpointSupplier a supplier for the global checkpoint */ - public Translog(TranslogConfig config, TranslogGeneration translogGeneration) throws IOException { + public Translog( + final TranslogConfig config, + final TranslogGeneration translogGeneration, + final LongSupplier globalCheckpointSupplier) throws IOException { super(config.getShardId(), config.getIndexSettings()); this.config = config; + this.globalCheckpointSupplier = globalCheckpointSupplier; if (translogGeneration == null || translogGeneration.translogUUID == null) { // legacy case translogUUID = UUIDs.randomBase64UUID(); } else { @@ -157,7 +160,7 @@ public Translog(TranslogConfig config, TranslogGeneration translogGeneration) th try { if (translogGeneration != null) { - final Checkpoint checkpoint = readCheckpoint(); + final Checkpoint checkpoint = readCheckpoint(location); final Path nextTranslogFile = location.resolve(getFilename(checkpoint.generation + 1)); final Path currentCheckpointFile = location.resolve(getCommitCheckpointFileName(checkpoint.generation)); // this is special handling for error condition when we create a new writer but we fail to bake @@ -195,7 +198,7 @@ public Translog(TranslogConfig config, TranslogGeneration translogGeneration) th logger.debug("wipe translog location - creating new translog"); Files.createDirectories(location); final long generation = 1; - Checkpoint checkpoint = new Checkpoint(0, 0, generation); + Checkpoint checkpoint = new Checkpoint(0, 0, generation, globalCheckpointSupplier.getAsLong()); final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME); Checkpoint.write(getChannelFactory(), checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); IOUtils.fsync(checkpointFile, false); @@ -372,11 +375,25 @@ private long sizeInBytes(long minGeneration) { } + /** + * Creates a new translog for 
the specified generation. + * + * @param fileGeneration the translog generation + * @return a writer for the new translog + * @throws IOException if creating the translog failed + */ TranslogWriter createWriter(long fileGeneration) throws IOException { - TranslogWriter newFile; + final TranslogWriter newFile; try { - newFile = TranslogWriter.create(shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), getChannelFactory(), config.getBufferSize()); - } catch (IOException e) { + newFile = TranslogWriter.create( + shardId, + translogUUID, + fileGeneration, + location.resolve(getFilename(fileGeneration)), + getChannelFactory(), + config.getBufferSize(), + globalCheckpointSupplier); + } catch (final IOException e) { throw new TranslogException(shardId, "failed to create new translog file", e); } return newFile; @@ -441,6 +458,16 @@ public Location getLastWriteLocation() { } } + /** + * The last synced global checkpoint for this translog. + * + * @return the last synced global checkpoint + */ + public long getLastSyncedGlobalCheckpoint() { + try (final ReleasableLock ignored = readLock.acquire()) { + return current.getLastSyncedCheckpoint().globalCheckpoint; + } + } /** * Snapshots the current transaction log allowing to safely iterate over the snapshot. @@ -531,7 +558,7 @@ public boolean ensureSynced(Location location) throws IOException { /** * Ensures that all locations in the given stream have been synced / written to the underlying storage. - * This method allows for internal optimization to minimize the amout of fsync operations if multiple + * This method allows for internal optimization to minimize the amount of fsync operations if multiple * locations must be synced. 
* * @return Returns true iff this call caused an actual sync operation otherwise false @@ -1356,10 +1383,21 @@ public Exception getTragicException() { } /** Reads and returns the current checkpoint */ - final Checkpoint readCheckpoint() throws IOException { + static final Checkpoint readCheckpoint(final Path location) throws IOException { return Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME)); } + /** + * Reads the sequence numbers global checkpoint from the translog checkpoint. + * + * @param location the location of the translog + * @return the global checkpoint + * @throws IOException if an I/O exception occurred reading the checkpoint + */ + public static final long readGlobalCheckpoint(final Path location) throws IOException { + return readCheckpoint(location).globalCheckpoint; + } + /** * Returns the translog uuid used to associate a lucene index with a translog. */ diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java index 6367761c143d6..61b009d9b4175 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java @@ -19,15 +19,11 @@ package org.elasticsearch.index.translog; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.translog.Translog.TranslogGeneration; -import org.elasticsearch.threadpool.ThreadPool; import java.nio.file.Path; diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java index 41a430d0c82ab..d696b815b9764 100644 --- 
a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java @@ -151,10 +151,6 @@ protected void readBytes(ByteBuffer buffer, long position) throws IOException { Channels.readFromFileChannelWithEofException(channel, position, buffer); } - public Checkpoint getInfo() { - return new Checkpoint(length, totalOperations, getGeneration()); - } - @Override public final void close() throws IOException { if (closed.compareAndSet(false, true)) { diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index bb4a84651c561..6870643d20beb 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -38,6 +38,7 @@ import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.LongSupplier; public class TranslogWriter extends BaseTranslogReader implements Closeable { @@ -48,8 +49,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { private final ShardId shardId; private final ChannelFactory channelFactory; - /* the offset in bytes that was written when the file was last synced*/ - private volatile long lastSyncedOffset; + // the last checkpoint that was written when the translog was last synced + private volatile Checkpoint lastSyncedCheckpoint; /* the number of translog operations written to this file */ private volatile int operationCounter; /* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */ @@ -59,17 +60,27 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { /* the total offset of this file including the bytes written to the file as well as into the buffer */ 
private volatile long totalOffset; + private final LongSupplier globalCheckpointSupplier; + protected final AtomicBoolean closed = new AtomicBoolean(false); // lock order synchronized(syncLock) -> synchronized(this) private final Object syncLock = new Object(); - public TranslogWriter(ChannelFactory channelFactory, ShardId shardId, long generation, FileChannel channel, Path path, ByteSizeValue bufferSize) throws IOException { - super(generation, channel, path, channel.position()); + public TranslogWriter( + final ChannelFactory channelFactory, + final ShardId shardId, + final Checkpoint initialCheckpoint, + final FileChannel channel, + final Path path, + final ByteSizeValue bufferSize, + final LongSupplier globalCheckpointSupplier) throws IOException { + super(initialCheckpoint.generation, channel, path, channel.position()); this.shardId = shardId; this.channelFactory = channelFactory; this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channel), bufferSize.bytesAsInt()); - this.lastSyncedOffset = channel.position(); - totalOffset = lastSyncedOffset; + this.lastSyncedCheckpoint = initialCheckpoint; + this.totalOffset = initialCheckpoint.offset; + this.globalCheckpointSupplier = globalCheckpointSupplier; } static int getHeaderLength(String translogUUID) { @@ -86,7 +97,14 @@ static void writeHeader(OutputStreamDataOutput out, BytesRef ref) throws IOExcep out.writeBytes(ref.bytes, ref.offset, ref.length); } - public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, ByteSizeValue bufferSize) throws IOException { + public static TranslogWriter create( + ShardId shardId, + String translogUUID, + long fileGeneration, + Path file, + ChannelFactory channelFactory, + ByteSizeValue bufferSize, + final LongSupplier globalCheckpointSupplier) throws IOException { final BytesRef ref = new BytesRef(translogUUID); final int headerLength = 
getHeaderLength(ref.length); final FileChannel channel = channelFactory.open(file); @@ -96,8 +114,10 @@ public static TranslogWriter create(ShardId shardId, String translogUUID, long f final OutputStreamDataOutput out = new OutputStreamDataOutput(java.nio.channels.Channels.newOutputStream(channel)); writeHeader(out, ref); channel.force(true); - writeCheckpoint(channelFactory, headerLength, 0, file.getParent(), fileGeneration); - final TranslogWriter writer = new TranslogWriter(channelFactory, shardId, fileGeneration, channel, file, bufferSize); + final Checkpoint checkpoint = + writeCheckpoint(channelFactory, headerLength, 0, globalCheckpointSupplier.getAsLong(), file.getParent(), fileGeneration); + final TranslogWriter writer = + new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier); return writer; } catch (Exception exception) { // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that @@ -163,7 +183,7 @@ public void sync() throws IOException { * returns true if there are buffered ops */ public boolean syncNeeded() { - return totalOffset != lastSyncedOffset; + return totalOffset != lastSyncedCheckpoint.offset || globalCheckpointSupplier.getAsLong() != lastSyncedCheckpoint.globalCheckpoint; } @Override @@ -200,7 +220,8 @@ public TranslogReader closeIntoReader() throws IOException { if (closed.compareAndSet(false, true)) { boolean success = false; try { - final TranslogReader reader = new TranslogReader(generation, channel, path, firstOperationOffset, getWrittenOffset(), operationCounter); + final TranslogReader reader = + new TranslogReader(generation, channel, path, firstOperationOffset, getWrittenOffset(), operationCounter); success = true; return reader; } finally { @@ -244,19 +265,21 @@ private long getWrittenOffset() throws IOException { * @return true if this call caused an actual sync operation */ public boolean syncUpTo(long offset) throws 
IOException { - if (lastSyncedOffset < offset && syncNeeded()) { + if (lastSyncedCheckpoint.offset < offset && syncNeeded()) { synchronized (syncLock) { // only one sync/checkpoint should happen concurrently but we wait - if (lastSyncedOffset < offset && syncNeeded()) { + if (lastSyncedCheckpoint.offset < offset && syncNeeded()) { // double checked locking - we don't want to fsync unless we have to and now that we have // the lock we should check again since if this code is busy we might have fsynced enough already final long offsetToSync; final int opsCounter; + final long globalCheckpoint; synchronized (this) { ensureOpen(); try { outputStream.flush(); offsetToSync = totalOffset; opsCounter = operationCounter; + globalCheckpoint = globalCheckpointSupplier.getAsLong(); } catch (Exception ex) { try { closeWithTragicEvent(ex); @@ -268,9 +291,11 @@ public boolean syncUpTo(long offset) throws IOException { } // now do the actual fsync outside of the synchronized block such that // we can continue writing to the buffer etc. 
+ final Checkpoint checkpoint; try { channel.force(false); - writeCheckpoint(channelFactory, offsetToSync, opsCounter, path.getParent(), generation); + checkpoint = + writeCheckpoint(channelFactory, offsetToSync, opsCounter, globalCheckpoint, path.getParent(), generation); } catch (Exception ex) { try { closeWithTragicEvent(ex); @@ -279,8 +304,9 @@ public boolean syncUpTo(long offset) throws IOException { } throw ex; } - assert lastSyncedOffset <= offsetToSync : "illegal state: " + lastSyncedOffset + " <= " + offsetToSync; - lastSyncedOffset = offsetToSync; // write protected by syncLock + assert lastSyncedCheckpoint.offset <= offsetToSync : + "illegal state: " + lastSyncedCheckpoint.offset + " <= " + offsetToSync; + lastSyncedCheckpoint = checkpoint; // write protected by syncLock return true; } } @@ -306,10 +332,26 @@ protected void readBytes(ByteBuffer targetBuffer, long position) throws IOExcept Channels.readFromFileChannelWithEofException(channel, position, targetBuffer); } - private static void writeCheckpoint(ChannelFactory channelFactory, long syncPosition, int numOperations, Path translogFile, long generation) throws IOException { + private static Checkpoint writeCheckpoint( + ChannelFactory channelFactory, + long syncPosition, + int numOperations, + long globalCheckpoint, + Path translogFile, + long generation) throws IOException { final Path checkpointFile = translogFile.resolve(Translog.CHECKPOINT_FILE_NAME); - Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation); + final Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation, globalCheckpoint); Checkpoint.write(channelFactory::open, checkpointFile, checkpoint, StandardOpenOption.WRITE); + return checkpoint; + } + + /** + * The last synced checkpoint for this translog. 
+ * + * @return the last synced checkpoint + */ + public Checkpoint getLastSyncedCheckpoint() { + return lastSyncedCheckpoint; } protected final void ensureOpen() { diff --git a/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java index 6514cd42709d2..967876348c8e8 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java @@ -39,6 +39,7 @@ import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.seqno.SequenceNumbersService; import java.io.IOException; import java.nio.channels.Channels; @@ -166,7 +167,7 @@ protected void execute(Terminal terminal, OptionSet options, Map /** Write a checkpoint file to the given location with the given generation */ public static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration) throws IOException { - Checkpoint emptyCheckpoint = new Checkpoint(translogLength, 0, translogGeneration); + Checkpoint emptyCheckpoint = new Checkpoint(translogLength, 0, translogGeneration, SequenceNumbersService.UNASSIGNED_SEQ_NO); Checkpoint.write(FileChannel::open, filename, emptyCheckpoint, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); // fsync with metadata here to make sure. 
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 857a6c6546858..ebf4e6ad2176a 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -286,7 +286,7 @@ protected Translog createTranslog() throws IOException { protected Translog createTranslog(Path translogPath) throws IOException { TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE); - return new Translog(translogConfig, null); + return new Translog(translogConfig, null, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); } protected SnapshotDeletionPolicy createSnapshotDeletionPolicy() { @@ -599,10 +599,7 @@ public SequenceNumbersService seqNoService() { assertThat( Long.parseLong(stats1.getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); - assertThat(stats1.getUserData(), hasKey(InternalEngine.GLOBAL_CHECKPOINT_KEY)); - assertThat( - Long.parseLong(stats1.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), - equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + assertThat(stats1.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO)); assertThat( Long.parseLong(stats1.getUserData().get(InternalEngine.MAX_SEQ_NO)), @@ -628,10 +625,6 @@ public SequenceNumbersService seqNoService() { not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); assertThat(stats2.getUserData().get(Translog.TRANSLOG_UUID_KEY), equalTo(stats1.getUserData().get(Translog.TRANSLOG_UUID_KEY))); assertThat(Long.parseLong(stats2.getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint.get())); - assertThat(stats2.getUserData(), hasKey(InternalEngine.GLOBAL_CHECKPOINT_KEY)); - assertThat( - 
Long.parseLong(stats2.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), - equalTo(globalCheckpoint.get())); assertThat(stats2.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO)); assertThat(Long.parseLong(stats2.getUserData().get(InternalEngine.MAX_SEQ_NO)), equalTo(maxSeqNo.get())); } finally { @@ -1702,13 +1695,13 @@ public void testSeqNoAndCheckpoints() throws IOException { if (rarely()) { localCheckpoint = primarySeqNo; maxSeqNo = primarySeqNo; - globalCheckpoint = replicaLocalCheckpoint; initialEngine.seqNoService().updateGlobalCheckpointOnPrimary(); initialEngine.flush(true, true); } } initialEngine.seqNoService().updateGlobalCheckpointOnPrimary(); + globalCheckpoint = initialEngine.seqNoService().getGlobalCheckpoint(); assertEquals(primarySeqNo, initialEngine.seqNoService().getMaxSeqNo()); assertThat(initialEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primarySeqNo)); @@ -1718,8 +1711,9 @@ public void testSeqNoAndCheckpoints() throws IOException { assertThat( Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint)); + initialEngine.getTranslog().sync(); // to guarantee the global checkpoint is written to the translog checkpoint assertThat( - Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), + initialEngine.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); assertThat( Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)), @@ -1739,7 +1733,7 @@ public void testSeqNoAndCheckpoints() throws IOException { Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), equalTo(primarySeqNo)); assertThat( - Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), + recoveringEngine.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); assertThat( 
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)), @@ -2344,8 +2338,10 @@ public void testRecoverFromForeignTranslog() throws IOException { Translog.TranslogGeneration generation = engine.getTranslog().getGeneration(); engine.close(); - Translog translog = new Translog(new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE) - , null); + Translog translog = new Translog( + new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + null, + () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); translog.add(new Translog.Index("test", "SomeBogusId", "{}".getBytes(Charset.forName("UTF-8")))); assertEquals(generation.translogFileGeneration, translog.currentFileGeneration()); translog.close(); diff --git a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java new file mode 100644 index 0000000000000..246ecdc154b2c --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. 
+ */ + +package org.elasticsearch.index.seqno; + +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.HashSet; + +import static org.elasticsearch.mock.orig.Mockito.when; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class GlobalCheckpointSyncActionTests extends ESTestCase { + + private ThreadPool threadPool; + private Transport transport; + private ClusterService clusterService; + private TransportService transportService; + private ShardStateAction shardStateAction; + + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool(getClass().getName()); + transport = new CapturingTransport(); + clusterService = createClusterService(threadPool); + transportService = new TransportService(clusterService.getSettings(), transport, threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR); + transportService.start(); + 
transportService.acceptIncomingRequests(); + shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool); + } + + public void tearDown() throws Exception { + try { + IOUtils.close(transportService, clusterService, transport); + } finally { + terminate(threadPool); + } + super.tearDown(); + } + + public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception { + final IndicesService indicesService = mock(IndicesService.class); + + final Index index = new Index("index", "uuid"); + final IndexService indexService = mock(IndexService.class); + when(indicesService.indexServiceSafe(index)).thenReturn(indexService); + + final int id = randomIntBetween(0, 4); + final IndexShard indexShard = mock(IndexShard.class); + when(indexService.getShard(id)).thenReturn(indexShard); + + final Translog translog = mock(Translog.class); + when(indexShard.getTranslog()).thenReturn(translog); + + final GlobalCheckpointSyncAction action = new GlobalCheckpointSyncAction( + Settings.EMPTY, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + new ActionFilters(Collections.emptySet()), + new IndexNameExpressionResolver(Settings.EMPTY)); + final ShardId shardId = new ShardId(index, id); + final GlobalCheckpointSyncAction.PrimaryRequest primaryRequest = new GlobalCheckpointSyncAction.PrimaryRequest(shardId); + if (randomBoolean()) { + action.shardOperationOnPrimary(primaryRequest); + } else { + action.shardOperationOnReplica(new GlobalCheckpointSyncAction.ReplicaRequest(primaryRequest, randomPositiveLong())); + } + + verify(translog).sync(); + } + +} diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index e25570f429962..b4a5c2970ff42 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ 
b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -48,6 +48,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog.Location; import org.elasticsearch.test.ESTestCase; @@ -85,6 +86,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongSupplier; import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; @@ -97,6 +99,7 @@ public class TranslogTests extends ESTestCase { protected final ShardId shardId = new ShardId("index", "_na_", 1); protected Translog translog; + private AtomicLong globalCheckpoint; protected Path translogDir; @Override @@ -136,7 +139,8 @@ public void tearDown() throws Exception { } private Translog create(Path path) throws IOException { - return new Translog(getTranslogConfig(path), null); + globalCheckpoint = new AtomicLong(SequenceNumbersService.UNASSIGNED_SEQ_NO); + return new Translog(getTranslogConfig(path), null, () -> globalCheckpoint.get()); } private TranslogConfig getTranslogConfig(Path path) { @@ -845,11 +849,16 @@ public void testBasicCheckpoint() throws IOException { List locations = new ArrayList<>(); int translogOperations = randomIntBetween(10, 100); int lastSynced = -1; + long lastSyncedGlobalCheckpoint = globalCheckpoint.get(); for (int op = 0; op < translogOperations; op++) { locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); + if (randomBoolean()) { + globalCheckpoint.set(globalCheckpoint.get() + randomIntBetween(1, 16)); + } if (frequently()) { translog.sync(); lastSynced = op; + lastSyncedGlobalCheckpoint = 
globalCheckpoint.get(); } } assertEquals(translogOperations, translog.totalOperations()); @@ -873,6 +882,7 @@ public void testBasicCheckpoint() throws IOException { assertNull(next); } assertEquals(translogOperations + 1, translog.totalOperations()); + assertThat(checkpoint.globalCheckpoint, equalTo(lastSyncedGlobalCheckpoint)); translog.close(); } @@ -977,7 +987,7 @@ public void testBasicRecovery() throws IOException { TranslogConfig config = translog.getConfig(); translog.close(); - translog = new Translog(config, translogGeneration); + translog = new Translog(config, translogGeneration,() -> SequenceNumbersService.UNASSIGNED_SEQ_NO); if (translogGeneration == null) { assertEquals(0, translog.stats().estimatedNumberOfOperations()); assertEquals(1, translog.currentFileGeneration()); @@ -1018,7 +1028,7 @@ public void testRecoveryUncommitted() throws IOException { // we intentionally don't close the tlog that is in the prepareCommit stage since we try to recovery the uncommitted // translog here as well. 
TranslogConfig config = translog.getConfig(); - try (Translog translog = new Translog(config, translogGeneration)) { + try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); @@ -1031,7 +1041,7 @@ public void testRecoveryUncommitted() throws IOException { } } if (randomBoolean()) { // recover twice - try (Translog translog = new Translog(config, translogGeneration)) { + try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); @@ -1072,7 +1082,7 @@ public void testRecoveryUncommittedFileExists() throws IOException { Checkpoint read = Checkpoint.read(ckp); Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation))); - try (Translog translog = new Translog(config, translogGeneration)) { + try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); @@ -1087,7 +1097,7 @@ public void testRecoveryUncommittedFileExists() throws IOException { } if (randomBoolean()) { // recover twice - try (Translog translog = new Translog(config, translogGeneration)) { + try (Translog translog = new Translog(config, translogGeneration, 
() -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); @@ -1123,15 +1133,15 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { TranslogConfig config = translog.getConfig(); Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME); Checkpoint read = Checkpoint.read(ckp); - Checkpoint corrupted = new Checkpoint(0, 0, 0); + Checkpoint corrupted = new Checkpoint(0, 0, 0, SequenceNumbersService.UNASSIGNED_SEQ_NO); Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); - try (Translog translog = new Translog(config, translogGeneration)) { + try (Translog ignored = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { fail("corrupted"); } catch (IllegalStateException ex) { - assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3178, numOps=55, translogFileGeneration= 2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration= 0}"); + assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3178, numOps=55, translogFileGeneration=2, globalCheckpoint=-2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration=0, globalCheckpoint=-2}"); } Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING); - try (Translog translog = new Translog(config, translogGeneration)) { + try (Translog translog = new 
Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); @@ -1205,12 +1215,12 @@ public void testOpenForeignTranslog() throws IOException { Translog.TranslogGeneration generation = new Translog.TranslogGeneration(randomRealisticUnicodeOfCodepointLengthBetween(1, translogGeneration.translogUUID.length()), translogGeneration.translogFileGeneration); try { - new Translog(config, generation); + new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); fail("translog doesn't belong to this UUID"); } catch (TranslogCorruptedException ex) { } - this.translog = new Translog(config, translogGeneration); + this.translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); Translog.Snapshot snapshot = this.translog.newSnapshot(); for (int i = firstUncommitted; i < translogOperations; i++) { Translog.Operation next = snapshot.next(); @@ -1381,7 +1391,7 @@ public void testFailFlush() throws IOException { assertFalse(translog.isOpen()); translog.close(); // we are closed - try (Translog tlog = new Translog(config, translogGeneration)) { + try (Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration()); assertFalse(tlog.syncNeeded()); @@ -1508,7 +1518,7 @@ protected void afterAdd() throws IOException { iterator.remove(); } } - try (Translog tlog = new Translog(config, translog.getGeneration())) { + try (Translog tlog = new Translog(config, translog.getGeneration(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { Translog.Snapshot snapshot = tlog.newSnapshot(); if 
(writtenOperations.size() != snapshot.totalOperations()) { for (int i = 0; i < threadCount; i++) { @@ -1563,7 +1573,7 @@ public void onceFailedFailAlways() { private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException, Translog.TranslogGeneration generation) throws IOException { - return new Translog(config, generation) { + return new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) { @Override ChannelFactory getChannelFactory() { final ChannelFactory factory = super.getChannelFactory(); @@ -1675,12 +1685,12 @@ private static final class UnknownException extends RuntimeException { public void testFailWhileCreateWriteWithRecoveredTLogs() throws IOException { Path tempDir = createTempDir(); TranslogConfig config = getTranslogConfig(tempDir); - Translog translog = new Translog(config, null); + Translog translog = new Translog(config, null, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); translog.add(new Translog.Index("test", "boom", "boom".getBytes(Charset.forName("UTF-8")))); Translog.TranslogGeneration generation = translog.getGeneration(); translog.close(); try { - new Translog(config, generation) { + new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) { @Override protected TranslogWriter createWriter(long fileGeneration) throws IOException { throw new MockDirectoryWrapper.FakeIOException(); @@ -1703,7 +1713,7 @@ public void testRecoverWithUnbackedNextGen() throws IOException { Checkpoint read = Checkpoint.read(ckp); Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation))); Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog")); - try (Translog tlog = new Translog(config, translogGeneration)) { + try (Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { 
assertNotNull(translogGeneration); assertFalse(tlog.syncNeeded()); Translog.Snapshot snapshot = tlog.newSnapshot(); @@ -1714,7 +1724,7 @@ public void testRecoverWithUnbackedNextGen() throws IOException { } tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); } - try (Translog tlog = new Translog(config, translogGeneration)) { + try (Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { assertNotNull(translogGeneration); assertFalse(tlog.syncNeeded()); Translog.Snapshot snapshot = tlog.newSnapshot(); @@ -1737,7 +1747,7 @@ public void testRecoverWithUnbackedNextGenInIllegalState() throws IOException { Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog")); try { - Translog tlog = new Translog(config, translogGeneration); + Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); fail("file already exists?"); } catch (TranslogException ex) { // all is well @@ -1758,7 +1768,7 @@ public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException { Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog")); // we add N+1 and N+2 to ensure we only delete the N+1 file and never jump ahead and wipe without the right condition Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 2) + ".tlog")); - try (Translog tlog = new Translog(config, translogGeneration)) { + try (Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { assertNotNull(translogGeneration); assertFalse(tlog.syncNeeded()); Translog.Snapshot snapshot = tlog.newSnapshot(); @@ -1771,7 +1781,7 @@ public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException { } try { - Translog tlog = new Translog(config, translogGeneration); + Translog tlog = new Translog(config, 
translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); fail("file already exists?"); } catch (TranslogException ex) { // all is well @@ -1834,7 +1844,7 @@ public void testWithRandomException() throws IOException { } catch (IOException ex) { assertEquals(ex.getMessage(), "__FAKE__ no space left on device"); } finally { - Checkpoint checkpoint = failableTLog.readCheckpoint(); + Checkpoint checkpoint = Translog.readCheckpoint(config.getTranslogPath()); if (checkpoint.numOps == unsynced.size() + syncedDocs.size()) { syncedDocs.addAll(unsynced); // failed in fsync but got fully written unsynced.clear(); @@ -1859,7 +1869,7 @@ public void testWithRandomException() throws IOException { } fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file - try (Translog translog = new Translog(config, generation)) { + try (Translog translog = new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { Translog.Snapshot snapshot = translog.newSnapshot(); assertEquals(syncedDocs.size(), snapshot.totalOperations()); for (int i = 0; i < syncedDocs.size(); i++) { @@ -1872,10 +1882,10 @@ public void testWithRandomException() throws IOException { } public void testCheckpointOnDiskFull() throws IOException { - Checkpoint checkpoint = new Checkpoint(randomLong(), randomInt(), randomLong()); + Checkpoint checkpoint = new Checkpoint(randomLong(), randomInt(), randomLong(), randomLong()); Path tempDir = createTempDir(); Checkpoint.write(FileChannel::open, tempDir.resolve("foo.cpk"), checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); - Checkpoint checkpoint2 = new Checkpoint(randomLong(), randomInt(), randomLong()); + Checkpoint checkpoint2 = new Checkpoint(randomLong(), randomInt(), randomLong(), randomLong()); try { Checkpoint.write((p, o) -> { if (randomBoolean()) { @@ -1906,14 +1916,14 @@ public void testPendingDelete() throws IOException { Translog.TranslogGeneration 
generation = translog.getGeneration(); TranslogConfig config = translog.getConfig(); translog.close(); - translog = new Translog(config, generation); + translog = new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); translog.add(new Translog.Index("test", "2", new byte[]{2})); translog.prepareCommit(); Translog.View view = translog.newView(); translog.add(new Translog.Index("test", "3", new byte[]{3})); translog.close(); IOUtils.close(view); - translog = new Translog(config, generation); + translog = new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); } public static Translog.Location randomTranslogLocation() { diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java index 8ae7117d4838e..9a9ba438a1bb6 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java @@ -87,7 +87,7 @@ public void testTruncatedTranslog() throws Exception { public TranslogReader openReader(Path path, long id) throws IOException { FileChannel channel = FileChannel.open(path, StandardOpenOption.READ); try { - TranslogReader reader = TranslogReader.open(channel, path, new Checkpoint(Files.size(path), 1, id), null); + TranslogReader reader = TranslogReader.open(channel, path, new Checkpoint(Files.size(path), 1, id, 0), null); channel = null; return reader; } finally {