From 30ad72130879c7bd254071378a103006c5f1e994 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 17 Nov 2015 13:51:25 +0100 Subject: [PATCH 1/9] Initial implementation --- .../stats/TransportClusterStatsAction.java | 5 +- .../admin/indices/stats/CommonStatsFlags.java | 1 - .../admin/indices/stats/ShardStats.java | 17 ++- .../stats/TransportIndicesStatsAction.java | 3 +- .../metadata/MetaDataIndexUpgradeService.java | 79 +++++----- .../common/io/stream/StreamInput.java | 25 +-- .../common/io/stream/StreamOutput.java | 12 ++ .../EsRejectedExecutionException.java | 12 +- .../elasticsearch/index/engine/Engine.java | 4 + .../index/engine/InternalEngine.java | 123 ++++++++------- .../index/engine/ShadowEngine.java | 6 + .../index/seqno/LocalCheckpointService.java | 144 ++++++++++++++++++ .../elasticsearch/index/seqno/SeqNoStats.java | 76 +++++++++ .../index/seqno/SequenceNumbersService.java | 29 ++-- .../elasticsearch/index/shard/IndexShard.java | 10 ++ .../index/shard/ShadowIndexShard.java | 10 +- .../elasticsearch/indices/IndicesService.java | 4 +- .../elasticsearch/cluster/DiskUsageTests.java | 4 +- .../index/shard/IndexShardTests.java | 3 +- 19 files changed, 432 insertions(+), 135 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java create mode 100644 core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 3fba14e72bc79..91e1f82658739 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.admin.cluster.stats; -import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.indices.stats.CommonStats; @@ -30,6 +29,7 @@ import org.elasticsearch.action.support.nodes.TransportNodesAction; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterStateHealth; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.inject.Inject; @@ -105,7 +105,8 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq for (IndexShard indexShard : indexService) { if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) { // only report on fully started shards - shardsStats.add(new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, SHARD_STATS_FLAGS), indexShard.commitStats())); + shardsStats.add(new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), + new CommonStats(indexShard, SHARD_STATS_FLAGS), indexShard.commitStats(), indexShard.seqNoStats())); } } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java index fb306337886af..037bf8575eeea 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java +++ 
b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java @@ -227,7 +227,6 @@ public static enum Flag { RequestCache("request_cache"), Recovery("recovery"); - private final String restName; Flag(String restName) { diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java index 8fea8c795ebd5..e76e1a86eb218 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java @@ -28,7 +28,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.index.engine.CommitStats; -import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.ShardPath; import java.io.IOException; @@ -42,6 +42,8 @@ public class ShardStats implements Streamable, ToXContent { private CommonStats commonStats; @Nullable private CommitStats commitStats; + @Nullable + private SeqNoStats seqNoStats; private String dataPath; private String statePath; private boolean isCustomDataPath; @@ -49,13 +51,14 @@ public class ShardStats implements Streamable, ToXContent { ShardStats() { } - public ShardStats(ShardRouting routing, ShardPath shardPath, CommonStats commonStats, CommitStats commitStats) { + public ShardStats(ShardRouting routing, ShardPath shardPath, CommonStats commonStats, CommitStats commitStats, SeqNoStats seqNoStats) { this.shardRouting = routing; this.dataPath = shardPath.getRootDataPath().toString(); this.statePath = shardPath.getRootStatePath().toString(); this.isCustomDataPath = shardPath.isCustomDataPath(); this.commitStats = commitStats; this.commonStats = commonStats; + this.seqNoStats = seqNoStats; } /** @@ -73,6 +76,11 @@ public CommitStats getCommitStats() { return this.commitStats; } + @Nullable + public SeqNoStats getSeqNoStats() { + return this.seqNoStats; + } + public String getDataPath() { return dataPath; } @@ -99,6 +107,7 @@ public void readFrom(StreamInput in) throws IOException { statePath = in.readString(); dataPath = in.readString(); isCustomDataPath = in.readBoolean(); + seqNoStats = in.readOptionalStreamable(SeqNoStats.PROTOTYPE); } @Override @@ -109,6 +118,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(statePath); out.writeString(dataPath); out.writeBoolean(isCustomDataPath); + out.writeOptionalStreamable(seqNoStats); } @Override @@ -124,6 +134,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (commitStats != null) { commitStats.toXContent(builder, params); } + if (seqNoStats != null) { + seqNoStats.toXContent(builder, params); + } builder.startObject(Fields.SHARD_PATH); builder.field(Fields.STATE_PATH, statePath); builder.field(Fields.DATA_PATH, dataPath); diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index d5de67da478bb..e5fd4d4120879 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -162,6 +162,7 @@ protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting sh 
flags.set(CommonStatsFlags.Flag.Recovery); } - return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, flags), indexShard.commitStats()); + return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), + new CommonStats(indexShard, flags), indexShard.commitStats(), indexShard.seqNoStats()); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java index 00904af89155b..70a492954d120 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster.metadata; import com.carrotsearch.hppc.cursors.ObjectCursor; - import org.apache.lucene.analysis.Analyzer; import org.elasticsearch.Version; import org.elasticsearch.cluster.routing.UnassignedInfo; @@ -30,6 +29,7 @@ import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.analysis.NamedAnalyzer; import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.seqno.LocalCheckpointService; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.indices.mapper.MapperRegistry; @@ -116,42 +116,43 @@ private static boolean isSupportedVersion(IndexMetaData indexMetaData) { /** All known byte-sized settings for an index. */ public static final Set INDEX_BYTES_SIZE_SETTINGS = unmodifiableSet(newHashSet( - "index.merge.policy.floor_segment", - "index.merge.policy.max_merged_segment", - "index.merge.policy.max_merge_size", - "index.merge.policy.min_merge_size", - "index.shard.recovery.file_chunk_size", - "index.shard.recovery.translog_size", - "index.store.throttle.max_bytes_per_sec", - "index.translog.flush_threshold_size", - "index.translog.fs.buffer_size", - "index.version_map_size")); + "index.merge.policy.floor_segment", + "index.merge.policy.max_merged_segment", + "index.merge.policy.max_merge_size", + "index.merge.policy.min_merge_size", + "index.shard.recovery.file_chunk_size", + "index.shard.recovery.translog_size", + "index.store.throttle.max_bytes_per_sec", + "index.translog.flush_threshold_size", + "index.translog.fs.buffer_size", + "index.version_map_size")); /** All known time settings for an index. 
*/ public static final Set INDEX_TIME_SETTINGS = unmodifiableSet(newHashSet( - "index.gateway.wait_for_mapping_update_post_recovery", - "index.shard.wait_for_mapping_update_post_recovery", - "index.gc_deletes", - "index.indexing.slowlog.threshold.index.debug", - "index.indexing.slowlog.threshold.index.info", - "index.indexing.slowlog.threshold.index.trace", - "index.indexing.slowlog.threshold.index.warn", - "index.refresh_interval", - "index.search.slowlog.threshold.fetch.debug", - "index.search.slowlog.threshold.fetch.info", - "index.search.slowlog.threshold.fetch.trace", - "index.search.slowlog.threshold.fetch.warn", - "index.search.slowlog.threshold.query.debug", - "index.search.slowlog.threshold.query.info", - "index.search.slowlog.threshold.query.trace", - "index.search.slowlog.threshold.query.warn", - "index.shadow.wait_for_initial_commit", - "index.store.stats_refresh_interval", - "index.translog.flush_threshold_period", - "index.translog.interval", - "index.translog.sync_interval", - "index.shard.inactive_time", - UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING)); + "index.gateway.wait_for_mapping_update_post_recovery", + "index.shard.wait_for_mapping_update_post_recovery", + "index.gc_deletes", + "index.indexing.slowlog.threshold.index.debug", + "index.indexing.slowlog.threshold.index.info", + "index.indexing.slowlog.threshold.index.trace", + "index.indexing.slowlog.threshold.index.warn", + "index.refresh_interval", + "index.search.slowlog.threshold.fetch.debug", + "index.search.slowlog.threshold.fetch.info", + "index.search.slowlog.threshold.fetch.trace", + "index.search.slowlog.threshold.fetch.warn", + "index.search.slowlog.threshold.query.debug", + "index.search.slowlog.threshold.query.info", + "index.search.slowlog.threshold.query.trace", + "index.search.slowlog.threshold.query.warn", + "index.shadow.wait_for_initial_commit", + "index.store.stats_refresh_interval", + "index.translog.flush_threshold_period", + "index.translog.interval", + "index.translog.sync_interval", + "index.shard.inactive_time", + LocalCheckpointService.SETTINGS_INDEX_LAG_MAX_WAIT, + UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING)); /** * Elasticsearch 2.0 requires units on byte/memory and time settings; this method adds the default unit to any such settings that are @@ -163,7 +164,7 @@ private IndexMetaData addDefaultUnitsIfNeeded(IndexMetaData indexMetaData) { // Created lazily if we find any settings that are missing units: Settings settings = indexMetaData.getSettings(); Settings.Builder newSettings = null; - for(String byteSizeSetting : INDEX_BYTES_SIZE_SETTINGS) { + for (String byteSizeSetting : INDEX_BYTES_SIZE_SETTINGS) { String value = settings.get(byteSizeSetting); if (value != null) { try { @@ -180,7 +181,7 @@ private IndexMetaData addDefaultUnitsIfNeeded(IndexMetaData indexMetaData) { newSettings.put(byteSizeSetting, value + "b"); } } - for(String timeSetting : INDEX_TIME_SETTINGS) { + for (String timeSetting : INDEX_TIME_SETTINGS) { String value = settings.get(timeSetting); if (value != null) { try { @@ -200,9 +201,9 @@ private IndexMetaData addDefaultUnitsIfNeeded(IndexMetaData indexMetaData) { if (newSettings != null) { // At least one setting was changed: return IndexMetaData.builder(indexMetaData) - .version(indexMetaData.getVersion()) - .settings(newSettings.build()) - .build(); + .version(indexMetaData.getVersion()) + .settings(newSettings.build()) + .build(); } } diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java 
b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 20859e2716a8d..22c71b3df8f09 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -40,19 +40,9 @@ import org.joda.time.DateTime; import org.joda.time.DateTimeZone; -import java.io.ByteArrayInputStream; -import java.io.EOFException; -import java.io.FileNotFoundException; -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; +import java.io.*; import java.nio.file.NoSuchFileException; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.function.Supplier; import static org.elasticsearch.ElasticsearchException.readException; @@ -543,6 +533,17 @@ public T readOptionalStreamable(Supplier supplier) thr } } + /** + * Serializes a potential null value. + */ + public > T readOptionalStreamable(StreamableReader streamableReader) throws IOException { + if (readBoolean()) { + return streamableReader.readFrom(this); + } else { + return null; + } + } + public T readThrowable() throws IOException { if (readBoolean()) { int key = readVInt(); diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index 5f1e7623d2822..0285e56355792 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -503,6 +503,18 @@ public void writeOptionalStreamable(@Nullable Streamable streamable) throws IOEx } } + /** + * Serializes a potential null value. + */ + public void writeOptionalStreamable(@Nullable Writeable writeable) throws IOException { + if (writeable != null) { + writeBoolean(true); + writeable.writeTo(this); + } else { + writeBoolean(false); + } + } + public void writeThrowable(Throwable throwable) throws IOException { if (throwable == null) { writeBoolean(false); diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java index d75b3ffa8c264..8033750d1d24e 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java @@ -31,13 +31,17 @@ public class EsRejectedExecutionException extends ElasticsearchException { private final boolean isExecutorShutdown; - public EsRejectedExecutionException(String message, boolean isExecutorShutdown) { - super(message); + public EsRejectedExecutionException(String message, boolean isExecutorShutdown, Object... args) { + super(message, args); this.isExecutorShutdown = isExecutorShutdown; } - public EsRejectedExecutionException(String message) { - this(message, false); + public EsRejectedExecutionException(String message, Object... 
args) { + this(message, false, args); + } + + public EsRejectedExecutionException(String message, boolean isExecutorShutdown) { + this(message, isExecutorShutdown, new Object[0]); } public EsRejectedExecutionException() { diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index bccae2e46642a..2bd714c2d26b3 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -45,6 +45,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.merge.MergeStats; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; @@ -325,6 +326,9 @@ public CommitStats commitStats() { return new CommitStats(getLastCommittedSegmentInfos()); } + /** get sequence number related stats */ + public abstract SeqNoStats seqNoStats(); + /** * Read the last segments info from the commit pointed to by the searcher manager */ diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index dbb62a735a064..81b9ab44851a5 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -50,6 +50,7 @@ import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; import org.elasticsearch.index.shard.MergeSchedulerConfig; @@ -348,10 +349,6 @@ public boolean index(Index index) { } catch (OutOfMemoryError | IllegalStateException | IOException t) { maybeFailEngine("index", t); throw new IndexFailedEngineException(shardId, index.type(), index.id(), t); - } finally { - if (index.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { - seqNoService.markSeqNoAsCompleted(index.seqNo()); - } } checkVersionMapRefresh(); return created; @@ -359,7 +356,7 @@ public boolean index(Index index) { private boolean innerIndex(Index index) throws IOException { synchronized (dirtyLock(index.uid())) { - lastWriteNanos = index.startTime(); + lastWriteNanos = index.startTime(); final long currentVersion; final boolean deleted; VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes()); @@ -388,37 +385,47 @@ private boolean innerIndex(Index index) throws IOException { final boolean created; index.updateVersion(updatedVersion); + final long seqNo; if (index.origin() == Operation.Origin.PRIMARY) { - index.updateSeqNo(seqNoService.generateSeqNo()); - } - - if (currentVersion == Versions.NOT_FOUND) { - // document does not exists, we can optimize for create - created = true; - if (index.docs().size() > 1) { - indexWriter.addDocuments(index.docs()); - } else { - indexWriter.addDocument(index.docs().get(0)); - } + seqNo = seqNoService.generateSeqNo(); } else { - if (versionValue != null) { - created = versionValue.delete(); // we have a delete which is not GC'ed... 
- } else { - created = false; + seqNo = index.seqNo(); + seqNoService.markSeqNoAsStarted(seqNo); + } + try { + if (index.origin() == Operation.Origin.PRIMARY) { + index.updateSeqNo(seqNo); } - if (index.docs().size() > 1) { - indexWriter.updateDocuments(index.uid(), index.docs()); + if (currentVersion == Versions.NOT_FOUND) { + // document does not exists, we can optimize for create + created = true; + if (index.docs().size() > 1) { + indexWriter.addDocuments(index.docs()); + } else { + indexWriter.addDocument(index.docs().get(0)); + } } else { - indexWriter.updateDocument(index.uid(), index.docs().get(0)); + if (versionValue != null) { + created = versionValue.delete(); // we have a delete which is not GC'ed... + } else { + created = false; + } + if (index.docs().size() > 1) { + indexWriter.updateDocuments(index.uid(), index.docs()); + } else { + indexWriter.updateDocument(index.uid(), index.docs().get(0)); + } } - } - Translog.Location translogLocation = translog.add(new Translog.Index(index)); + Translog.Location translogLocation = translog.add(new Translog.Index(index)); - versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation)); - index.setTranslogLocation(translogLocation); + versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation)); + index.setTranslogLocation(translogLocation); - indexingService.postIndexUnderLock(index); - return created; + indexingService.postIndexUnderLock(index); + return created; + } finally { + seqNoService.markSeqNoAsCompleted(seqNo); + } } } @@ -458,10 +465,6 @@ public void delete(Delete delete) throws EngineException { } catch (OutOfMemoryError | IllegalStateException | IOException t) { maybeFailEngine("delete", t); throw new DeleteFailedEngineException(shardId, delete, t); - } finally { - if (delete.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { - seqNoService.markSeqNoAsCompleted(delete.seqNo()); - } } maybePruneDeletedTombstones(); @@ -506,28 +509,39 @@ private void innerDelete(Delete delete) throws IOException { } updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion); + final long seqNo; if (delete.origin() == Operation.Origin.PRIMARY) { - delete.updateSeqNo(seqNoService.generateSeqNo()); - } - - final boolean found; - if (currentVersion == Versions.NOT_FOUND) { - // doc does not exist and no prior deletes - found = false; - } else if (versionValue != null && versionValue.delete()) { - // a "delete on delete", in this case, we still increment the version, log it, and return that version - found = false; + seqNo = seqNoService.generateSeqNo(); } else { - // we deleted a currently existing document - indexWriter.deleteDocuments(delete.uid()); - found = true; + seqNo = delete.seqNo(); + seqNoService.markSeqNoAsStarted(seqNo); } + try { + if (delete.origin() == Operation.Origin.PRIMARY) { + delete.updateSeqNo(seqNo); + } - delete.updateVersion(updatedVersion, found); - Translog.Location translogLocation = translog.add(new Translog.Delete(delete)); - versionMap.putUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), translogLocation)); - delete.setTranslogLocation(translogLocation); - indexingService.postDeleteUnderLock(delete); + final boolean found; + if (currentVersion == Versions.NOT_FOUND) { + // doc does not exist and no prior deletes + found = false; + } else if (versionValue != null && versionValue.delete()) { + // a "delete on delete", in this case, we still 
increment the version, log it, and return that version + found = false; + } else { + // we deleted a currently existing document + indexWriter.deleteDocuments(delete.uid()); + found = true; + } + + delete.updateVersion(updatedVersion, found); + Translog.Location translogLocation = translog.add(new Translog.Delete(delete)); + versionMap.putUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), translogLocation)); + delete.setTranslogLocation(translogLocation); + indexingService.postDeleteUnderLock(delete); + } finally { + seqNoService.markSeqNoAsCompleted(seqNo); + } } } @@ -988,7 +1002,7 @@ final static class SearchFactory extends EngineSearcherFactory { @Override public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException { IndexSearcher searcher = super.newSearcher(reader, previousReader); - if (reader instanceof LeafReader && isMergedSegment((LeafReader)reader)) { + if (reader instanceof LeafReader && isMergedSegment((LeafReader) reader)) { // we call newSearcher from the IndexReaderWarmer which warms segments during merging // in that case the reader is a LeafReader and all we need to do is to build a new Searcher // and return it since it does it's own warming for that particular reader. @@ -1178,4 +1192,9 @@ public void onSettingsChanged() { public MergeStats getMergeStats() { return mergeScheduler.stats(); } + + @Override + public SeqNoStats seqNoStats() { + return seqNoService.stats(); + } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java index af3e0ae82a8ec..dd9ff4375e854 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.translog.Translog; import java.io.IOException; @@ -231,4 +232,9 @@ public long indexWriterRAMBytesUsed() { // No IndexWriter throw new UnsupportedOperationException("ShadowEngine has no IndexWriter"); } + + @Override + public SeqNoStats seqNoStats() { + throw new UnsupportedOperationException("ShadowEngine doesn't track sequence numbers"); + } } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java new file mode 100644 index 0000000000000..bad0b0e5bd88d --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java @@ -0,0 +1,144 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.seqno; + +import org.apache.lucene.util.FixedBitSet; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.ShardId; + +public class LocalCheckpointService extends AbstractIndexShardComponent { + + public static String SETTINGS_INDEX_LAG_THRESHOLD = "index.seq_no.index_lag.threshold"; + public static String SETTINGS_INDEX_LAG_MAX_WAIT = "index.seq_no.index_lag.max_wait"; + + final Object mutex = new Object(); + final FixedBitSet processedSeqNo; + final int indexLagThreshold; + final TimeValue indexLagMaxWait; + + + volatile long nextSeqNo = 0; + volatile long checkpoint = -1; + + public LocalCheckpointService(ShardId shardId, IndexSettings indexSettings) { + super(shardId, indexSettings); + indexLagThreshold = indexSettings.getSettings().getAsInt(SETTINGS_INDEX_LAG_THRESHOLD, 1024); + indexLagMaxWait = indexSettings.getSettings().getAsTime(SETTINGS_INDEX_LAG_MAX_WAIT, TimeValue.timeValueSeconds(30)); + processedSeqNo = new FixedBitSet(indexLagThreshold); + + } + + public long generateSeqNo() { + synchronized (mutex) { + // we have to keep checking when ensure capacity returns because it release the lock and nextSeqNo may change + while (hasCapacity(nextSeqNo) == false) { + ensureCapacity(nextSeqNo); + } + return nextSeqNo++; + } + } + + public void markSeqNoAsStarted(long seqNo) { + synchronized (mutex) { + // make sure we track highest seen seqNo + if (seqNo >= nextSeqNo) { + nextSeqNo = seqNo + 1; + } + if (seqNo <= checkpoint) { + // this is possible during recover where we might replay an op that was also replicated + return; + } + ensureCapacity(seqNo); + assert processedSeqNo.get(seqNoToOffset(seqNo)) == false : "expected [" + seqNo + "] not to be marked as started"; + } + } + + public long markSeqNoAsCompleted(long seqNo) { + synchronized (mutex) { + if (seqNo <= checkpoint) { + // this is possible during recover where we might replay an op that was also replicated + return checkpoint; + } + // just to be safe (previous calls to generateSeqNo/markSeqNoAsStarted should ensure this is OK) + ensureCapacity(seqNo); + int offset = seqNoToOffset(seqNo); + processedSeqNo.set(offset); + if (seqNo == checkpoint + 1) { + do { + // clear the flag as we are making it free for future operations. 
do se before we expose it + // by moving the checkpoint + processedSeqNo.clear(offset); + checkpoint++; + offset = seqNoToOffset(checkpoint + 1); + } while (processedSeqNo.get(offset)); + mutex.notifyAll(); + } + } + return checkpoint; + } + + public long getCheckpoint() { + return checkpoint; + } + + public long getMaxSeqNo() { + return nextSeqNo - 1; + } + + + private boolean hasCapacity(long seqNo) { + assert Thread.holdsLock(mutex); + return (seqNo - checkpoint) < indexLagThreshold; + } + + private void ensureCapacity(long seqNo) { + assert Thread.holdsLock(mutex); + long retry = 0; + final long maxRetries = indexLagMaxWait.seconds(); + while (hasCapacity(seqNo) == false) { + try { + if (retry > maxRetries) { + ElasticsearchException e = new EsRejectedExecutionException("indexing lag exceeds [{}] (seq# requested [{}], local checkpoint [{}]", + indexLagThreshold, seqNo, checkpoint); + e.setShard(shardId()); + throw e; + } + + // this temporary releases the lock on mutex + mutex.wait(Math.min(1000, indexLagMaxWait.millis() - retry * 1000)); + retry++; + } catch (InterruptedException ie) { + ElasticsearchException exp = new ElasticsearchException("interrupted while waiting on index lag"); + exp.setShard(shardId()); + throw exp; + } + } + } + + private int seqNoToOffset(long seqNo) { + assert seqNo - checkpoint < indexLagThreshold; + assert seqNo > checkpoint; + return (int) (seqNo % indexLagThreshold); + } + +} diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java b/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java new file mode 100644 index 0000000000000..a2052f75976a0 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.index.seqno; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; + +public class SeqNoStats implements ToXContent, Writeable { + + public static final SeqNoStats PROTOTYPE = new SeqNoStats(0, 0); + + final long maxSeqNo; + final long localCheckpoint; + + public SeqNoStats(long maxSeqNo, long localCheckpoint) { + this.maxSeqNo = maxSeqNo; + this.localCheckpoint = localCheckpoint; + } + + public long getMaxSeqNo() { + return maxSeqNo; + } + + public long getLocalCheckpoint() { + return localCheckpoint; + } + + @Override + public SeqNoStats readFrom(StreamInput in) throws IOException { + return new SeqNoStats(in.readLong(), in.readLong()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(maxSeqNo); + out.writeLong(localCheckpoint); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.SEQ_NO); + builder.field(Fields.MAX_SEQ_NO, maxSeqNo); + builder.field(Fields.LOCAL_CHECKPOINT, localCheckpoint); + builder.endObject(); + return builder; + } + + + static final class Fields { + static final XContentBuilderString SEQ_NO = new XContentBuilderString("seq_no"); + static final XContentBuilderString MAX_SEQ_NO = new XContentBuilderString("max"); + static final XContentBuilderString LOCAL_CHECKPOINT = new XContentBuilderString("local_checkpoint"); + } +} diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 46b033622432b..12a4e8bad5d50 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -22,39 +22,36 @@ import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; -import java.util.concurrent.atomic.AtomicLong; - /** a very light weight implementation. will be replaced with proper machinery later */ public class SequenceNumbersService extends AbstractIndexShardComponent { public final static long UNASSIGNED_SEQ_NO = -1L; - - AtomicLong seqNoGenerator = new AtomicLong(); + final LocalCheckpointService localCheckpointService; public SequenceNumbersService(ShardId shardId, IndexSettings indexSettings) { super(shardId, indexSettings); + localCheckpointService = new LocalCheckpointService(shardId, indexSettings); } /** * generates a new sequence number. * Note: you must call {@link #markSeqNoAsCompleted(long)} after the operation for which this seq# was generated - * was completed (whether successfully or with a failure + * was completed (whether successfully or with a failure) */ public long generateSeqNo() { - return seqNoGenerator.getAndIncrement(); + return localCheckpointService.generateSeqNo(); + } + + public void markSeqNoAsStarted(long seqNo) { + localCheckpointService.markSeqNoAsStarted(seqNo); + } public void markSeqNoAsCompleted(long seqNo) { - // this is temporary to make things semi sane on primary promotion and recovery. 
will be replaced with better machinery - boolean success; - do { - long maxSeqNo = seqNoGenerator.get(); - if (seqNo > maxSeqNo) { - success = seqNoGenerator.compareAndSet(maxSeqNo, seqNo); - } else { - success = true; - } - } while (success == false); + localCheckpointService.markSeqNoAsCompleted(seqNo); } + public SeqNoStats stats() { + return new SeqNoStats(localCheckpointService.getMaxSeqNo(), localCheckpointService.getCheckpoint()); + } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 443696986fd11..5bc38e5616e64 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -79,6 +79,7 @@ import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.search.stats.ShardSearchStats; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.snapshots.IndexShardRepository; @@ -591,6 +592,15 @@ public CommitStats commitStats() { return engine == null ? null : engine.commitStats(); } + /** + * @return {@link SeqNoStats} if engine is open, otherwise null + */ + @Nullable + public SeqNoStats seqNoStats() { + Engine engine = getEngineOrNull(); + return engine == null ? null : engine.seqNoStats(); + } + public IndexingStats indexingStats(String... types) { return indexingService.stats(types); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index 50a16fa1cee70..d99e2ccd0e803 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -18,8 +18,6 @@ */ package org.elasticsearch.index.shard; -import java.io.IOException; - import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.index.IndexSettings; @@ -31,10 +29,13 @@ import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.merge.MergeStats; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.TranslogStats; +import java.io.IOException; + /** * ShadowIndexShard extends {@link IndexShard} to add file synchronization * from the primary when a flush happens. 
It also ensures that a replica being @@ -67,6 +68,11 @@ public MergeStats mergeStats() { return new MergeStats(); } + @Override + public SeqNoStats seqNoStats() { + return null; + } + @Override public boolean canIndex() { return false; diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java index dead72aee8b4f..352ca12740516 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -198,7 +198,9 @@ public NodeIndicesStats stats(boolean includePrevious, CommonStatsFlags flags) { if (indexShard.routingEntry() == null) { continue; } - IndexShardStats indexShardStats = new IndexShardStats(indexShard.shardId(), new ShardStats[] { new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, flags), indexShard.commitStats()) }); + IndexShardStats indexShardStats = new IndexShardStats(indexShard.shardId(), + new ShardStats[]{new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), + new CommonStats(indexShard, flags), indexShard.commitStats(), indexShard.seqNoStats())}); if (!statsByShard.containsKey(indexService.index())) { statsByShard.put(indexService.index(), arrayAsArrayList(indexShardStats)); } else { diff --git a/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java b/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java index 1be999d8fedce..9ff9ee9ee437c 100644 --- a/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java @@ -104,8 +104,8 @@ public void testFillShardLevelInfo() { CommonStats commonStats1 = new CommonStats(); commonStats1.store = new StoreStats(1000, 1); ShardStats[] stats = new ShardStats[]{ - new ShardStats(test_0, new ShardPath(false, test0Path, test0Path, "0xdeadbeef", test_0.shardId()), commonStats0, null), - new ShardStats(test_1, new ShardPath(false, test1Path, test1Path, "0xdeadbeef", test_1.shardId()), commonStats1, null) + new ShardStats(test_0, new ShardPath(false, test0Path, test0Path, "0xdeadbeef", test_0.shardId()), commonStats0, null, null), + new ShardStats(test_1, new ShardPath(false, test1Path, test1Path, "0xdeadbeef", test_1.shardId()), commonStats1, null, null) }; ImmutableOpenMap.Builder shardSizes = ImmutableOpenMap.builder(); ImmutableOpenMap.Builder routingToPath = ImmutableOpenMap.builder(); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 1923c1e2cffa4..0fd9939e613a3 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -591,7 +591,8 @@ public void testShardStats() throws IOException { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService("test"); IndexShard shard = test.getShardOrNull(0); - ShardStats stats = new ShardStats(shard.routingEntry(), shard.shardPath(), new CommonStats(shard, new CommonStatsFlags()), shard.commitStats()); + ShardStats stats = new ShardStats(shard.routingEntry(), shard.shardPath(), new CommonStats(shard, new CommonStatsFlags()), + shard.commitStats(), shard.seqNoStats()); assertEquals(shard.shardPath().getRootDataPath().toString(), stats.getDataPath()); assertEquals(shard.shardPath().getRootStatePath().toString(), 
stats.getStatePath()); assertEquals(shard.shardPath().isCustomDataPath(), stats.isCustomDataPath()); From 9aea462fe8d67baf96e4dbb16e02ff9f466b115c Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 29 Nov 2015 22:58:55 +0100 Subject: [PATCH 2/9] Add tests plus other changes --- .../admin/indices/stats/ShardStats.java | 2 +- .../common/io/stream/StreamInput.java | 2 +- .../index/engine/InternalEngine.java | 124 ++++----- .../index/seqno/LocalCheckpointService.java | 23 +- .../index/seqno/SequenceNumbersService.java | 5 - .../index/engine/InternalEngineTests.java | 47 ++++ .../seqno/LocalCheckpointServiceTests.java | 257 ++++++++++++++++++ .../org/elasticsearch/test/ESTestCase.java | 22 +- 8 files changed, 384 insertions(+), 98 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java index e76e1a86eb218..3b1ff2e538de8 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java @@ -107,7 +107,7 @@ public void readFrom(StreamInput in) throws IOException { statePath = in.readString(); dataPath = in.readString(); isCustomDataPath = in.readBoolean(); - seqNoStats = in.readOptionalStreamable(SeqNoStats.PROTOTYPE); + seqNoStats = in.readOptionalStreamableReader(SeqNoStats.PROTOTYPE); } @Override diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 22c71b3df8f09..23d414c851909 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -536,7 +536,7 @@ public T readOptionalStreamable(Supplier supplier) thr /** * Serializes a potential null value. 
*/ - public > T readOptionalStreamable(StreamableReader streamableReader) throws IOException { + public > T readOptionalStreamableReader(StreamableReader streamableReader) throws IOException { if (readBoolean()) { return streamableReader.readFrom(this); } else { diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 81b9ab44851a5..488222487630f 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -356,45 +356,38 @@ public boolean index(Index index) { private boolean innerIndex(Index index) throws IOException { synchronized (dirtyLock(index.uid())) { - lastWriteNanos = index.startTime(); - final long currentVersion; - final boolean deleted; - VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes()); - if (versionValue == null) { - currentVersion = loadCurrentVersionFromIndex(index.uid()); - deleted = currentVersion == Versions.NOT_FOUND; - } else { - deleted = versionValue.delete(); - if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) { - currentVersion = Versions.NOT_FOUND; // deleted, and GC + try { + lastWriteNanos = index.startTime(); + final long currentVersion; + final boolean deleted; + VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes()); + if (versionValue == null) { + currentVersion = loadCurrentVersionFromIndex(index.uid()); + deleted = currentVersion == Versions.NOT_FOUND; } else { - currentVersion = versionValue.version(); + deleted = versionValue.delete(); + if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) { + currentVersion = Versions.NOT_FOUND; // deleted, and GC + } else { + currentVersion = versionValue.version(); + } } - } - long expectedVersion = index.version(); - if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) { - if (index.origin() == Operation.Origin.RECOVERY) { - return false; - } else { - throw new VersionConflictEngineException(shardId, index.type(), index.id(), - index.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted)); + long expectedVersion = index.version(); + if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) { + if (index.origin() == Operation.Origin.RECOVERY) { + return false; + } else { + throw new VersionConflictEngineException(shardId, index.type(), index.id(), + index.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted)); + } } - } - long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); + long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); - final boolean created; - index.updateVersion(updatedVersion); - final long seqNo; - if (index.origin() == Operation.Origin.PRIMARY) { - seqNo = seqNoService.generateSeqNo(); - } else { - seqNo = index.seqNo(); - seqNoService.markSeqNoAsStarted(seqNo); - } - try { + final boolean created; + index.updateVersion(updatedVersion); if (index.origin() == Operation.Origin.PRIMARY) { - index.updateSeqNo(seqNo); + index.updateSeqNo(seqNoService.generateSeqNo()); } if (currentVersion == Versions.NOT_FOUND) 
{ // document does not exists, we can optimize for create @@ -424,7 +417,9 @@ private boolean innerIndex(Index index) throws IOException { indexingService.postIndexUnderLock(index); return created; } finally { - seqNoService.markSeqNoAsCompleted(seqNo); + if (index.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + seqNoService.markSeqNoAsCompleted(index.seqNo()); + } } } } @@ -481,44 +476,37 @@ private void maybePruneDeletedTombstones() { private void innerDelete(Delete delete) throws IOException { synchronized (dirtyLock(delete.uid())) { - lastWriteNanos = delete.startTime(); - final long currentVersion; - final boolean deleted; - VersionValue versionValue = versionMap.getUnderLock(delete.uid().bytes()); - if (versionValue == null) { - currentVersion = loadCurrentVersionFromIndex(delete.uid()); - deleted = currentVersion == Versions.NOT_FOUND; - } else { - deleted = versionValue.delete(); - if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) { - currentVersion = Versions.NOT_FOUND; // deleted, and GC + try { + lastWriteNanos = delete.startTime(); + final long currentVersion; + final boolean deleted; + VersionValue versionValue = versionMap.getUnderLock(delete.uid().bytes()); + if (versionValue == null) { + currentVersion = loadCurrentVersionFromIndex(delete.uid()); + deleted = currentVersion == Versions.NOT_FOUND; } else { - currentVersion = versionValue.version(); + deleted = versionValue.delete(); + if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) { + currentVersion = Versions.NOT_FOUND; // deleted, and GC + } else { + currentVersion = versionValue.version(); + } } - } - long updatedVersion; - long expectedVersion = delete.version(); - if (delete.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) { - if (delete.origin() == Operation.Origin.RECOVERY) { - return; - } else { - throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), - delete.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted)); + long updatedVersion; + long expectedVersion = delete.version(); + if (delete.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) { + if (delete.origin() == Operation.Origin.RECOVERY) { + return; + } else { + throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), + delete.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted)); + } } - } - updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion); + updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion); - final long seqNo; - if (delete.origin() == Operation.Origin.PRIMARY) { - seqNo = seqNoService.generateSeqNo(); - } else { - seqNo = delete.seqNo(); - seqNoService.markSeqNoAsStarted(seqNo); - } - try { if (delete.origin() == Operation.Origin.PRIMARY) { - delete.updateSeqNo(seqNo); + delete.updateSeqNo(seqNoService.generateSeqNo()); } final boolean found; @@ -540,7 +528,9 @@ private void innerDelete(Delete delete) throws IOException { delete.setTranslogLocation(translogLocation); indexingService.postDeleteUnderLock(delete); } finally { - seqNoService.markSeqNoAsCompleted(seqNo); + if (delete.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + 
seqNoService.markSeqNoAsCompleted(delete.seqNo()); + } } } } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java index bad0b0e5bd88d..49cdb7d921fdd 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java @@ -30,6 +30,8 @@ public class LocalCheckpointService extends AbstractIndexShardComponent { public static String SETTINGS_INDEX_LAG_THRESHOLD = "index.seq_no.index_lag.threshold"; public static String SETTINGS_INDEX_LAG_MAX_WAIT = "index.seq_no.index_lag.max_wait"; + final static int DEFAULT_INDEX_LAG_THRESHOLD = 1024; + final static TimeValue DEFAULT_INDEX_LAG_MAX_WAIT = TimeValue.timeValueSeconds(30); final Object mutex = new Object(); final FixedBitSet processedSeqNo; @@ -42,8 +44,8 @@ public class LocalCheckpointService extends AbstractIndexShardComponent { public LocalCheckpointService(ShardId shardId, IndexSettings indexSettings) { super(shardId, indexSettings); - indexLagThreshold = indexSettings.getSettings().getAsInt(SETTINGS_INDEX_LAG_THRESHOLD, 1024); - indexLagMaxWait = indexSettings.getSettings().getAsTime(SETTINGS_INDEX_LAG_MAX_WAIT, TimeValue.timeValueSeconds(30)); + indexLagThreshold = indexSettings.getSettings().getAsInt(SETTINGS_INDEX_LAG_THRESHOLD, DEFAULT_INDEX_LAG_THRESHOLD); + indexLagMaxWait = indexSettings.getSettings().getAsTime(SETTINGS_INDEX_LAG_MAX_WAIT, DEFAULT_INDEX_LAG_MAX_WAIT); processedSeqNo = new FixedBitSet(indexLagThreshold); } @@ -58,23 +60,12 @@ public long generateSeqNo() { } } - public void markSeqNoAsStarted(long seqNo) { + public long markSeqNoAsCompleted(long seqNo) { synchronized (mutex) { // make sure we track highest seen seqNo if (seqNo >= nextSeqNo) { nextSeqNo = seqNo + 1; } - if (seqNo <= checkpoint) { - // this is possible during recover where we might replay an op that was also replicated - return; - } - ensureCapacity(seqNo); - assert processedSeqNo.get(seqNoToOffset(seqNo)) == false : "expected [" + seqNo + "] not to be marked as started"; - } - } - - public long markSeqNoAsCompleted(long seqNo) { - synchronized (mutex) { if (seqNo <= checkpoint) { // this is possible during recover where we might replay an op that was also replicated return checkpoint; @@ -108,7 +99,7 @@ public long getMaxSeqNo() { private boolean hasCapacity(long seqNo) { assert Thread.holdsLock(mutex); - return (seqNo - checkpoint) < indexLagThreshold; + return (seqNo - checkpoint) <= indexLagThreshold; } private void ensureCapacity(long seqNo) { @@ -136,7 +127,7 @@ private void ensureCapacity(long seqNo) { } private int seqNoToOffset(long seqNo) { - assert seqNo - checkpoint < indexLagThreshold; + assert seqNo - checkpoint <= indexLagThreshold; assert seqNo > checkpoint; return (int) (seqNo % indexLagThreshold); } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 12a4e8bad5d50..fd6fa7387e687 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -42,11 +42,6 @@ public long generateSeqNo() { return localCheckpointService.generateSeqNo(); } - public void markSeqNoAsStarted(long seqNo) { - localCheckpointService.markSeqNoAsStarted(seqNo); - - } - public void markSeqNoAsCompleted(long seqNo) { 
localCheckpointService.markSeqNoAsCompleted(seqNo); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f38092d131e34..d1bf85e1f2125 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1398,6 +1398,53 @@ public void testIndexWriterInfoStream() { } } + public void testSeqNoAndLocalCheckpoint() { + int opCount = randomIntBetween(1, 10); + long seqNoCount = -1; + for (int op = 0; op < opCount; op++) { + final String id = randomFrom("1", "2", "3"); + ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocumentWithTextField(), B_1, null); + if (randomBoolean()) { + final Engine.Index index = new Engine.Index(newUid(id), doc, + SequenceNumbersService.UNASSIGNED_SEQ_NO, + rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, + PRIMARY, System.currentTimeMillis()); + + try { + engine.index(index); + } catch (VersionConflictEngineException e) { + // OK + } + if (index.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + seqNoCount++; + Engine.Index replica = new Engine.Index(index.uid(), index.parsedDoc(), index.seqNo(), + index.version(), VersionType.EXTERNAL, REPLICA, System.currentTimeMillis()); + replicaEngine.index(replica); + } + } else { + final Engine.Delete delete = new Engine.Delete("test", id, newUid(id), + SequenceNumbersService.UNASSIGNED_SEQ_NO, + rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, + PRIMARY, System.currentTimeMillis(), false); + try { + engine.delete(delete); + } catch (VersionConflictEngineException e) { + // OK + } + if (delete.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + seqNoCount++; + Engine.Delete replica = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), + delete.version(), VersionType.EXTERNAL, REPLICA, System.currentTimeMillis(), false); + replicaEngine.delete(replica); + } + } + } + assertThat(engine.seqNoStats().getMaxSeqNo(), equalTo(seqNoCount)); + assertThat(engine.seqNoStats().getLocalCheckpoint(), equalTo(seqNoCount)); + assertThat(replicaEngine.seqNoStats().getMaxSeqNo(), equalTo(seqNoCount)); + assertThat(replicaEngine.seqNoStats().getLocalCheckpoint(), equalTo(seqNoCount)); + } + // #8603: make sure we can separately log IFD's messages public void testIndexWriterIFDInfoStream() { assumeFalse("who tests the tester?", VERBOSE); diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java new file mode 100644 index 0000000000000..73e7f839d1a26 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java @@ -0,0 +1,257 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.seqno; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.IndexSettingsModule; +import org.junit.Before; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.equalTo; + +public class LocalCheckpointServiceTests extends ESTestCase { + + LocalCheckpointService checkpointService; + + final int SMALL_INDEX_LAG_THRESHOLD = 10; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + checkpointService = getCheckpointService(SMALL_INDEX_LAG_THRESHOLD, LocalCheckpointService.DEFAULT_INDEX_LAG_MAX_WAIT); + } + + protected LocalCheckpointService getCheckpointService(int thresholdLag, TimeValue thresholdDelay) { + return new LocalCheckpointService( + new ShardId("test", 0), + IndexSettingsModule.newIndexSettings("test", + Settings.builder() + .put(LocalCheckpointService.SETTINGS_INDEX_LAG_THRESHOLD, thresholdLag) + .put(LocalCheckpointService.SETTINGS_INDEX_LAG_MAX_WAIT, thresholdDelay) + .build() + )); + } + + public void testSimplePrimary() { + long seqNo1, seqNo2; + assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + seqNo1 = checkpointService.generateSeqNo(); + assertThat(seqNo1, equalTo(0L)); + checkpointService.markSeqNoAsCompleted(seqNo1); + assertThat(checkpointService.getCheckpoint(), equalTo(0L)); + seqNo1 = checkpointService.generateSeqNo(); + seqNo2 = checkpointService.generateSeqNo(); + assertThat(seqNo1, equalTo(1L)); + assertThat(seqNo2, equalTo(2L)); + checkpointService.markSeqNoAsCompleted(seqNo2); + assertThat(checkpointService.getCheckpoint(), equalTo(0L)); + checkpointService.markSeqNoAsCompleted(seqNo1); + assertThat(checkpointService.getCheckpoint(), equalTo(2L)); + } + + public void testSimpleReplica() { + assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + checkpointService.markSeqNoAsCompleted(0L); + assertThat(checkpointService.getCheckpoint(), equalTo(0L)); + checkpointService.markSeqNoAsCompleted(2L); + assertThat(checkpointService.getCheckpoint(), equalTo(0L)); + checkpointService.markSeqNoAsCompleted(1L); + assertThat(checkpointService.getCheckpoint(), equalTo(2L)); + } + + public void testIndexThrottleSuccessPrimary() throws Exception { + LocalCheckpointService checkpoint = getCheckpointService(3, TimeValue.timeValueHours(1)); + final long seq1 = checkpoint.generateSeqNo(); + final long seq2 = checkpoint.generateSeqNo(); + final long seq3 = checkpoint.generateSeqNo(); + final CountDownLatch threadStarted = new CountDownLatch(1); 
+ final AtomicBoolean threadDone = new AtomicBoolean(false); + Thread backgroundThread = new Thread(() -> { + threadStarted.countDown(); + checkpoint.generateSeqNo(); + threadDone.set(true); + }, "testIndexDelayPrimary"); + backgroundThread.start(); + logger.info("--> waiting for thread to start"); + threadStarted.await(); + assertFalse("background thread finished but should have waited", threadDone.get()); + checkpoint.markSeqNoAsCompleted(seq2); + assertFalse("background thread finished but should have waited (seq2 completed)", threadDone.get()); + checkpoint.markSeqNoAsCompleted(seq1); + logger.info("--> waiting for thread to stop"); + assertBusy(() -> { + assertTrue("background thread should finished after finishing seq1", threadDone.get()); + }); + } + + public void testIndexThrottleTimeoutPrimary() throws Exception { + LocalCheckpointService checkpoint = getCheckpointService(2, TimeValue.timeValueMillis(100)); + checkpoint.generateSeqNo(); + checkpoint.generateSeqNo(); + try { + checkpoint.generateSeqNo(); + fail("index operation should time out due to a large lag"); + } catch (EsRejectedExecutionException e) { + // OK! + } + } + + public void testIndexThrottleSuccessReplica() throws Exception { + LocalCheckpointService checkpoint = getCheckpointService(3, TimeValue.timeValueHours(1)); + final CountDownLatch threadStarted = new CountDownLatch(1); + final AtomicBoolean threadDone = new AtomicBoolean(false); + checkpoint.markSeqNoAsCompleted(1); + Thread backgroundThread = new Thread(() -> { + threadStarted.countDown(); + checkpoint.markSeqNoAsCompleted(3); + threadDone.set(true); + }, "testIndexDelayReplica"); + backgroundThread.start(); + logger.info("--> waiting for thread to start"); + threadStarted.await(); + assertFalse("background thread finished but should have waited", threadDone.get()); + checkpoint.markSeqNoAsCompleted(0); + logger.info("--> waiting for thread to stop"); + assertBusy(() -> { + assertTrue("background thread should finished after finishing seq1", threadDone.get()); + }); + } + + public void testIndexThrottleTimeoutReplica() throws Exception { + LocalCheckpointService checkpoint = getCheckpointService(1, TimeValue.timeValueMillis(100)); + try { + checkpoint.markSeqNoAsCompleted(1L); + fail("index operation should time out due to a large lag"); + } catch (EsRejectedExecutionException e) { + // OK! + } + checkpoint.markSeqNoAsCompleted(0L); + try { + checkpoint.markSeqNoAsCompleted(2L); + fail("index operation should time out due to a large lag"); + } catch (EsRejectedExecutionException e) { + // OK! 
+ } + + } + + public void testConcurrentPrimary() throws InterruptedException { + Thread[] threads = new Thread[randomIntBetween(2, 5)]; + final int opsPerThread = randomIntBetween(10, 20); + final int maxOps = opsPerThread * threads.length; + final long unFinisshedSeq = randomIntBetween(maxOps - SMALL_INDEX_LAG_THRESHOLD, maxOps - 2); // make sure we won't be blocked + logger.info("--> will run [{}] threads, maxOps [{}], unfinished seq no [{}]", threads.length, maxOps, unFinisshedSeq); + final CyclicBarrier barrier = new CyclicBarrier(threads.length); + for (int t = 0; t < threads.length; t++) { + final int threadId = t; + threads[t] = new Thread(new AbstractRunnable() { + @Override + public void onFailure(Throwable t) { + throw new ElasticsearchException("failure in background thread", t); + } + + @Override + protected void doRun() throws Exception { + barrier.await(); + for (int i = 0; i < opsPerThread; i++) { + long seqNo = checkpointService.generateSeqNo(); + logger.info("[t{}] started [{}]", threadId, seqNo); + if (seqNo != unFinisshedSeq) { + checkpointService.markSeqNoAsCompleted(seqNo); + logger.info("[t{}] completed [{}]", threadId, seqNo); + } + } + } + }, "testConcurrentPrimary_" + threadId); + threads[t].start(); + } + for (Thread thread : threads) { + thread.join(); + } + assertThat(checkpointService.getMaxSeqNo(), equalTo(maxOps - 1L)); + assertThat(checkpointService.getCheckpoint(), equalTo(unFinisshedSeq - 1L)); + checkpointService.markSeqNoAsCompleted(unFinisshedSeq); + assertThat(checkpointService.getCheckpoint(), equalTo(maxOps - 1L)); + } + + public void testConcurrentReplica() throws InterruptedException { + Thread[] threads = new Thread[randomIntBetween(2, 5)]; + final int opsPerThread = randomIntBetween(10, 20); + final int maxOps = opsPerThread * threads.length; + final long unFinisshedSeq = randomIntBetween(maxOps - SMALL_INDEX_LAG_THRESHOLD, maxOps - 2); // make sure we won't be blocked + Set seqNoList = new HashSet<>(); + for (int i = 0; i < maxOps; i++) { + seqNoList.add(i); + } + + final Integer[][] seqNoPerThread = new Integer[threads.length][]; + for (int t = 0; t < threads.length - 1; t++) { + int size = Math.min(seqNoList.size(), randomIntBetween(opsPerThread - 4, opsPerThread + 4)); + seqNoPerThread[t] = randomSubsetOf(size, seqNoList).toArray(new Integer[size]); + Arrays.sort(seqNoPerThread[t]); + seqNoList.removeAll(Arrays.asList(seqNoPerThread[t])); + } + seqNoPerThread[threads.length - 1] = seqNoList.toArray(new Integer[seqNoList.size()]); + logger.info("--> will run [{}] threads, maxOps [{}], unfinished seq no [{}]", threads.length, maxOps, unFinisshedSeq); + final CyclicBarrier barrier = new CyclicBarrier(threads.length); + for (int t = 0; t < threads.length; t++) { + final int threadId = t; + threads[t] = new Thread(new AbstractRunnable() { + @Override + public void onFailure(Throwable t) { + throw new ElasticsearchException("failure in background thread", t); + } + + @Override + protected void doRun() throws Exception { + barrier.await(); + Integer[] ops = seqNoPerThread[threadId]; + for (int seqNo : ops) { + if (seqNo != unFinisshedSeq) { + checkpointService.markSeqNoAsCompleted(seqNo); + logger.info("[t{}] completed [{}]", threadId, seqNo); + } + } + } + }, "testConcurrentPrimary_" + threadId); + threads[t].start(); + } + for (Thread thread : threads) { + thread.join(); + } + assertThat(checkpointService.getMaxSeqNo(), equalTo(maxOps - 1L)); + assertThat(checkpointService.getCheckpoint(), equalTo(unFinisshedSeq - 1L)); + 
checkpointService.markSeqNoAsCompleted(unFinisshedSeq); + assertThat(checkpointService.getCheckpoint(), equalTo(maxOps - 1L)); + } + +} \ No newline at end of file diff --git a/test-framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test-framework/src/main/java/org/elasticsearch/test/ESTestCase.java index c59c3ba4d4e6f..53f69b42ec1ca 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test-framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -29,14 +29,12 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomStrings; import com.carrotsearch.randomizedtesting.rules.TestRuleAdapter; - import org.apache.lucene.uninverting.UninvertingReader; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase.SuppressCodecs; import org.apache.lucene.util.TestRuleMarkFailure; import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.TimeUnits; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.bootstrap.BootstrapForTesting; import org.elasticsearch.cache.recycler.MockPageCacheRecycler; @@ -50,7 +48,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; @@ -58,11 +55,7 @@ import org.elasticsearch.test.junit.listeners.LoggingListener; import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter; import org.elasticsearch.threadpool.ThreadPool; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; +import org.junit.*; import org.junit.rules.RuleChain; import java.io.IOException; @@ -570,6 +563,19 @@ public static List randomSubsetOf(int size, T... values) { return list.subList(0, size); } + /** + * Returns size random values + */ + public static List randomSubsetOf(int size, Collection values) { + if (size > values.size()) { + throw new IllegalArgumentException("Can\'t pick " + size + " random objects from a list of " + values.size() + " objects"); + } + List list = new ArrayList<>(values); + Collections.shuffle(list); + return list.subList(0, size); + } + + /** * Returns true iff assertions for elasticsearch packages are enabled */ From 1ef8a0d342492629ba1a647574440b299a81b279 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 30 Nov 2015 14:56:20 +0100 Subject: [PATCH 3/9] Java Docs! 
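The Javadocs added in this patch document the contract of generateSeqNo and markSeqNoAsCompleted: sequence numbers are handed out densely, and the local checkpoint is the highest seqNo below which every operation has been marked as completed. The sketch below is only a minimal illustration of that invariant, assuming nothing beyond what the Javadocs state: the class and method names are invented, and it replaces the real implementation's fixed-size FixedBitSet and indexing-lag throttling with a plain growing java.util.BitSet, so it is not the production code.

```java
import java.util.BitSet;

final class LocalCheckpointSketch {
    private final BitSet processed = new BitSet(); // bit i set == seqNo i completed
    private long checkpoint = -1;                  // all seqNos <= checkpoint are completed
    private long nextSeqNo = 0;

    synchronized long generateSeqNo() {
        return nextSeqNo++;
    }

    synchronized void markSeqNoAsCompleted(long seqNo) {
        if (seqNo >= nextSeqNo) {
            nextSeqNo = seqNo + 1;                 // replicas may see seqNos they never generated
        }
        if (seqNo <= checkpoint) {
            return;                                // e.g. an op replayed during recovery
        }
        processed.set((int) seqNo);
        while (processed.get((int) (checkpoint + 1))) {
            checkpoint++;                          // advance over every consecutive completed seqNo
        }
    }

    synchronized long getCheckpoint() {
        return checkpoint;
    }

    public static void main(String[] args) {
        LocalCheckpointSketch s = new LocalCheckpointSketch();
        long s0 = s.generateSeqNo(), s1 = s.generateSeqNo(), s2 = s.generateSeqNo();
        s.markSeqNoAsCompleted(s2);                // 0 and 1 still pending, checkpoint stays -1
        s.markSeqNoAsCompleted(s0);                // checkpoint moves to 0
        s.markSeqNoAsCompleted(s1);                // checkpoint jumps to 2
        System.out.println(s.getCheckpoint());     // prints 2
    }
}
```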
--- .../index/seqno/LocalCheckpointService.java | 37 +++++++++++++++++++ .../elasticsearch/index/seqno/SeqNoStats.java | 2 + .../index/seqno/SequenceNumbersService.java | 7 ++++ 3 files changed, 46 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java index 49cdb7d921fdd..8dbfdd8d8f743 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java @@ -26,20 +26,43 @@ import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; +/** + * This class generates sequences numbers and keeps track of the so called local checkpoint - the highest number for which + * all previous seqNo have been processed (including) + */ public class LocalCheckpointService extends AbstractIndexShardComponent { + /** sets the maximum spread between lowest and highest seq no in flight */ public static String SETTINGS_INDEX_LAG_THRESHOLD = "index.seq_no.index_lag.threshold"; + + /** + * how long should an incoming indexing request which violates {@link #SETTINGS_INDEX_LAG_THRESHOLD } should wait + * before being rejected + */ public static String SETTINGS_INDEX_LAG_MAX_WAIT = "index.seq_no.index_lag.max_wait"; + + /** default value for {@link #SETTINGS_INDEX_LAG_THRESHOLD} */ final static int DEFAULT_INDEX_LAG_THRESHOLD = 1024; + + /** default value for {@link #SETTINGS_INDEX_LAG_MAX_WAIT} */ final static TimeValue DEFAULT_INDEX_LAG_MAX_WAIT = TimeValue.timeValueSeconds(30); + /** protects changes to all internal state and signals changes in {@link #checkpoint} */ final Object mutex = new Object(); + + /** each bits maps to a seqNo in round robin fashion. a set bit means the seqNo has been processed */ final FixedBitSet processedSeqNo; + + /** value of {@link #SETTINGS_INDEX_LAG_THRESHOLD } */ final int indexLagThreshold; + /** value of {#link #SETTINGS_INDEX_LAG_THRESHOLD } */ final TimeValue indexLagMaxWait; + /** the next available seqNo - used for seqNo generation */ volatile long nextSeqNo = 0; + + /** the current local checkpoint, i.e., all seqNo lower<= this number have been completed */ volatile long checkpoint = -1; public LocalCheckpointService(ShardId shardId, IndexSettings indexSettings) { @@ -50,6 +73,11 @@ public LocalCheckpointService(ShardId shardId, IndexSettings indexSettings) { } + /** + * issue the next sequence number + * + * Note that this method can block to honour maximum indexing lag . See {@link #SETTINGS_INDEX_LAG_THRESHOLD } + **/ public long generateSeqNo() { synchronized (mutex) { // we have to keep checking when ensure capacity returns because it release the lock and nextSeqNo may change @@ -60,6 +88,10 @@ public long generateSeqNo() { } } + /** + * marks the processing of the given seqNo have been completed + * Note that this method can block to honour maximum indexing lag . 
See {@link #SETTINGS_INDEX_LAG_THRESHOLD } + **/ public long markSeqNoAsCompleted(long seqNo) { synchronized (mutex) { // make sure we track highest seen seqNo @@ -88,20 +120,24 @@ public long markSeqNoAsCompleted(long seqNo) { return checkpoint; } + /** get's the current check point */ public long getCheckpoint() { return checkpoint; } + /** get's the maximum seqno seen so far */ public long getMaxSeqNo() { return nextSeqNo - 1; } + /** checks if seqNo violates {@link #SETTINGS_INDEX_LAG_THRESHOLD } */ private boolean hasCapacity(long seqNo) { assert Thread.holdsLock(mutex); return (seqNo - checkpoint) <= indexLagThreshold; } + /** blocks until {@link #SETTINGS_INDEX_LAG_THRESHOLD } is honoured or raises {@link EsRejectedExecutionException }*/ private void ensureCapacity(long seqNo) { assert Thread.holdsLock(mutex); long retry = 0; @@ -126,6 +162,7 @@ private void ensureCapacity(long seqNo) { } } + /** maps the given seqNo to a position in {@link #processedSeqNo} */ private int seqNoToOffset(long seqNo) { assert seqNo - checkpoint <= indexLagThreshold; assert seqNo > checkpoint; diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java b/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java index a2052f75976a0..faf93eb276607 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java @@ -39,10 +39,12 @@ public SeqNoStats(long maxSeqNo, long localCheckpoint) { this.localCheckpoint = localCheckpoint; } + /** the maximum sequence number seen so far */ public long getMaxSeqNo() { return maxSeqNo; } + /** the maximum sequence number for which all previous operations (including) have been completed */ public long getLocalCheckpoint() { return localCheckpoint; } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index fd6fa7387e687..3ef8607c4c230 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -42,10 +42,17 @@ public long generateSeqNo() { return localCheckpointService.generateSeqNo(); } + /** + * marks the given seqNo as completed. 
See {@link LocalCheckpointService#markSeqNoAsCompleted(long)} + * more details + */ public void markSeqNoAsCompleted(long seqNo) { localCheckpointService.markSeqNoAsCompleted(seqNo); } + /** + * Gets sequence number related stats + */ public SeqNoStats stats() { return new SeqNoStats(localCheckpointService.getMaxSeqNo(), localCheckpointService.getCheckpoint()); } From 4edb7aafc70a073f1c5c5694ed82be72095d2b21 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 30 Nov 2015 15:00:00 +0100 Subject: [PATCH 4/9] tweak --- .../elasticsearch/action/admin/indices/stats/ShardStats.java | 2 +- .../java/org/elasticsearch/common/io/stream/StreamOutput.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java index 3b1ff2e538de8..e921c13b979b6 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java @@ -118,7 +118,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(statePath); out.writeString(dataPath); out.writeBoolean(isCustomDataPath); - out.writeOptionalStreamable(seqNoStats); + out.writeOptionalWritable(seqNoStats); } @Override diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index 0285e56355792..d5e96d9b0dbdb 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -506,7 +506,7 @@ public void writeOptionalStreamable(@Nullable Streamable streamable) throws IOEx /** * Serializes a potential null value. 
*/ - public void writeOptionalStreamable(@Nullable Writeable writeable) throws IOException { + public void writeOptionalWritable(@Nullable Writeable writeable) throws IOException { if (writeable != null) { writeBoolean(true); writeable.writeTo(this); From 0b50630e4e69fb86aead55230a6a34df728891f7 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 10 Dec 2015 23:26:11 +0100 Subject: [PATCH 5/9] Another checkpoint implementation --- .../metadata/MetaDataIndexUpgradeService.java | 2 +- .../index/seqno/LocalCheckpointService.java | 159 +++++++----------- .../seqno/LocalCheckpointServiceTests.java | 133 ++++----------- 3 files changed, 94 insertions(+), 200 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java index 70a492954d120..510faebcbdad4 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java @@ -151,7 +151,7 @@ private static boolean isSupportedVersion(IndexMetaData indexMetaData) { "index.translog.interval", "index.translog.sync_interval", "index.shard.inactive_time", - LocalCheckpointService.SETTINGS_INDEX_LAG_MAX_WAIT, + LocalCheckpointService.SETTINGS_BIT_ARRAY_CHUNK_SIZE, UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING)); /** diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java index 8dbfdd8d8f743..b3a4179417f64 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java @@ -19,105 +19,66 @@ package org.elasticsearch.index.seqno; import org.apache.lucene.util.FixedBitSet; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; +import java.util.LinkedList; + /** * This class generates sequences numbers and keeps track of the so called local checkpoint - the highest number for which * all previous seqNo have been processed (including) */ public class LocalCheckpointService extends AbstractIndexShardComponent { - /** sets the maximum spread between lowest and highest seq no in flight */ - public static String SETTINGS_INDEX_LAG_THRESHOLD = "index.seq_no.index_lag.threshold"; - - /** - * how long should an incoming indexing request which violates {@link #SETTINGS_INDEX_LAG_THRESHOLD } should wait - * before being rejected - */ - public static String SETTINGS_INDEX_LAG_MAX_WAIT = "index.seq_no.index_lag.max_wait"; - - /** default value for {@link #SETTINGS_INDEX_LAG_THRESHOLD} */ - final static int DEFAULT_INDEX_LAG_THRESHOLD = 1024; - - /** default value for {@link #SETTINGS_INDEX_LAG_MAX_WAIT} */ - final static TimeValue DEFAULT_INDEX_LAG_MAX_WAIT = TimeValue.timeValueSeconds(30); + public static String SETTINGS_BIT_ARRAY_CHUNK_SIZE = "index.seq_no.checkpoint.bit_array_chunk_size"; - /** protects changes to all internal state and signals changes in {@link #checkpoint} */ - final Object mutex = new Object(); + /** default value for {@link #SETTINGS_BIT_ARRAY_CHUNK_SIZE} */ + final static int 
DEFAULT_BIT_ARRAY_CHUNK_SIZE = 1024; - /** each bits maps to a seqNo in round robin fashion. a set bit means the seqNo has been processed */ - final FixedBitSet processedSeqNo; - /** value of {@link #SETTINGS_INDEX_LAG_THRESHOLD } */ - final int indexLagThreshold; - /** value of {#link #SETTINGS_INDEX_LAG_THRESHOLD } */ - final TimeValue indexLagMaxWait; + final LinkedList processedSeqNo; + final int processedSeqNoChunkSize; + long minSeqNoInProcessSeqNo = 0; + /** the current local checkpoint, i.e., all seqNo lower<= this number have been completed */ + volatile long checkpoint = -1; /** the next available seqNo - used for seqNo generation */ volatile long nextSeqNo = 0; - /** the current local checkpoint, i.e., all seqNo lower<= this number have been completed */ - volatile long checkpoint = -1; public LocalCheckpointService(ShardId shardId, IndexSettings indexSettings) { super(shardId, indexSettings); - indexLagThreshold = indexSettings.getSettings().getAsInt(SETTINGS_INDEX_LAG_THRESHOLD, DEFAULT_INDEX_LAG_THRESHOLD); - indexLagMaxWait = indexSettings.getSettings().getAsTime(SETTINGS_INDEX_LAG_MAX_WAIT, DEFAULT_INDEX_LAG_MAX_WAIT); - processedSeqNo = new FixedBitSet(indexLagThreshold); - + processedSeqNoChunkSize = indexSettings.getSettings().getAsInt(SETTINGS_BIT_ARRAY_CHUNK_SIZE, DEFAULT_BIT_ARRAY_CHUNK_SIZE); + processedSeqNo = new LinkedList<>(); } /** * issue the next sequence number - * - * Note that this method can block to honour maximum indexing lag . See {@link #SETTINGS_INDEX_LAG_THRESHOLD } **/ - public long generateSeqNo() { - synchronized (mutex) { - // we have to keep checking when ensure capacity returns because it release the lock and nextSeqNo may change - while (hasCapacity(nextSeqNo) == false) { - ensureCapacity(nextSeqNo); - } - return nextSeqNo++; - } + public synchronized long generateSeqNo() { + return nextSeqNo++; } /** * marks the processing of the given seqNo have been completed - * Note that this method can block to honour maximum indexing lag . See {@link #SETTINGS_INDEX_LAG_THRESHOLD } **/ - public long markSeqNoAsCompleted(long seqNo) { - synchronized (mutex) { - // make sure we track highest seen seqNo - if (seqNo >= nextSeqNo) { - nextSeqNo = seqNo + 1; - } - if (seqNo <= checkpoint) { - // this is possible during recover where we might replay an op that was also replicated - return checkpoint; - } - // just to be safe (previous calls to generateSeqNo/markSeqNoAsStarted should ensure this is OK) - ensureCapacity(seqNo); - int offset = seqNoToOffset(seqNo); - processedSeqNo.set(offset); - if (seqNo == checkpoint + 1) { - do { - // clear the flag as we are making it free for future operations. 
do se before we expose it - // by moving the checkpoint - processedSeqNo.clear(offset); - checkpoint++; - offset = seqNoToOffset(checkpoint + 1); - } while (processedSeqNo.get(offset)); - mutex.notifyAll(); - } + public synchronized void markSeqNoAsCompleted(long seqNo) { + // make sure we track highest seen seqNo + if (seqNo >= nextSeqNo) { + nextSeqNo = seqNo + 1; + } + if (seqNo <= checkpoint) { + // this is possible during recover where we might replay an op that was also replicated + return; + } + FixedBitSet bitSet = getBitSetForSeqNo(seqNo); + int offset = seqNoToBitSetOffset(seqNo); + bitSet.set(offset); + if (seqNo == checkpoint + 1) { + updateCheckpoint(); } - return checkpoint; } /** get's the current check point */ @@ -130,43 +91,41 @@ public long getMaxSeqNo() { return nextSeqNo - 1; } - - /** checks if seqNo violates {@link #SETTINGS_INDEX_LAG_THRESHOLD } */ - private boolean hasCapacity(long seqNo) { - assert Thread.holdsLock(mutex); - return (seqNo - checkpoint) <= indexLagThreshold; + private void updateCheckpoint() { + assert Thread.holdsLock(this); + assert checkpoint - minSeqNoInProcessSeqNo < processedSeqNoChunkSize : "checkpoint to minSeqNoInProcessSeqNo is larger then a bit set"; + assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)) : "updateCheckpoint is called but the bit following the checkpoint is not set"; + assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() : "checkpoint + 1 doesn't point to the first bit set"; + // keep it simple for now, get the checkpoint one by one. in the future we can optimize and read words + FixedBitSet current = processedSeqNo.getFirst(); + do { + checkpoint++; + // the checkpoint always falls in the first bit set or just before. If it falls + // on the last bit of the current bit set, we can clean it. 
+ if (checkpoint == minSeqNoInProcessSeqNo + processedSeqNoChunkSize - 1) { + processedSeqNo.pop(); + minSeqNoInProcessSeqNo += processedSeqNoChunkSize; + assert checkpoint - minSeqNoInProcessSeqNo < processedSeqNoChunkSize; + current = processedSeqNo.peekFirst(); + } + } while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1))); } - /** blocks until {@link #SETTINGS_INDEX_LAG_THRESHOLD } is honoured or raises {@link EsRejectedExecutionException }*/ - private void ensureCapacity(long seqNo) { - assert Thread.holdsLock(mutex); - long retry = 0; - final long maxRetries = indexLagMaxWait.seconds(); - while (hasCapacity(seqNo) == false) { - try { - if (retry > maxRetries) { - ElasticsearchException e = new EsRejectedExecutionException("indexing lag exceeds [{}] (seq# requested [{}], local checkpoint [{}]", - indexLagThreshold, seqNo, checkpoint); - e.setShard(shardId()); - throw e; - } - - // this temporary releases the lock on mutex - mutex.wait(Math.min(1000, indexLagMaxWait.millis() - retry * 1000)); - retry++; - } catch (InterruptedException ie) { - ElasticsearchException exp = new ElasticsearchException("interrupted while waiting on index lag"); - exp.setShard(shardId()); - throw exp; - } + private FixedBitSet getBitSetForSeqNo(long seqNo) { + assert Thread.holdsLock(this); + assert seqNo >= minSeqNoInProcessSeqNo; + int bitSetOffset = ((int) (seqNo - minSeqNoInProcessSeqNo)) / processedSeqNoChunkSize; + while (bitSetOffset >= processedSeqNo.size()) { + processedSeqNo.add(new FixedBitSet(processedSeqNoChunkSize)); } + return processedSeqNo.get(bitSetOffset); } - /** maps the given seqNo to a position in {@link #processedSeqNo} */ - private int seqNoToOffset(long seqNo) { - assert seqNo - checkpoint <= indexLagThreshold; - assert seqNo > checkpoint; - return (int) (seqNo % indexLagThreshold); - } + /** maps the given seqNo to a position in the bit set returned by {@link #getBitSetForSeqNo} */ + private int seqNoToBitSetOffset(long seqNo) { + assert Thread.holdsLock(this); + assert seqNo >= minSeqNoInProcessSeqNo; + return ((int) (seqNo - minSeqNoInProcessSeqNo)) % processedSeqNoChunkSize; + } } diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java index 73e7f839d1a26..53c1de7dd5208 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java @@ -20,45 +20,39 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.junit.Before; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.CountDownLatch; +import java.util.*; import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.isOneOf; public class LocalCheckpointServiceTests extends ESTestCase { LocalCheckpointService checkpointService; - final int SMALL_INDEX_LAG_THRESHOLD = 10; + final int SMALL_CHUNK_SIZE = 4; @Override @Before 
public void setUp() throws Exception { super.setUp(); - checkpointService = getCheckpointService(SMALL_INDEX_LAG_THRESHOLD, LocalCheckpointService.DEFAULT_INDEX_LAG_MAX_WAIT); + checkpointService = getCheckpointService(); } - protected LocalCheckpointService getCheckpointService(int thresholdLag, TimeValue thresholdDelay) { + protected LocalCheckpointService getCheckpointService() { return new LocalCheckpointService( - new ShardId("test", 0), - IndexSettingsModule.newIndexSettings("test", - Settings.builder() - .put(LocalCheckpointService.SETTINGS_INDEX_LAG_THRESHOLD, thresholdLag) - .put(LocalCheckpointService.SETTINGS_INDEX_LAG_MAX_WAIT, thresholdDelay) - .build() - )); + new ShardId("test", 0), + IndexSettingsModule.newIndexSettings("test", + Settings.builder() + .put(LocalCheckpointService.SETTINGS_BIT_ARRAY_CHUNK_SIZE, SMALL_CHUNK_SIZE) + .build() + )); } public void testSimplePrimary() { @@ -88,88 +82,28 @@ public void testSimpleReplica() { assertThat(checkpointService.getCheckpoint(), equalTo(2L)); } - public void testIndexThrottleSuccessPrimary() throws Exception { - LocalCheckpointService checkpoint = getCheckpointService(3, TimeValue.timeValueHours(1)); - final long seq1 = checkpoint.generateSeqNo(); - final long seq2 = checkpoint.generateSeqNo(); - final long seq3 = checkpoint.generateSeqNo(); - final CountDownLatch threadStarted = new CountDownLatch(1); - final AtomicBoolean threadDone = new AtomicBoolean(false); - Thread backgroundThread = new Thread(() -> { - threadStarted.countDown(); - checkpoint.generateSeqNo(); - threadDone.set(true); - }, "testIndexDelayPrimary"); - backgroundThread.start(); - logger.info("--> waiting for thread to start"); - threadStarted.await(); - assertFalse("background thread finished but should have waited", threadDone.get()); - checkpoint.markSeqNoAsCompleted(seq2); - assertFalse("background thread finished but should have waited (seq2 completed)", threadDone.get()); - checkpoint.markSeqNoAsCompleted(seq1); - logger.info("--> waiting for thread to stop"); - assertBusy(() -> { - assertTrue("background thread should finished after finishing seq1", threadDone.get()); - }); - } + public void testSimpleOverFlow() { + List seqNoList = new ArrayList<>(); + final boolean aligned = randomBoolean(); + final int maxOps = SMALL_CHUNK_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, SMALL_CHUNK_SIZE - 1)); - public void testIndexThrottleTimeoutPrimary() throws Exception { - LocalCheckpointService checkpoint = getCheckpointService(2, TimeValue.timeValueMillis(100)); - checkpoint.generateSeqNo(); - checkpoint.generateSeqNo(); - try { - checkpoint.generateSeqNo(); - fail("index operation should time out due to a large lag"); - } catch (EsRejectedExecutionException e) { - // OK! 
- } - } - - public void testIndexThrottleSuccessReplica() throws Exception { - LocalCheckpointService checkpoint = getCheckpointService(3, TimeValue.timeValueHours(1)); - final CountDownLatch threadStarted = new CountDownLatch(1); - final AtomicBoolean threadDone = new AtomicBoolean(false); - checkpoint.markSeqNoAsCompleted(1); - Thread backgroundThread = new Thread(() -> { - threadStarted.countDown(); - checkpoint.markSeqNoAsCompleted(3); - threadDone.set(true); - }, "testIndexDelayReplica"); - backgroundThread.start(); - logger.info("--> waiting for thread to start"); - threadStarted.await(); - assertFalse("background thread finished but should have waited", threadDone.get()); - checkpoint.markSeqNoAsCompleted(0); - logger.info("--> waiting for thread to stop"); - assertBusy(() -> { - assertTrue("background thread should finished after finishing seq1", threadDone.get()); - }); - } - - public void testIndexThrottleTimeoutReplica() throws Exception { - LocalCheckpointService checkpoint = getCheckpointService(1, TimeValue.timeValueMillis(100)); - try { - checkpoint.markSeqNoAsCompleted(1L); - fail("index operation should time out due to a large lag"); - } catch (EsRejectedExecutionException e) { - // OK! + for (int i = 0; i < maxOps; i++) { + seqNoList.add(i); } - checkpoint.markSeqNoAsCompleted(0L); - try { - checkpoint.markSeqNoAsCompleted(2L); - fail("index operation should time out due to a large lag"); - } catch (EsRejectedExecutionException e) { - // OK! + Collections.shuffle(seqNoList, random()); + for (Integer seqNo : seqNoList) { + checkpointService.markSeqNoAsCompleted(seqNo); } - + assertThat(checkpointService.checkpoint, equalTo(maxOps - 1L)); + assertThat(checkpointService.processedSeqNo.size(), equalTo(aligned ? 0 : 1)); } public void testConcurrentPrimary() throws InterruptedException { Thread[] threads = new Thread[randomIntBetween(2, 5)]; final int opsPerThread = randomIntBetween(10, 20); final int maxOps = opsPerThread * threads.length; - final long unFinisshedSeq = randomIntBetween(maxOps - SMALL_INDEX_LAG_THRESHOLD, maxOps - 2); // make sure we won't be blocked - logger.info("--> will run [{}] threads, maxOps [{}], unfinished seq no [{}]", threads.length, maxOps, unFinisshedSeq); + final long unFinishedSeq = randomIntBetween(0, maxOps - 2); // make sure we always index the last seqNo to simplify maxSeq checks + logger.info("--> will run [{}] threads, maxOps [{}], unfinished seq no [{}]", threads.length, maxOps, unFinishedSeq); final CyclicBarrier barrier = new CyclicBarrier(threads.length); for (int t = 0; t < threads.length; t++) { final int threadId = t; @@ -185,7 +119,7 @@ protected void doRun() throws Exception { for (int i = 0; i < opsPerThread; i++) { long seqNo = checkpointService.generateSeqNo(); logger.info("[t{}] started [{}]", threadId, seqNo); - if (seqNo != unFinisshedSeq) { + if (seqNo != unFinishedSeq) { checkpointService.markSeqNoAsCompleted(seqNo); logger.info("[t{}] completed [{}]", threadId, seqNo); } @@ -198,16 +132,17 @@ protected void doRun() throws Exception { thread.join(); } assertThat(checkpointService.getMaxSeqNo(), equalTo(maxOps - 1L)); - assertThat(checkpointService.getCheckpoint(), equalTo(unFinisshedSeq - 1L)); - checkpointService.markSeqNoAsCompleted(unFinisshedSeq); + assertThat(checkpointService.getCheckpoint(), equalTo(unFinishedSeq - 1L)); + checkpointService.markSeqNoAsCompleted(unFinishedSeq); assertThat(checkpointService.getCheckpoint(), equalTo(maxOps - 1L)); + assertThat(checkpointService.processedSeqNo.size(), isOneOf(0, 1)); } 
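These concurrent tests exercise the new chunked bookkeeping introduced in this patch: instead of one fixed-size bit array, pending seqNos are tracked in lazily allocated FixedBitSet chunks anchored at the first still-tracked seqNo, and the first chunk is dropped once the checkpoint has crossed its last bit. The demo below only mirrors the index arithmetic of that scheme; the chunk size of 4 matches the SMALL_CHUNK_SIZE these tests configure, but the class and field names are invented for the example and it is not the class under test.

```java
final class ChunkedSeqNoMappingDemo {
    static final int CHUNK_SIZE = 4;     // in the same spirit as the bit array chunk size setting
    static long firstTrackedSeqNo = 0;   // seqNo represented by bit 0 of the first chunk

    static int chunkIndex(long seqNo) {
        return (int) ((seqNo - firstTrackedSeqNo) / CHUNK_SIZE);
    }

    static int bitOffset(long seqNo) {
        return (int) ((seqNo - firstTrackedSeqNo) % CHUNK_SIZE);
    }

    public static void main(String[] args) {
        // seqNo 9 with chunks of 4 lands in the third chunk (index 2) at bit 1
        System.out.println(chunkIndex(9) + "/" + bitOffset(9));   // prints 2/1
        // once the checkpoint reaches seqNo 3, the last bit of chunk 0, that chunk can be
        // released and the anchor moves forward by one whole chunk
        firstTrackedSeqNo += CHUNK_SIZE;
        System.out.println(chunkIndex(9) + "/" + bitOffset(9));   // now prints 1/1
    }
}
```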
public void testConcurrentReplica() throws InterruptedException { Thread[] threads = new Thread[randomIntBetween(2, 5)]; final int opsPerThread = randomIntBetween(10, 20); final int maxOps = opsPerThread * threads.length; - final long unFinisshedSeq = randomIntBetween(maxOps - SMALL_INDEX_LAG_THRESHOLD, maxOps - 2); // make sure we won't be blocked + final long unFinishedSeq = randomIntBetween(0, maxOps - 2); // make sure we always index the last seqNo to simplify maxSeq checks Set seqNoList = new HashSet<>(); for (int i = 0; i < maxOps; i++) { seqNoList.add(i); @@ -221,7 +156,7 @@ public void testConcurrentReplica() throws InterruptedException { seqNoList.removeAll(Arrays.asList(seqNoPerThread[t])); } seqNoPerThread[threads.length - 1] = seqNoList.toArray(new Integer[seqNoList.size()]); - logger.info("--> will run [{}] threads, maxOps [{}], unfinished seq no [{}]", threads.length, maxOps, unFinisshedSeq); + logger.info("--> will run [{}] threads, maxOps [{}], unfinished seq no [{}]", threads.length, maxOps, unFinishedSeq); final CyclicBarrier barrier = new CyclicBarrier(threads.length); for (int t = 0; t < threads.length; t++) { final int threadId = t; @@ -236,7 +171,7 @@ protected void doRun() throws Exception { barrier.await(); Integer[] ops = seqNoPerThread[threadId]; for (int seqNo : ops) { - if (seqNo != unFinisshedSeq) { + if (seqNo != unFinishedSeq) { checkpointService.markSeqNoAsCompleted(seqNo); logger.info("[t{}] completed [{}]", threadId, seqNo); } @@ -249,9 +184,9 @@ protected void doRun() throws Exception { thread.join(); } assertThat(checkpointService.getMaxSeqNo(), equalTo(maxOps - 1L)); - assertThat(checkpointService.getCheckpoint(), equalTo(unFinisshedSeq - 1L)); - checkpointService.markSeqNoAsCompleted(unFinisshedSeq); + assertThat(checkpointService.getCheckpoint(), equalTo(unFinishedSeq - 1L)); + checkpointService.markSeqNoAsCompleted(unFinishedSeq); assertThat(checkpointService.getCheckpoint(), equalTo(maxOps - 1L)); } -} \ No newline at end of file +} From 7f49c1a6ecec69a511dc5534959e696b961b71a1 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 11 Dec 2015 09:46:07 +0100 Subject: [PATCH 6/9] feedback --- .../admin/indices/stats/ShardStats.java | 2 +- .../metadata/MetaDataIndexUpgradeService.java | 2 +- .../common/io/stream/StreamInput.java | 2 +- .../index/seqno/LocalCheckpointService.java | 50 +++++++++++++------ .../elasticsearch/index/seqno/SeqNoStats.java | 10 ++-- .../seqno/LocalCheckpointServiceTests.java | 2 +- .../org/elasticsearch/test/ESTestCase.java | 11 ++-- 7 files changed, 47 insertions(+), 32 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java index e921c13b979b6..81586f0fa7cae 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java @@ -107,7 +107,7 @@ public void readFrom(StreamInput in) throws IOException { statePath = in.readString(); dataPath = in.readString(); isCustomDataPath = in.readBoolean(); - seqNoStats = in.readOptionalStreamableReader(SeqNoStats.PROTOTYPE); + seqNoStats = in.readOptionalStreamableReader(SeqNoStats::new); } @Override diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java index 510faebcbdad4..6c1d41da98301 100644 --- 
a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java @@ -151,7 +151,7 @@ private static boolean isSupportedVersion(IndexMetaData indexMetaData) { "index.translog.interval", "index.translog.sync_interval", "index.shard.inactive_time", - LocalCheckpointService.SETTINGS_BIT_ARRAY_CHUNK_SIZE, + LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE, UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING)); /** diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 23d414c851909..4a47aa65f6754 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -536,7 +536,7 @@ public T readOptionalStreamable(Supplier supplier) thr /** * Serializes a potential null value. */ - public > T readOptionalStreamableReader(StreamableReader streamableReader) throws IOException { + public T readOptionalStreamableReader(StreamableReader streamableReader) throws IOException { if (readBoolean()) { return streamableReader.readFrom(this); } else { diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java index b3a4179417f64..a1835171802c2 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java @@ -31,15 +31,23 @@ */ public class LocalCheckpointService extends AbstractIndexShardComponent { - public static String SETTINGS_BIT_ARRAY_CHUNK_SIZE = "index.seq_no.checkpoint.bit_array_chunk_size"; + /** + * we keep a bit for each seq No that is still pending. to optimize allocation, we do so in multiple arrays + * allocating them on demand and cleaning up while completed. This setting controls the size of the arrays + */ + public static String SETTINGS_BIT_ARRAYS_SIZE = "index.seq_no.checkpoint.bit_arrays_size"; - /** default value for {@link #SETTINGS_BIT_ARRAY_CHUNK_SIZE} */ - final static int DEFAULT_BIT_ARRAY_CHUNK_SIZE = 1024; + /** default value for {@link #SETTINGS_BIT_ARRAYS_SIZE} */ + final static int DEFAULT_BIT_ARRAYS_SIZE = 1024; + /** + * an order list of bit arrays representing pending seq nos. The list is "anchored" in {@link #firstSeqNoInProcessSeqNo} + * which marks the seqNo the fist bit in the first array corresponds to. + */ final LinkedList processedSeqNo; - final int processedSeqNoChunkSize; - long minSeqNoInProcessSeqNo = 0; + final int bitArraysSize; + long firstSeqNoInProcessSeqNo = 0; /** the current local checkpoint, i.e., all seqNo lower<= this number have been completed */ volatile long checkpoint = -1; @@ -50,7 +58,10 @@ public class LocalCheckpointService extends AbstractIndexShardComponent { public LocalCheckpointService(ShardId shardId, IndexSettings indexSettings) { super(shardId, indexSettings); - processedSeqNoChunkSize = indexSettings.getSettings().getAsInt(SETTINGS_BIT_ARRAY_CHUNK_SIZE, DEFAULT_BIT_ARRAY_CHUNK_SIZE); + bitArraysSize = indexSettings.getSettings().getAsInt(SETTINGS_BIT_ARRAYS_SIZE, DEFAULT_BIT_ARRAYS_SIZE); + if (bitArraysSize <= 0) { + throw new IllegalArgumentException("[" + SETTINGS_BIT_ARRAYS_SIZE + "] must be positive. 
got [" + bitArraysSize + "]"); + } processedSeqNo = new LinkedList<>(); } @@ -70,7 +81,7 @@ public synchronized void markSeqNoAsCompleted(long seqNo) { nextSeqNo = seqNo + 1; } if (seqNo <= checkpoint) { - // this is possible during recover where we might replay an op that was also replicated + // this is possible during recovery where we might replay an op that was also replicated return; } FixedBitSet bitSet = getBitSetForSeqNo(seqNo); @@ -91,9 +102,13 @@ public long getMaxSeqNo() { return nextSeqNo - 1; } + /** + * moves the checkpoint to the last consecutively processed seqNo + * Note: this method assumes that the seqNo following the current checkpoint is processed. + */ private void updateCheckpoint() { assert Thread.holdsLock(this); - assert checkpoint - minSeqNoInProcessSeqNo < processedSeqNoChunkSize : "checkpoint to minSeqNoInProcessSeqNo is larger then a bit set"; + assert checkpoint - firstSeqNoInProcessSeqNo < bitArraysSize : "checkpoint to firstSeqNoInProcessSeqNo is larger then a bit set"; assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)) : "updateCheckpoint is called but the bit following the checkpoint is not set"; assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() : "checkpoint + 1 doesn't point to the first bit set"; // keep it simple for now, get the checkpoint one by one. in the future we can optimize and read words @@ -102,21 +117,24 @@ private void updateCheckpoint() { checkpoint++; // the checkpoint always falls in the first bit set or just before. If it falls // on the last bit of the current bit set, we can clean it. - if (checkpoint == minSeqNoInProcessSeqNo + processedSeqNoChunkSize - 1) { + if (checkpoint == firstSeqNoInProcessSeqNo + bitArraysSize - 1) { processedSeqNo.pop(); - minSeqNoInProcessSeqNo += processedSeqNoChunkSize; - assert checkpoint - minSeqNoInProcessSeqNo < processedSeqNoChunkSize; + firstSeqNoInProcessSeqNo += bitArraysSize; + assert checkpoint - firstSeqNoInProcessSeqNo < bitArraysSize; current = processedSeqNo.peekFirst(); } } while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1))); } + /** + * gets the bit array for the give seqNo, allocating new ones if needed. 
+ */ private FixedBitSet getBitSetForSeqNo(long seqNo) { assert Thread.holdsLock(this); - assert seqNo >= minSeqNoInProcessSeqNo; - int bitSetOffset = ((int) (seqNo - minSeqNoInProcessSeqNo)) / processedSeqNoChunkSize; + assert seqNo >= firstSeqNoInProcessSeqNo; + int bitSetOffset = ((int) (seqNo - firstSeqNoInProcessSeqNo)) / bitArraysSize; while (bitSetOffset >= processedSeqNo.size()) { - processedSeqNo.add(new FixedBitSet(processedSeqNoChunkSize)); + processedSeqNo.add(new FixedBitSet(bitArraysSize)); } return processedSeqNo.get(bitSetOffset); } @@ -125,7 +143,7 @@ private FixedBitSet getBitSetForSeqNo(long seqNo) { /** maps the given seqNo to a position in the bit set returned by {@link #getBitSetForSeqNo} */ private int seqNoToBitSetOffset(long seqNo) { assert Thread.holdsLock(this); - assert seqNo >= minSeqNoInProcessSeqNo; - return ((int) (seqNo - minSeqNoInProcessSeqNo)) % processedSeqNoChunkSize; + assert seqNo >= firstSeqNoInProcessSeqNo; + return ((int) (seqNo - firstSeqNoInProcessSeqNo)) % bitArraysSize; } } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java b/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java index faf93eb276607..99ffb6ad54794 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java @@ -29,8 +29,6 @@ public class SeqNoStats implements ToXContent, Writeable { - public static final SeqNoStats PROTOTYPE = new SeqNoStats(0, 0); - final long maxSeqNo; final long localCheckpoint; @@ -39,6 +37,10 @@ public SeqNoStats(long maxSeqNo, long localCheckpoint) { this.localCheckpoint = localCheckpoint; } + public SeqNoStats(StreamInput in) throws IOException { + this(in.readZLong(), in.readZLong()); + } + /** the maximum sequence number seen so far */ public long getMaxSeqNo() { return maxSeqNo; @@ -56,8 +58,8 @@ public SeqNoStats readFrom(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeLong(maxSeqNo); - out.writeLong(localCheckpoint); + out.writeZLong(maxSeqNo); + out.writeZLong(localCheckpoint); } @Override diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java index 53c1de7dd5208..429c60af00d76 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java @@ -50,7 +50,7 @@ protected LocalCheckpointService getCheckpointService() { new ShardId("test", 0), IndexSettingsModule.newIndexSettings("test", Settings.builder() - .put(LocalCheckpointService.SETTINGS_BIT_ARRAY_CHUNK_SIZE, SMALL_CHUNK_SIZE) + .put(LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE, SMALL_CHUNK_SIZE) .build() )); } diff --git a/test-framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test-framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 53f69b42ec1ca..fb626d9963516 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test-framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -555,12 +555,7 @@ private static String groupName(ThreadGroup threadGroup) { * Returns size random values */ public static List randomSubsetOf(int size, T... 
values) { - if (size > values.length) { - throw new IllegalArgumentException("Can\'t pick " + size + " random objects from a list of " + values.length + " objects"); - } - List list = arrayAsArrayList(values); - Collections.shuffle(list); - return list.subList(0, size); + return randomSubsetOf(size, Arrays.asList(values)); } /** @@ -571,7 +566,7 @@ public static List randomSubsetOf(int size, Collection values) { throw new IllegalArgumentException("Can\'t pick " + size + " random objects from a list of " + values.size() + " objects"); } List list = new ArrayList<>(values); - Collections.shuffle(list); + Collections.shuffle(list, random()); return list.subList(0, size); } @@ -621,7 +616,7 @@ public void assertPathHasBeenCleared(Path path) throws Exception { sb.append("]"); assertThat(count + " files exist that should have been cleaned:\n" + sb.toString(), count, equalTo(0)); } - + /** Returns the suite failure marker: internal use only! */ public static TestRuleMarkFailure getSuiteFailureMarker() { return suiteFailureMarker; From 861378c248c6dd2bb94d4e6e919772b9f4afe524 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 14 Dec 2015 09:18:45 +0100 Subject: [PATCH 7/9] feedback --- .../common/io/stream/StreamInput.java | 16 +++++++++++++--- .../index/seqno/LocalCheckpointService.java | 16 ++++++++-------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 4a47aa65f6754..1fe06600cdefa 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -40,9 +40,19 @@ import org.joda.time.DateTime; import org.joda.time.DateTimeZone; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; import java.nio.file.NoSuchFileException; -import java.util.*; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.function.Supplier; import static org.elasticsearch.ElasticsearchException.readException; @@ -133,7 +143,7 @@ public short readShort() throws IOException { */ public int readInt() throws IOException { return ((readByte() & 0xFF) << 24) | ((readByte() & 0xFF) << 16) - | ((readByte() & 0xFF) << 8) | (readByte() & 0xFF); + | ((readByte() & 0xFF) << 8) | (readByte() & 0xFF); } /** diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java index a1835171802c2..a47fd345b86b7 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java @@ -42,14 +42,14 @@ public class LocalCheckpointService extends AbstractIndexShardComponent { /** - * an order list of bit arrays representing pending seq nos. The list is "anchored" in {@link #firstSeqNoInProcessSeqNo} + * an ordered list of bit arrays representing pending seq nos. The list is "anchored" in {@link #firstSeqNoInProcessSeqNo} * which marks the seqNo the fist bit in the first array corresponds to. 
*/ final LinkedList processedSeqNo; final int bitArraysSize; long firstSeqNoInProcessSeqNo = 0; - /** the current local checkpoint, i.e., all seqNo lower<= this number have been completed */ + /** the current local checkpoint, i.e., all seqNo lower (<=) than this number have been completed */ volatile long checkpoint = -1; /** the next available seqNo - used for seqNo generation */ @@ -92,12 +92,12 @@ public synchronized void markSeqNoAsCompleted(long seqNo) { } } - /** get's the current check point */ + /** gets the current check point */ public long getCheckpoint() { return checkpoint; } - /** get's the maximum seqno seen so far */ + /** gets the maximum seqno seen so far */ public long getMaxSeqNo() { return nextSeqNo - 1; } @@ -108,9 +108,9 @@ public long getMaxSeqNo() { */ private void updateCheckpoint() { assert Thread.holdsLock(this); - assert checkpoint - firstSeqNoInProcessSeqNo < bitArraysSize : "checkpoint to firstSeqNoInProcessSeqNo is larger then a bit set"; + assert checkpoint < firstSeqNoInProcessSeqNo + bitArraysSize - 1 : "checkpoint should be bellow the end of the first bit set (o.w. current bit set is completed and shouldn't be there)"; + assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() : "checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)"; assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)) : "updateCheckpoint is called but the bit following the checkpoint is not set"; - assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() : "checkpoint + 1 doesn't point to the first bit set"; // keep it simple for now, get the checkpoint one by one. in the future we can optimize and read words FixedBitSet current = processedSeqNo.getFirst(); do { @@ -118,7 +118,7 @@ private void updateCheckpoint() { // the checkpoint always falls in the first bit set or just before. If it falls // on the last bit of the current bit set, we can clean it. if (checkpoint == firstSeqNoInProcessSeqNo + bitArraysSize - 1) { - processedSeqNo.pop(); + processedSeqNo.removeFirst(); firstSeqNoInProcessSeqNo += bitArraysSize; assert checkpoint - firstSeqNoInProcessSeqNo < bitArraysSize; current = processedSeqNo.peekFirst(); @@ -127,7 +127,7 @@ private void updateCheckpoint() { } /** - * gets the bit array for the give seqNo, allocating new ones if needed. + * gets the bit array for the given seqNo, allocating new ones if needed. */ private FixedBitSet getBitSetForSeqNo(long seqNo) { assert Thread.holdsLock(this); From 649860d273ccac49cee39bde0071ee1911f21067 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 15 Dec 2015 09:30:55 +0100 Subject: [PATCH 8/9] feedback --- .../index/seqno/LocalCheckpointService.java | 29 ++++++++++--------- .../seqno/LocalCheckpointServiceTests.java | 27 ++++++++++------- .../org/elasticsearch/test/ESTestCase.java | 14 +++++++-- 3 files changed, 43 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java index a47fd345b86b7..8b5cb4a4616a5 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java @@ -42,15 +42,15 @@ public class LocalCheckpointService extends AbstractIndexShardComponent { /** - * an ordered list of bit arrays representing pending seq nos. 
The list is "anchored" in {@link #firstSeqNoInProcessSeqNo} + * an ordered list of bit arrays representing pending seq nos. The list is "anchored" in {@link #firstProcessedSeqNo} * which marks the seqNo the fist bit in the first array corresponds to. */ final LinkedList processedSeqNo; final int bitArraysSize; - long firstSeqNoInProcessSeqNo = 0; + long firstProcessedSeqNo = 0; /** the current local checkpoint, i.e., all seqNo lower (<=) than this number have been completed */ - volatile long checkpoint = -1; + volatile long checkpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; /** the next available seqNo - used for seqNo generation */ volatile long nextSeqNo = 0; @@ -108,19 +108,22 @@ public long getMaxSeqNo() { */ private void updateCheckpoint() { assert Thread.holdsLock(this); - assert checkpoint < firstSeqNoInProcessSeqNo + bitArraysSize - 1 : "checkpoint should be bellow the end of the first bit set (o.w. current bit set is completed and shouldn't be there)"; - assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() : "checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)"; - assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)) : "updateCheckpoint is called but the bit following the checkpoint is not set"; + assert checkpoint < firstProcessedSeqNo + bitArraysSize - 1 : + "checkpoint should be below the end of the first bit set (o.w. current bit set is completed and shouldn't be there)"; + assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() : + "checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)"; + assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)) : + "updateCheckpoint is called but the bit following the checkpoint is not set"; // keep it simple for now, get the checkpoint one by one. in the future we can optimize and read words FixedBitSet current = processedSeqNo.getFirst(); do { checkpoint++; // the checkpoint always falls in the first bit set or just before. If it falls // on the last bit of the current bit set, we can clean it. 
- if (checkpoint == firstSeqNoInProcessSeqNo + bitArraysSize - 1) { + if (checkpoint == firstProcessedSeqNo + bitArraysSize - 1) { processedSeqNo.removeFirst(); - firstSeqNoInProcessSeqNo += bitArraysSize; - assert checkpoint - firstSeqNoInProcessSeqNo < bitArraysSize; + firstProcessedSeqNo += bitArraysSize; + assert checkpoint - firstProcessedSeqNo < bitArraysSize; current = processedSeqNo.peekFirst(); } } while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1))); @@ -131,8 +134,8 @@ private void updateCheckpoint() { */ private FixedBitSet getBitSetForSeqNo(long seqNo) { assert Thread.holdsLock(this); - assert seqNo >= firstSeqNoInProcessSeqNo; - int bitSetOffset = ((int) (seqNo - firstSeqNoInProcessSeqNo)) / bitArraysSize; + assert seqNo >= firstProcessedSeqNo; + int bitSetOffset = ((int) (seqNo - firstProcessedSeqNo)) / bitArraysSize; while (bitSetOffset >= processedSeqNo.size()) { processedSeqNo.add(new FixedBitSet(bitArraysSize)); } @@ -143,7 +146,7 @@ private FixedBitSet getBitSetForSeqNo(long seqNo) { /** maps the given seqNo to a position in the bit set returned by {@link #getBitSetForSeqNo} */ private int seqNoToBitSetOffset(long seqNo) { assert Thread.holdsLock(this); - assert seqNo >= firstSeqNoInProcessSeqNo; - return ((int) (seqNo - firstSeqNoInProcessSeqNo)) % bitArraysSize; + assert seqNo >= firstProcessedSeqNo; + return ((int) (seqNo - firstProcessedSeqNo)) % bitArraysSize; } } diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java index 429c60af00d76..d01938baa9748 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java @@ -26,8 +26,14 @@ import org.elasticsearch.test.IndexSettingsModule; import org.junit.Before; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; import java.util.concurrent.CyclicBarrier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.isOneOf; @@ -96,6 +102,7 @@ public void testSimpleOverFlow() { } assertThat(checkpointService.checkpoint, equalTo(maxOps - 1L)); assertThat(checkpointService.processedSeqNo.size(), equalTo(aligned ? 
0 : 1)); + assertThat(checkpointService.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE)); } public void testConcurrentPrimary() throws InterruptedException { @@ -136,6 +143,7 @@ protected void doRun() throws Exception { checkpointService.markSeqNoAsCompleted(unFinishedSeq); assertThat(checkpointService.getCheckpoint(), equalTo(maxOps - 1L)); assertThat(checkpointService.processedSeqNo.size(), isOneOf(0, 1)); + assertThat(checkpointService.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE)); } public void testConcurrentReplica() throws InterruptedException { @@ -143,19 +151,15 @@ public void testConcurrentReplica() throws InterruptedException { final int opsPerThread = randomIntBetween(10, 20); final int maxOps = opsPerThread * threads.length; final long unFinishedSeq = randomIntBetween(0, maxOps - 2); // make sure we always index the last seqNo to simplify maxSeq checks - Set seqNoList = new HashSet<>(); - for (int i = 0; i < maxOps; i++) { - seqNoList.add(i); - } + Set seqNos = IntStream.range(0, maxOps).boxed().collect(Collectors.toSet()); final Integer[][] seqNoPerThread = new Integer[threads.length][]; for (int t = 0; t < threads.length - 1; t++) { - int size = Math.min(seqNoList.size(), randomIntBetween(opsPerThread - 4, opsPerThread + 4)); - seqNoPerThread[t] = randomSubsetOf(size, seqNoList).toArray(new Integer[size]); - Arrays.sort(seqNoPerThread[t]); - seqNoList.removeAll(Arrays.asList(seqNoPerThread[t])); + int size = Math.min(seqNos.size(), randomIntBetween(opsPerThread - 4, opsPerThread + 4)); + seqNoPerThread[t] = randomSubsetOf(size, seqNos).toArray(new Integer[size]); + seqNos.removeAll(Arrays.asList(seqNoPerThread[t])); } - seqNoPerThread[threads.length - 1] = seqNoList.toArray(new Integer[seqNoList.size()]); + seqNoPerThread[threads.length - 1] = seqNos.toArray(new Integer[seqNos.size()]); logger.info("--> will run [{}] threads, maxOps [{}], unfinished seq no [{}]", threads.length, maxOps, unFinishedSeq); final CyclicBarrier barrier = new CyclicBarrier(threads.length); for (int t = 0; t < threads.length; t++) { @@ -177,7 +181,7 @@ protected void doRun() throws Exception { } } } - }, "testConcurrentPrimary_" + threadId); + }, "testConcurrentReplica_" + threadId); threads[t].start(); } for (Thread thread : threads) { @@ -187,6 +191,7 @@ protected void doRun() throws Exception { assertThat(checkpointService.getCheckpoint(), equalTo(unFinishedSeq - 1L)); checkpointService.markSeqNoAsCompleted(unFinishedSeq); assertThat(checkpointService.getCheckpoint(), equalTo(maxOps - 1L)); + assertThat(checkpointService.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE)); } } diff --git a/test-framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test-framework/src/main/java/org/elasticsearch/test/ESTestCase.java index fb626d9963516..01bc1813ab74c 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test-framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -55,21 +55,29 @@ import org.elasticsearch.test.junit.listeners.LoggingListener; import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter; import org.elasticsearch.threadpool.ThreadPool; -import org.junit.*; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.rules.RuleChain; import java.io.IOException; import java.nio.file.DirectoryStream; import java.nio.file.Files; 
import java.nio.file.Path; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.BooleanSupplier; -import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList; import static org.hamcrest.Matchers.equalTo; /** From a4e009ea2c4edd6ecf0f2d68e230f633c07a3a20 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 15 Dec 2015 14:32:28 +0100 Subject: [PATCH 9/9] formatting --- .../metadata/MetaDataIndexUpgradeService.java | 78 +++++++++---------- 1 file changed, 39 insertions(+), 39 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java index 6c1d41da98301..c1e159b2185d7 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java @@ -93,8 +93,8 @@ private boolean isUpgraded(IndexMetaData indexMetaData) { private void checkSupportedVersion(IndexMetaData indexMetaData) { if (indexMetaData.getState() == IndexMetaData.State.OPEN && isSupportedVersion(indexMetaData) == false) { throw new IllegalStateException("The index [" + indexMetaData.getIndex() + "] was created before v2.0.0.beta1 and wasn't upgraded." - + " This index should be open using a version before " + Version.CURRENT.minimumCompatibilityVersion() - + " and upgraded using the upgrade API."); + + " This index should be open using a version before " + Version.CURRENT.minimumCompatibilityVersion() + + " and upgraded using the upgrade API."); } } @@ -107,7 +107,7 @@ private static boolean isSupportedVersion(IndexMetaData indexMetaData) { return true; } if (indexMetaData.getMinimumCompatibleVersion() != null && - indexMetaData.getMinimumCompatibleVersion().onOrAfter(org.apache.lucene.util.Version.LUCENE_5_0_0)) { + indexMetaData.getMinimumCompatibleVersion().onOrAfter(org.apache.lucene.util.Version.LUCENE_5_0_0)) { //The index was upgraded we can work with it return true; } @@ -116,43 +116,43 @@ private static boolean isSupportedVersion(IndexMetaData indexMetaData) { /** All known byte-sized settings for an index. */ public static final Set INDEX_BYTES_SIZE_SETTINGS = unmodifiableSet(newHashSet( - "index.merge.policy.floor_segment", - "index.merge.policy.max_merged_segment", - "index.merge.policy.max_merge_size", - "index.merge.policy.min_merge_size", - "index.shard.recovery.file_chunk_size", - "index.shard.recovery.translog_size", - "index.store.throttle.max_bytes_per_sec", - "index.translog.flush_threshold_size", - "index.translog.fs.buffer_size", - "index.version_map_size")); + "index.merge.policy.floor_segment", + "index.merge.policy.max_merged_segment", + "index.merge.policy.max_merge_size", + "index.merge.policy.min_merge_size", + "index.shard.recovery.file_chunk_size", + "index.shard.recovery.translog_size", + "index.store.throttle.max_bytes_per_sec", + "index.translog.flush_threshold_size", + "index.translog.fs.buffer_size", + "index.version_map_size")); /** All known time settings for an index. 
*/ public static final Set INDEX_TIME_SETTINGS = unmodifiableSet(newHashSet( - "index.gateway.wait_for_mapping_update_post_recovery", - "index.shard.wait_for_mapping_update_post_recovery", - "index.gc_deletes", - "index.indexing.slowlog.threshold.index.debug", - "index.indexing.slowlog.threshold.index.info", - "index.indexing.slowlog.threshold.index.trace", - "index.indexing.slowlog.threshold.index.warn", - "index.refresh_interval", - "index.search.slowlog.threshold.fetch.debug", - "index.search.slowlog.threshold.fetch.info", - "index.search.slowlog.threshold.fetch.trace", - "index.search.slowlog.threshold.fetch.warn", - "index.search.slowlog.threshold.query.debug", - "index.search.slowlog.threshold.query.info", - "index.search.slowlog.threshold.query.trace", - "index.search.slowlog.threshold.query.warn", - "index.shadow.wait_for_initial_commit", - "index.store.stats_refresh_interval", - "index.translog.flush_threshold_period", - "index.translog.interval", - "index.translog.sync_interval", - "index.shard.inactive_time", + "index.gateway.wait_for_mapping_update_post_recovery", + "index.shard.wait_for_mapping_update_post_recovery", + "index.gc_deletes", + "index.indexing.slowlog.threshold.index.debug", + "index.indexing.slowlog.threshold.index.info", + "index.indexing.slowlog.threshold.index.trace", + "index.indexing.slowlog.threshold.index.warn", + "index.refresh_interval", + "index.search.slowlog.threshold.fetch.debug", + "index.search.slowlog.threshold.fetch.info", + "index.search.slowlog.threshold.fetch.trace", + "index.search.slowlog.threshold.fetch.warn", + "index.search.slowlog.threshold.query.debug", + "index.search.slowlog.threshold.query.info", + "index.search.slowlog.threshold.query.trace", + "index.search.slowlog.threshold.query.warn", + "index.shadow.wait_for_initial_commit", + "index.store.stats_refresh_interval", + "index.translog.flush_threshold_period", + "index.translog.interval", + "index.translog.sync_interval", + "index.shard.inactive_time", LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE, - UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING)); + UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING)); /** * Elasticsearch 2.0 requires units on byte/memory and time settings; this method adds the default unit to any such settings that are @@ -201,9 +201,9 @@ private IndexMetaData addDefaultUnitsIfNeeded(IndexMetaData indexMetaData) { if (newSettings != null) { // At least one setting was changed: return IndexMetaData.builder(indexMetaData) - .version(indexMetaData.getVersion()) - .settings(newSettings.build()) - .build(); + .version(indexMetaData.getVersion()) + .settings(newSettings.build()) + .build(); } }
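
For readers following the LocalCheckpointService changes above, the algorithm the last patches converge on is small enough to restate in isolation: completed sequence numbers are recorded in fixed-size bit sets held in an ordered list whose first bit corresponds to firstProcessedSeqNo, and the checkpoint advances one bit at a time as long as the bit after it is set, dropping a bit set (and sliding the anchor forward) once its last bit has been consumed. What follows is a minimal single-class sketch of that idea only, not the Elasticsearch class: it uses java.util.BitSet instead of Lucene's FixedBitSet, drops the AbstractIndexShardComponent/settings wiring and the assertions, uses -1 as the "nothing completed yet" marker where the patch uses SequenceNumbersService.UNASSIGNED_SEQ_NO, and the names (CheckpointSketch, chunkFor, offsetIn, chunkSize) are illustrative.

import java.util.BitSet;
import java.util.LinkedList;

// Minimal sketch of bit-set based checkpoint tracking (illustrative only).
class CheckpointSketch {
    private final int chunkSize;                                  // bits per chunk (bitArraysSize in the patch)
    private final LinkedList<BitSet> processed = new LinkedList<>();
    private long firstProcessedSeqNo = 0;                         // seqNo that bit 0 of the first chunk stands for
    private long checkpoint = -1;                                 // all seqNos <= checkpoint are completed
    private long nextSeqNo = 0;                                   // next seqNo to hand out (primary side)

    CheckpointSketch(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    synchronized long generateSeqNo() {
        return nextSeqNo++;
    }

    synchronized void markSeqNoAsCompleted(long seqNo) {
        if (seqNo <= checkpoint) {
            return;                                               // already covered by the checkpoint
        }
        chunkFor(seqNo).set(offsetIn(seqNo));
        if (seqNo == checkpoint + 1) {
            advanceCheckpoint();
        }
    }

    synchronized long getCheckpoint() {
        return checkpoint;
    }

    // Walk the checkpoint forward while the next bit is set; drop a chunk once its last bit is consumed.
    private void advanceCheckpoint() {
        BitSet current = processed.peekFirst();
        while (current != null && current.get(offsetIn(checkpoint + 1))) {
            checkpoint++;
            if (checkpoint == firstProcessedSeqNo + chunkSize - 1) {
                processed.removeFirst();                          // chunk fully completed, slide the anchor
                firstProcessedSeqNo += chunkSize;
                current = processed.peekFirst();
            }
        }
    }

    // Returns the chunk holding seqNo, allocating trailing chunks lazily for out-of-order arrivals.
    private BitSet chunkFor(long seqNo) {
        int chunkIndex = (int) ((seqNo - firstProcessedSeqNo) / chunkSize);
        while (chunkIndex >= processed.size()) {
            processed.add(new BitSet(chunkSize));
        }
        return processed.get(chunkIndex);
    }

    private int offsetIn(long seqNo) {
        return (int) ((seqNo - firstProcessedSeqNo) % chunkSize);
    }
}

With this sketch, marking seqNos 2, then 0, then 1 as completed moves the checkpoint from -1 to 0 and then straight to 2, which is the same out-of-order completion behaviour the testSimpleOverFlow and testConcurrentReplica tests above exercise against the real service.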