Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RCI] Add NoOpEngine for closed indices #33903

Merged
merged 12 commits into from
Sep 26, 2018
149 changes: 149 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.stream.Stream;

/**
* NoOpEngine is an engine implementation that does nothing but the bare minimum
* required in order to have an engine. All attempts to do something (search,
* index, get), throw {@link UnsupportedOperationException}. This does maintain
* a translog with a deletion policy so that when flushing, no translog is
* retained on disk (setting a retention size and age of 0).
*
* It's also important to notice that this does list the commits of the Store's
* Directory so that the last commit's user data can be read for the historyUUID
* and last committed segment info.
*/
public final class NoOpEngine extends ReadOnlyEngine {

public NoOpEngine(EngineConfig engineConfig) {
super(engineConfig, null, null, true, directoryReader -> directoryReader);
boolean success = false;
try {
// The deletion policy for the translog should not keep any translogs around, so the min age/size is set to -1
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1);

// The translog is opened and closed to validate that the translog UUID from lucene is the same as the one in the translog
try (Translog translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier())) {
final int nbOperations = translog.totalOperations();
if (nbOperations != 0) {
throw new IllegalArgumentException("Expected 0 translog operations but there were " + nbOperations);
}
}
success = true;
} catch (IOException | TranslogCorruptedException e) {
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(this);
}
}
}

@Override
protected DirectoryReader open(final Directory directory) throws IOException {
final List<IndexCommit> indexCommits = DirectoryReader.listCommits(directory);
assert indexCommits.size() == 1 : "expected only one commit point";
IndexCommit indexCommit = indexCommits.get(indexCommits.size() - 1);
return new DirectoryReader(directory, new LeafReader[0]) {
@Override
protected DirectoryReader doOpenIfChanged() throws IOException {
return null;
}

@Override
protected DirectoryReader doOpenIfChanged(IndexCommit commit) throws IOException {
return null;
}

@Override
protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) throws IOException {
return null;
}

@Override
public long getVersion() {
return 0;
}

@Override
public boolean isCurrent() throws IOException {
return true;
}

@Override
public IndexCommit getIndexCommit() throws IOException {
return indexCommit;
}

@Override
protected void doClose() throws IOException {
}

@Override
public CacheHelper getReaderCacheHelper() {
return null;
}
};
}

private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier) throws IOException {
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final String translogUUID = loadTranslogUUIDFromLastCommit();
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier,
engineConfig.getPrimaryTermSupplier());
}

/**
* Reads the current stored translog ID from the last commit data.
*/
@Nullable
private String loadTranslogUUIDFromLastCommit() {
final Map<String, String> commitUserData = getLastCommittedSegmentInfos().getUserData();
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
throw new IllegalStateException("Commit doesn't contain translog generation id");
}
return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
}

@Override
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) {
throw new UnsupportedOperationException("Translog synchronization should never be needed");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.search.IndexSearcher;
Expand Down Expand Up @@ -56,7 +57,7 @@
*
* @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function)
*/
public final class ReadOnlyEngine extends Engine {
public class ReadOnlyEngine extends Engine {

private final SegmentInfos lastCommittedSegmentInfos;
private final SeqNoStats seqNoStats;
Expand Down Expand Up @@ -95,15 +96,15 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats;
reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(directory), config.getShardId());
reader = ElasticsearchDirectoryReader.wrap(open(directory), config.getShardId());
if (config.getIndexSettings().isSoftDeleteEnabled()) {
reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
}
reader = readerWrapperFunction.apply(reader);
this.indexCommit = reader.getIndexCommit();
this.searcherManager = new SearcherManager(reader,
new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
this.docsStats = docsStats(reader);
this.docsStats = docsStats(lastCommittedSegmentInfos);
this.indexWriterLock = indexWriterLock;
success = true;
} finally {
Expand All @@ -116,6 +117,10 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
}
}

protected DirectoryReader open(final Directory directory) throws IOException {
return DirectoryReader.open(directory);
}

@Override
protected void closeNoLock(String reason, CountDownLatch closedLatch) {
if (isClosed.compareAndSet(false, true)) {
Expand All @@ -129,6 +134,24 @@ protected void closeNoLock(String reason, CountDownLatch closedLatch) {
}
}

private DocsStats docsStats(final SegmentInfos lastCommittedSegmentInfos) {
long numDocs = 0;
long numDeletedDocs = 0;
long sizeInBytes = 0;
if (lastCommittedSegmentInfos != null) {
for (SegmentCommitInfo segmentCommitInfo : lastCommittedSegmentInfos) {
numDocs += segmentCommitInfo.info.maxDoc() - segmentCommitInfo.getDelCount() - segmentCommitInfo.getSoftDelCount();
numDeletedDocs += segmentCommitInfo.getDelCount() + segmentCommitInfo.getSoftDelCount();
try {
sizeInBytes += segmentCommitInfo.sizeInBytes();
} catch (IOException e) {
throw new UncheckedIOException("Failed to get size for [" + segmentCommitInfo.info.name + "]", e);
}
}
}
return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
}

public static SeqNoStats buildSeqNoStats(SegmentInfos infos) {
final SequenceNumbers.CommitInfo seqNoStats =
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(infos.userData.entrySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.NoOpEngine;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
Expand Down Expand Up @@ -500,6 +501,12 @@ private synchronized IndexService createIndexService(final String reason,
}

private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
final IndexMetaData indexMetaData = idxSettings.getIndexMetaData();
if (indexMetaData != null && indexMetaData.getState() == IndexMetaData.State.CLOSE) {
// NoOpEngine takes precedence as long as the index is closed
return NoOpEngine::new;
}

final List<Optional<EngineFactory>> engineFactories =
engineFactoryProviders
.stream()
Expand Down
Loading