Skip to content

Commit

Permalink
Add test
Browse files Browse the repository at this point in the history
  • Loading branch information
fcofdez committed Jun 19, 2024
1 parent 523ca22 commit 5790fc6
Showing 1 changed file with 50 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
package org.elasticsearch.action.bulk;

import org.apache.lucene.tests.mockfile.FilterFileChannel;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.translog.ChannelFactory;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESSingleNodeTestCase;

import java.io.IOException;
import java.nio.channels.FileChannel;
Expand All @@ -24,13 +27,18 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

public class BulkAfterWriteFsyncFailureIT extends ESIntegTestCase {
import static org.elasticsearch.index.IndexSettings.INDEX_REFRESH_INTERVAL_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

public class BulkAfterWriteFsyncFailureIT extends ESSingleNodeTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(TestEnginePlugin.class);
protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(FailingFsyncEnginePlugin.class);
}

public static class TestEnginePlugin extends Plugin implements EnginePlugin {
public static class FailingFsyncEnginePlugin extends Plugin implements EnginePlugin {
static final AtomicBoolean simulateFsyncFailure = new AtomicBoolean(false);

@Override
Expand All @@ -41,8 +49,8 @@ protected ChannelFactory getTranslogChannelFactory() {
return (file, openOption) -> new FilterFileChannel(FileChannel.open(file, openOption)) {
@Override
public void force(boolean metaData) throws IOException {
if (simulateFsyncFailure.get()) {
throw new IOException("Simulated failure");
if (simulateFsyncFailure.compareAndSet(true, false)) {
throw new IOException("Simulated fsync failure");
} else {
super.force(metaData);
}
Expand All @@ -53,12 +61,42 @@ public void force(boolean metaData) throws IOException {
}
}

public void testFsyncFailure() {
internalCluster().startDataOnlyNode();
public void testFsyncFailureDoesNotAdvanceLocalCheckpoints() {
String indexName = randomIdentifier();
createIndex(indexName);
client().admin()
.indices()
.prepareCreate(indexName)
.setSettings(
Settings.builder()
.put(INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
)
.get();
ensureGreen(indexName);

FailingFsyncEnginePlugin.simulateFsyncFailure.set(true);
var localCheckpointBeforeBulk = getLocalCheckpointForShard(indexName, 0);
var bulkResponse = client().prepareBulk().add(prepareIndex(indexName).setId("1").setSource("key", "foo", "val", 10)).get();
assertTrue(bulkResponse.hasFailures());
var localCheckpointAfterFailedBulk = getLocalCheckpointForShard(indexName, 0);
// fsync for the translog failed, hence the checkpoint doesn't advance
assertThat(localCheckpointBeforeBulk, equalTo(localCheckpointAfterFailedBulk));

// Since background refreshes are disabled, the shard is considered green until the next operation is appended into the translog
ensureGreen(indexName);
TestEnginePlugin.simulateFsyncFailure.set(true);
var response = indexDoc(indexName, "1", "field", "foo");

var bulkResponse2 = client().prepareBulk().add(prepareIndex(indexName).setId("2").setSource("key", "bar", "val", 20)).get();
assertFalse(bulkResponse2.hasFailures());

var localCheckpointAfterSuccessfulBulk = getLocalCheckpointForShard(indexName, 0);
assertThat(localCheckpointAfterSuccessfulBulk, is(greaterThan(localCheckpointAfterFailedBulk)));
}

long getLocalCheckpointForShard(String index, int shardId) {
var indicesService = getInstanceFromNode(IndicesService.class);
var indexShard = indicesService.indexServiceSafe(resolveIndex(index)).getShard(shardId);
return indexShard.getLocalCheckpoint();
}
}

0 comments on commit 5790fc6

Please sign in to comment.