Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Local checkpoints #15390

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.action.admin.cluster.stats;

import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
Expand All @@ -30,6 +29,7 @@
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -105,7 +105,8 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
for (IndexShard indexShard : indexService) {
if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) {
// only report on fully started shards
shardsStats.add(new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, SHARD_STATS_FLAGS), indexShard.commitStats()));
shardsStats.add(new ShardStats(indexShard.routingEntry(), indexShard.shardPath(),
new CommonStats(indexShard, SHARD_STATS_FLAGS), indexShard.commitStats(), indexShard.seqNoStats()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ public static enum Flag {
RequestCache("request_cache"),
Recovery("recovery");


private final String restName;

Flag(String restName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.ShardPath;

import java.io.IOException;
Expand All @@ -42,20 +42,23 @@ public class ShardStats implements Streamable, ToXContent {
private CommonStats commonStats;
@Nullable
private CommitStats commitStats;
@Nullable
private SeqNoStats seqNoStats;
private String dataPath;
private String statePath;
private boolean isCustomDataPath;

ShardStats() {
}

public ShardStats(ShardRouting routing, ShardPath shardPath, CommonStats commonStats, CommitStats commitStats) {
public ShardStats(ShardRouting routing, ShardPath shardPath, CommonStats commonStats, CommitStats commitStats, SeqNoStats seqNoStats) {
this.shardRouting = routing;
this.dataPath = shardPath.getRootDataPath().toString();
this.statePath = shardPath.getRootStatePath().toString();
this.isCustomDataPath = shardPath.isCustomDataPath();
this.commitStats = commitStats;
this.commonStats = commonStats;
this.seqNoStats = seqNoStats;
}

/**
Expand All @@ -73,6 +76,11 @@ public CommitStats getCommitStats() {
return this.commitStats;
}

@Nullable
public SeqNoStats getSeqNoStats() {
return this.seqNoStats;
}

public String getDataPath() {
return dataPath;
}
Expand All @@ -99,6 +107,7 @@ public void readFrom(StreamInput in) throws IOException {
statePath = in.readString();
dataPath = in.readString();
isCustomDataPath = in.readBoolean();
seqNoStats = in.readOptionalStreamableReader(SeqNoStats::new);
}

@Override
Expand All @@ -109,6 +118,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(statePath);
out.writeString(dataPath);
out.writeBoolean(isCustomDataPath);
out.writeOptionalWritable(seqNoStats);
}

@Override
Expand All @@ -124,6 +134,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (commitStats != null) {
commitStats.toXContent(builder, params);
}
if (seqNoStats != null) {
seqNoStats.toXContent(builder, params);
}
builder.startObject(Fields.SHARD_PATH);
builder.field(Fields.STATE_PATH, statePath);
builder.field(Fields.DATA_PATH, dataPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting sh
flags.set(CommonStatsFlags.Flag.Recovery);
}

return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, flags), indexShard.commitStats());
return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(),
new CommonStats(indexShard, flags), indexShard.commitStats(), indexShard.seqNoStats());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.elasticsearch.cluster.metadata;

import com.carrotsearch.hppc.cursors.ObjectCursor;

import org.apache.lucene.analysis.Analyzer;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.routing.UnassignedInfo;
Expand All @@ -30,6 +29,7 @@
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.LocalCheckpointService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.indices.mapper.MapperRegistry;

Expand Down Expand Up @@ -116,42 +116,43 @@ private static boolean isSupportedVersion(IndexMetaData indexMetaData) {

/** All known byte-sized settings for an index. */
public static final Set<String> INDEX_BYTES_SIZE_SETTINGS = unmodifiableSet(newHashSet(
"index.merge.policy.floor_segment",
"index.merge.policy.max_merged_segment",
"index.merge.policy.max_merge_size",
"index.merge.policy.min_merge_size",
"index.shard.recovery.file_chunk_size",
"index.shard.recovery.translog_size",
"index.store.throttle.max_bytes_per_sec",
"index.translog.flush_threshold_size",
"index.translog.fs.buffer_size",
"index.version_map_size"));
"index.merge.policy.floor_segment",
"index.merge.policy.max_merged_segment",
"index.merge.policy.max_merge_size",
"index.merge.policy.min_merge_size",
"index.shard.recovery.file_chunk_size",
"index.shard.recovery.translog_size",
"index.store.throttle.max_bytes_per_sec",
"index.translog.flush_threshold_size",
"index.translog.fs.buffer_size",
"index.version_map_size"));

/** All known time settings for an index. */
public static final Set<String> INDEX_TIME_SETTINGS = unmodifiableSet(newHashSet(
"index.gateway.wait_for_mapping_update_post_recovery",
"index.shard.wait_for_mapping_update_post_recovery",
"index.gc_deletes",
"index.indexing.slowlog.threshold.index.debug",
"index.indexing.slowlog.threshold.index.info",
"index.indexing.slowlog.threshold.index.trace",
"index.indexing.slowlog.threshold.index.warn",
"index.refresh_interval",
"index.search.slowlog.threshold.fetch.debug",
"index.search.slowlog.threshold.fetch.info",
"index.search.slowlog.threshold.fetch.trace",
"index.search.slowlog.threshold.fetch.warn",
"index.search.slowlog.threshold.query.debug",
"index.search.slowlog.threshold.query.info",
"index.search.slowlog.threshold.query.trace",
"index.search.slowlog.threshold.query.warn",
"index.shadow.wait_for_initial_commit",
"index.store.stats_refresh_interval",
"index.translog.flush_threshold_period",
"index.translog.interval",
"index.translog.sync_interval",
"index.shard.inactive_time",
UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING));
"index.gateway.wait_for_mapping_update_post_recovery",
"index.shard.wait_for_mapping_update_post_recovery",
"index.gc_deletes",
"index.indexing.slowlog.threshold.index.debug",
"index.indexing.slowlog.threshold.index.info",
"index.indexing.slowlog.threshold.index.trace",
"index.indexing.slowlog.threshold.index.warn",
"index.refresh_interval",
"index.search.slowlog.threshold.fetch.debug",
"index.search.slowlog.threshold.fetch.info",
"index.search.slowlog.threshold.fetch.trace",
"index.search.slowlog.threshold.fetch.warn",
"index.search.slowlog.threshold.query.debug",
"index.search.slowlog.threshold.query.info",
"index.search.slowlog.threshold.query.trace",
"index.search.slowlog.threshold.query.warn",
"index.shadow.wait_for_initial_commit",
"index.store.stats_refresh_interval",
"index.translog.flush_threshold_period",
"index.translog.interval",
"index.translog.sync_interval",
"index.shard.inactive_time",
LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: The indentation is off on this line.

UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING));

/**
* Elasticsearch 2.0 requires units on byte/memory and time settings; this method adds the default unit to any such settings that are
Expand All @@ -163,7 +164,7 @@ private IndexMetaData addDefaultUnitsIfNeeded(IndexMetaData indexMetaData) {
// Created lazily if we find any settings that are missing units:
Settings settings = indexMetaData.getSettings();
Settings.Builder newSettings = null;
for(String byteSizeSetting : INDEX_BYTES_SIZE_SETTINGS) {
for (String byteSizeSetting : INDEX_BYTES_SIZE_SETTINGS) {
String value = settings.get(byteSizeSetting);
if (value != null) {
try {
Expand All @@ -180,7 +181,7 @@ private IndexMetaData addDefaultUnitsIfNeeded(IndexMetaData indexMetaData) {
newSettings.put(byteSizeSetting, value + "b");
}
}
for(String timeSetting : INDEX_TIME_SETTINGS) {
for (String timeSetting : INDEX_TIME_SETTINGS) {
String value = settings.get(timeSetting);
if (value != null) {
try {
Expand All @@ -200,9 +201,9 @@ private IndexMetaData addDefaultUnitsIfNeeded(IndexMetaData indexMetaData) {
if (newSettings != null) {
// At least one setting was changed:
return IndexMetaData.builder(indexMetaData)
.version(indexMetaData.getVersion())
.settings(newSettings.build())
.build();
.version(indexMetaData.getVersion())
.settings(newSettings.build())
.build();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public short readShort() throws IOException {
*/
public int readInt() throws IOException {
return ((readByte() & 0xFF) << 24) | ((readByte() & 0xFF) << 16)
| ((readByte() & 0xFF) << 8) | (readByte() & 0xFF);
| ((readByte() & 0xFF) << 8) | (readByte() & 0xFF);
}

/**
Expand Down Expand Up @@ -543,6 +543,17 @@ public <T extends Streamable> T readOptionalStreamable(Supplier<T> supplier) thr
}
}

/**
* Serializes a potential null value.
*/
public <T> T readOptionalStreamableReader(StreamableReader<T> streamableReader) throws IOException {
if (readBoolean()) {
return streamableReader.readFrom(this);
} else {
return null;
}
}

public <T extends Throwable> T readThrowable() throws IOException {
if (readBoolean()) {
int key = readVInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,18 @@ public void writeOptionalStreamable(@Nullable Streamable streamable) throws IOEx
}
}

/**
* Serializes a potential null value.
*/
public void writeOptionalWritable(@Nullable Writeable writeable) throws IOException {
if (writeable != null) {
writeBoolean(true);
writeable.writeTo(this);
} else {
writeBoolean(false);
}
}

public void writeThrowable(Throwable throwable) throws IOException {
if (throwable == null) {
writeBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,17 @@
public class EsRejectedExecutionException extends ElasticsearchException {
private final boolean isExecutorShutdown;

public EsRejectedExecutionException(String message, boolean isExecutorShutdown) {
super(message);
public EsRejectedExecutionException(String message, boolean isExecutorShutdown, Object... args) {
super(message, args);
this.isExecutorShutdown = isExecutorShutdown;
}

public EsRejectedExecutionException(String message) {
this(message, false);
public EsRejectedExecutionException(String message, Object... args) {
this(message, false, args);
}

public EsRejectedExecutionException(String message, boolean isExecutorShutdown) {
this(message, isExecutorShutdown, new Object[0]);
}

public EsRejectedExecutionException() {
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
Expand Down Expand Up @@ -325,6 +326,9 @@ public CommitStats commitStats() {
return new CommitStats(getLastCommittedSegmentInfos());
}

/** get sequence number related stats */
public abstract SeqNoStats seqNoStats();

/**
* Read the last segments info from the commit pointed to by the searcher manager
*/
Expand Down
Loading