diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index a772d33d685dd..bea1327427511 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -1223,6 +1223,7 @@ protected static boolean isXPackTemplate(String name) { case "metrics": case "metrics-settings": case "metrics-mappings": + case ".snapshot-blob-cache": return true; default: return false; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java index eaec3d0b1e25e..d5c817f33e95b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java @@ -69,6 +69,7 @@ public static Map filterSecurityHeaders(Map head public static final String ASYNC_SEARCH_ORIGIN = "async_search"; public static final String IDP_ORIGIN = "idp"; public static final String STACK_ORIGIN = "stack"; + public static final String SEARCHABLE_SNAPSHOTS_ORIGIN = "searchable_snapshots"; private ClientHelper() {} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStats.java index 4d66aa351332c..9ddccc69414be 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStats.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.searchablesnapshots; +import org.elasticsearch.Version; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -134,16 +135,20 @@ public static class CacheIndexInputStats implements Writeable, ToXContentObject private final Counter contiguousReads; private final Counter nonContiguousReads; private final Counter cachedBytesRead; + private final Counter indexCacheBytesRead; private final TimedCounter cachedBytesWritten; private final TimedCounter directBytesRead; private final TimedCounter optimizedBytesRead; + private final Counter blobStoreBytesRequested; + private final long currentIndexCacheFills; public CacheIndexInputStats(String fileName, long fileLength, long openCount, long closeCount, Counter forwardSmallSeeks, Counter backwardSmallSeeks, Counter forwardLargeSeeks, Counter backwardLargeSeeks, Counter contiguousReads, Counter nonContiguousReads, - Counter cachedBytesRead, TimedCounter cachedBytesWritten, - TimedCounter directBytesRead, TimedCounter optimizedBytesRead) { + Counter cachedBytesRead, Counter indexCacheBytesRead, + TimedCounter cachedBytesWritten, TimedCounter directBytesRead, TimedCounter optimizedBytesRead, + Counter blobStoreBytesRequested, long currentIndexCacheFills) { this.fileName = fileName; this.fileLength = fileLength; this.openCount = openCount; @@ -155,9 +160,12 @@ public CacheIndexInputStats(String fileName, long fileLength, long openCount, lo this.contiguousReads = contiguousReads; this.nonContiguousReads = nonContiguousReads; this.cachedBytesRead = cachedBytesRead; + 
this.indexCacheBytesRead = indexCacheBytesRead; this.cachedBytesWritten = cachedBytesWritten; this.directBytesRead = directBytesRead; this.optimizedBytesRead = optimizedBytesRead; + this.blobStoreBytesRequested = blobStoreBytesRequested; + this.currentIndexCacheFills = currentIndexCacheFills; } CacheIndexInputStats(final StreamInput in) throws IOException { @@ -172,9 +180,21 @@ public CacheIndexInputStats(String fileName, long fileLength, long openCount, lo this.contiguousReads = new Counter(in); this.nonContiguousReads = new Counter(in); this.cachedBytesRead = new Counter(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + this.indexCacheBytesRead = new Counter(in); + } else { + this.indexCacheBytesRead = new Counter(0, 0, 0, 0); + } this.cachedBytesWritten = new TimedCounter(in); this.directBytesRead = new TimedCounter(in); this.optimizedBytesRead = new TimedCounter(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + this.blobStoreBytesRequested = new Counter(in); + this.currentIndexCacheFills = in.readVLong(); + } else { + this.blobStoreBytesRequested = new Counter(0, 0, 0, 0); + this.currentIndexCacheFills = 0; + } } @Override @@ -191,9 +211,16 @@ public void writeTo(StreamOutput out) throws IOException { contiguousReads.writeTo(out); nonContiguousReads.writeTo(out); cachedBytesRead.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + indexCacheBytesRead.writeTo(out); + } cachedBytesWritten.writeTo(out); directBytesRead.writeTo(out); optimizedBytesRead.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + blobStoreBytesRequested.writeTo(out); + out.writeVLong(currentIndexCacheFills); + } } public String getFileName() { @@ -240,6 +267,10 @@ public Counter getCachedBytesRead() { return cachedBytesRead; } + public Counter getIndexCacheBytesRead() { + return indexCacheBytesRead; + } + public TimedCounter getCachedBytesWritten() { return cachedBytesWritten; } @@ -252,6 +283,14 @@ public TimedCounter getOptimizedBytesRead() { return optimizedBytesRead; } + public Counter getBlobStoreBytesRequested() { + return blobStoreBytesRequested; + } + + public long getCurrentIndexCacheFills() { + return currentIndexCacheFills; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -263,6 +302,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("contiguous_bytes_read", getContiguousReads()); builder.field("non_contiguous_bytes_read", getNonContiguousReads()); builder.field("cached_bytes_read", getCachedBytesRead()); + builder.field("index_cache_bytes_read", getIndexCacheBytesRead()); builder.field("cached_bytes_written", getCachedBytesWritten()); builder.field("direct_bytes_read", getDirectBytesRead()); builder.field("optimized_bytes_read", getOptimizedBytesRead()); @@ -278,6 +318,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("large", getBackwardLargeSeeks()); builder.endObject(); } + builder.field("blob_store_bytes_requested", getBlobStoreBytesRequested()); + builder.field("current_index_cache_fills", getCurrentIndexCacheFills()); } return builder.endObject(); } @@ -302,9 +344,12 @@ public boolean equals(Object other) { && Objects.equals(contiguousReads, stats.contiguousReads) && Objects.equals(nonContiguousReads, stats.nonContiguousReads) && Objects.equals(cachedBytesRead, stats.cachedBytesRead) + && Objects.equals(indexCacheBytesRead, stats.indexCacheBytesRead) && 
Objects.equals(cachedBytesWritten, stats.cachedBytesWritten) && Objects.equals(directBytesRead, stats.directBytesRead) - && Objects.equals(optimizedBytesRead, stats.optimizedBytesRead); + && Objects.equals(optimizedBytesRead, stats.optimizedBytesRead) + && Objects.equals(blobStoreBytesRequested, stats.blobStoreBytesRequested) + && currentIndexCacheFills == stats.currentIndexCacheFills; } @Override @@ -313,8 +358,9 @@ public int hashCode() { forwardSmallSeeks, backwardSmallSeeks, forwardLargeSeeks, backwardLargeSeeks, contiguousReads, nonContiguousReads, - cachedBytesRead, cachedBytesWritten, - directBytesRead, optimizedBytesRead); + cachedBytesRead, indexCacheBytesRead, + cachedBytesWritten, directBytesRead, optimizedBytesRead, + blobStoreBytesRequested, currentIndexCacheFills); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java index e8322010c5d20..d160a91883a85 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java @@ -42,4 +42,6 @@ public static boolean isSearchableSnapshotStore(Settings indexSettings) { public static final String CACHE_PREWARMING_THREAD_POOL_NAME = "searchable_snapshots_cache_prewarming"; public static final String CACHE_PREWARMING_THREAD_POOL_SETTING = "xpack.searchable_snapshots.cache_prewarming_thread_pool"; + + public static final String SNAPSHOT_BLOB_CACHE_INDEX = ".snapshot-blob-cache"; } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStatsTests.java index dade0a0ca4204..d57d0746db417 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStatsTests.java @@ -46,8 +46,9 @@ private CacheIndexInputStats randomCacheIndexInputStats() { randomCounter(), randomCounter(), randomCounter(), randomCounter(), randomCounter(), randomCounter(), - randomCounter(), randomTimedCounter(), - randomTimedCounter(), randomTimedCounter()); + randomCounter(), randomCounter(), randomTimedCounter(), + randomTimedCounter(), randomTimedCounter(), + randomCounter(), randomNonNegativeLong()); } private Counter randomCounter() { diff --git a/x-pack/plugin/searchable-snapshots/qa/rest/src/test/resources/rest-api-spec/test/stats.yml b/x-pack/plugin/searchable-snapshots/qa/rest/src/test/resources/rest-api-spec/test/stats.yml index 6f3cd6271eefb..a0256f8b483aa 100644 --- a/x-pack/plugin/searchable-snapshots/qa/rest/src/test/resources/rest-api-spec/test/stats.yml +++ b/x-pack/plugin/searchable-snapshots/qa/rest/src/test/resources/rest-api-spec/test/stats.yml @@ -164,6 +164,11 @@ teardown: - gte: { indices.docs.shards.0.0.files.0.cached_bytes_read.min: 0 } - gte: { indices.docs.shards.0.0.files.0.cached_bytes_read.max: 0 } + - gte: { indices.docs.shards.0.0.files.0.index_cache_bytes_read.count: 0 } + - gte: { indices.docs.shards.0.0.files.0.index_cache_bytes_read.sum: 0 } + - gte: { indices.docs.shards.0.0.files.0.index_cache_bytes_read.min: 0 } + - gte: 
{ indices.docs.shards.0.0.files.0.index_cache_bytes_read.max: 0 } + - gte: { indices.docs.shards.0.0.files.0.cached_bytes_written.count: 0 } - gte: { indices.docs.shards.0.0.files.0.cached_bytes_written.sum: 0 } - gte: { indices.docs.shards.0.0.files.0.cached_bytes_written.min: 0 } @@ -203,6 +208,13 @@ teardown: - gte: { indices.docs.shards.0.0.files.0.backward_seeks.large.min: 0 } - gte: { indices.docs.shards.0.0.files.0.backward_seeks.large.max: 0 } + - gte: { indices.docs.shards.0.0.files.0.blob_store_bytes_requested.count: 0 } + - gte: { indices.docs.shards.0.0.files.0.blob_store_bytes_requested.sum: 0 } + - gte: { indices.docs.shards.0.0.files.0.blob_store_bytes_requested.min: 0 } + - gte: { indices.docs.shards.0.0.files.0.blob_store_bytes_requested.max: 0 } + + - gte: { indices.docs.shards.0.0.files.0.current_index_cache_fills: 0 } + - do: searchable_snapshots.stats: index: "d*" diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java new file mode 100644 index 0000000000000..f000f06e5c1b4 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java @@ -0,0 +1,349 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.blobstore.cache; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.TransportActions; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ConnectTransportException; + +import java.io.IOException; +import java.time.Instant; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import 
static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; +import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN; + +public class BlobStoreCacheService extends AbstractLifecycleComponent implements ClusterStateListener { + + private static final Logger logger = LogManager.getLogger(BlobStoreCacheService.class); + + public static final int DEFAULT_CACHED_BLOB_SIZE = Math.toIntExact(ByteSizeUnit.KB.toBytes(4L)); + + private final ClusterService clusterService; + private final ThreadPool threadPool; + private final AtomicBoolean ready; + private final Client client; + private final String index; + + public BlobStoreCacheService(ClusterService clusterService, ThreadPool threadPool, Client client, String index) { + this.client = new OriginSettingClient(client, SEARCHABLE_SNAPSHOTS_ORIGIN); + this.ready = new AtomicBoolean(false); + this.clusterService = clusterService; + this.threadPool = threadPool; + this.index = index; + } + + @Override + protected void doStart() { + clusterService.addListener(this); + } + + @Override + protected void doStop() { + clusterService.removeListener(this); + } + + @Override + protected void doClose() {} + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (lifecycle.started() == false || event.routingTableChanged() == false) { + return; + } + if (event.indexRoutingTableChanged(index)) { + final IndexRoutingTable indexRoutingTable = event.state().routingTable().index(index); + if (indexRoutingTable == null) { + ready.set(false); + return; + } + ready.set(indexRoutingTable.allPrimaryShardsActive()); + } + } + + private void createIndexIfNecessary(ActionListener listener) { + if (clusterService.state().routingTable().hasIndex(index)) { + listener.onResponse(index); + return; + } + try { + client.admin() + .indices() + .prepareCreate(index) + .setSettings(indexSettings()) + .setMapping(mappings()) + .execute(new ActionListener<>() { + @Override + public void onResponse(CreateIndexResponse createIndexResponse) { + assert createIndexResponse.index().equals(index); + listener.onResponse(createIndexResponse.index()); + } + + @Override + public void onFailure(Exception e) { + if (e instanceof ResourceAlreadyExistsException + || ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) { + listener.onResponse(index); + } else { + listener.onFailure(e); + } + } + }); + } catch (Exception e) { + listener.onFailure(e); + } + } + + private static Settings indexSettings() { + return Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1") + .put(IndexMetadata.SETTING_PRIORITY, "900") + .build(); + } + + private static XContentBuilder mappings() throws IOException { + final XContentBuilder builder = jsonBuilder(); + { + builder.startObject(); + { + builder.startObject(SINGLE_MAPPING_NAME); + builder.field("dynamic", "strict"); + { + builder.startObject("_meta"); + builder.field("version", Version.CURRENT); + builder.endObject(); + } + { + builder.startObject("properties"); + { + builder.startObject("type"); + builder.field("type", "keyword"); + builder.endObject(); + } + { + builder.startObject("creation_time"); + builder.field("type", "date"); + builder.field("format", "epoch_millis"); + builder.endObject(); + } + { + builder.startObject("version"); + builder.field("type", "integer"); + builder.endObject(); + } + { + builder.startObject("repository"); + builder.field("type", "keyword"); + builder.endObject(); + } + { + 
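// nested object identifying the source blob by its name and path +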
builder.startObject("blob"); + builder.field("type", "object"); + { + builder.startObject("properties"); + { + builder.startObject("name"); + builder.field("type", "keyword"); + builder.endObject(); + builder.startObject("path"); + builder.field("type", "keyword"); + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + { + builder.startObject("data"); + builder.field("type", "object"); + { + builder.startObject("properties"); + { + builder.startObject("content"); + builder.field("type", "binary"); + builder.endObject(); + } + { + builder.startObject("length"); + builder.field("type", "long"); + builder.endObject(); + } + { + builder.startObject("from"); + builder.field("type", "long"); + builder.endObject(); + } + { + builder.startObject("to"); + builder.field("type", "long"); + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + return builder; + } + + public CachedBlob get(String repository, String name, String path, long offset) { + assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SYSTEM_READ + ']') == false : "must not block [" + + Thread.currentThread().getName() + + "] for a cache read"; + + final PlainActionFuture future = PlainActionFuture.newFuture(); + getAsync(repository, name, path, offset, future); + try { + return future.actionGet(5, TimeUnit.SECONDS); + } catch (ElasticsearchTimeoutException e) { + if (logger.isDebugEnabled()) { + logger.warn( + () -> new ParameterizedMessage( + "get from cache index timed out after [5s], retrieving from blob store instead [id={}]", + CachedBlob.generateId(repository, name, path, offset) + ), + e + ); + } else { + logger.warn("get from cache index timed out after [5s], retrieving from blob store instead"); + } + return CachedBlob.CACHE_NOT_READY; + } + } + + protected void getAsync(String repository, String name, String path, long offset, ActionListener listener) { + if ((lifecycle.started() && ready.get()) == false) { + // TODO TBD can we just execute the GET request and let it fail if the index isn't ready yet? + // We might get lucky and hit a started shard anyway. + logger.debug("not ready : [{}]", CachedBlob.generateId(repository, name, path, offset)); + listener.onResponse(CachedBlob.CACHE_NOT_READY); + return; + } + final GetRequest request = new GetRequest(index).id(CachedBlob.generateId(repository, name, path, offset)); + client.get(request, new ActionListener<>() { + @Override + public void onResponse(GetResponse response) { + if (response.isExists()) { + logger.debug("cache hit : [{}]", request.id()); + assert response.isSourceEmpty() == false; + + final CachedBlob cachedBlob = CachedBlob.fromSource(response.getSource()); + assert response.getId().equals(cachedBlob.generatedId()); + listener.onResponse(cachedBlob); + } else { + logger.debug("cache miss: [{}]", request.id()); + listener.onResponse(CachedBlob.CACHE_MISS); + } + } + + @Override + public void onFailure(Exception e) { + // In case the blob cache system index is unavailable, we indicate it's not ready and move on. We do not fail the request: + // a failure here is not fatal since the data exists in the blob store, so we can simply indicate the cache is not ready. 
+ if (isExpectedCacheGetException(e)) { + logger.debug(() -> new ParameterizedMessage("failed to retrieve cached blob from system index [{}]", index), e); + } else { + logger.warn(() -> new ParameterizedMessage("failed to retrieve cached blob from system index [{}]", index), e); + assert false : e; + } + listener.onResponse(CachedBlob.CACHE_NOT_READY); + } + }); + } + + private static boolean isExpectedCacheGetException(Exception e) { + return TransportActions.isShardNotAvailableException(e) + || e instanceof ConnectTransportException + || ExceptionsHelper.unwrapCause(e) instanceof NodeClosedException; + } + + public void putAsync(String repository, String name, String path, long offset, BytesReference content, ActionListener listener) { + createIndexIfNecessary(new ActionListener<>() { + @Override + public void onResponse(String s) { + final IndexRequest request; + try { + final CachedBlob cachedBlob = new CachedBlob( + Instant.ofEpochMilli(threadPool.absoluteTimeInMillis()), + Version.CURRENT, + repository, + name, + path, + content, + offset + ); + request = new IndexRequest(index).id(cachedBlob.generatedId()); + try (XContentBuilder builder = jsonBuilder()) { + request.source(cachedBlob.toXContent(builder, ToXContent.EMPTY_PARAMS)); + } + + client.index(request, new ActionListener<>() { + @Override + public void onResponse(IndexResponse indexResponse) { + logger.trace("cache fill ({}): [{}]", indexResponse.status(), request.id()); + listener.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + logger.debug(new ParameterizedMessage("failure in cache fill: [{}]", request.id()), e); + listener.onFailure(e); + } + }); + } catch (Exception e) { + logger.warn( + new ParameterizedMessage("cache fill failure: [{}]", CachedBlob.generateId(repository, name, path, offset)), + e + ); + listener.onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + logger.error(() -> new ParameterizedMessage("failed to create blob cache system index [{}]", index), e); + listener.onFailure(e); + } + }); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/CachedBlob.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/CachedBlob.java new file mode 100644 index 0000000000000..acc0c7cbe9260 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/CachedBlob.java @@ -0,0 +1,189 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.blobstore.cache; + +import org.elasticsearch.Version; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.time.Instant; +import java.util.Base64; +import java.util.Map; + +public class CachedBlob implements ToXContent { + + /** + * Sentinel {@link CachedBlob} indicating that searching the cache index returned an error. + */ + public static final CachedBlob CACHE_NOT_READY = new CachedBlob(null, null, null, "CACHE_NOT_READY", null, BytesArray.EMPTY, 0L, 0L); + + /** + * Sentinel {@link CachedBlob} indicating that the cache index definitely did not contain the requested data. 
+ */ + public static final CachedBlob CACHE_MISS = new CachedBlob(null, null, null, "CACHE_MISS", null, BytesArray.EMPTY, 0L, 0L); + + private static final String TYPE = "blob"; + + private final Instant creationTime; + private final Version version; + private final String repository; + private final String name; + private final String path; + + private final BytesReference bytes; + private final long from; + private final long to; + + public CachedBlob( + Instant creationTime, + Version version, + String repository, + String name, + String path, + BytesReference content, + long offset + ) { + this(creationTime, version, repository, name, path, content, offset, offset + (content == null ? 0 : content.length())); + } + + private CachedBlob( + Instant creationTime, + Version version, + String repository, + String name, + String path, + BytesReference content, + long from, + long to + ) { + this.creationTime = creationTime; + this.version = version; + this.repository = repository; + this.name = name; + this.path = path; + this.bytes = content; + this.from = from; + this.to = to; + assert this.to == this.from + this.bytes.length(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field("type", TYPE); + builder.field("creation_time", creationTime.toEpochMilli()); + builder.field("version", version.id); + builder.field("repository", repository); + builder.startObject("blob"); + { + builder.field("name", name); + builder.field("path", path); + } + builder.endObject(); + builder.startObject("data"); + { + builder.field("content", BytesReference.toBytes(bytes)); + builder.field("length", bytes.length()); + builder.field("from", from); + builder.field("to", to); + } + builder.endObject(); + } + return builder.endObject(); + } + + public String generatedId() { + return generateId(repository, name, path, from); + } + + public long from() { + return from; + } + + public long to() { + return to; + } + + public int length() { + return bytes.length(); + } + + public BytesReference bytes() { + return bytes; + } + + public static String generateId(String repository, String name, String path, long offset) { + return String.join("/", repository, path, name, "@" + offset); + } + + @SuppressWarnings("unchecked") + public static CachedBlob fromSource(final Map source) { + final Long creationTimeEpochMillis = (Long) source.get("creation_time"); + if (creationTimeEpochMillis == null) { + throw new IllegalStateException("cached blob document does not have the [creation_time] field"); + } + final Version version = Version.fromId((Integer) source.get("version")); + if (version == null) { + throw new IllegalStateException("cached blob document does not have the [version] field"); + } + final String repository = (String) source.get("repository"); + if (repository == null) { + throw new IllegalStateException("cached blob document does not have the [repository] field"); + } + final Map blob = (Map) source.get("blob"); + if (blob == null || blob.isEmpty()) { + throw new IllegalStateException("cached blob document does not have the [blob] object"); + } + final String name = (String) blob.get("name"); + if (name == null) { + throw new IllegalStateException("cached blob document does not have the [blob.name] field"); + } + final String path = (String) blob.get("path"); + if (path == null) { + throw new IllegalStateException("cached blob document does not have the [blob.path] field"); + } + final Map data = (Map) 
source.get("data"); + if (data == null || data.isEmpty()) { + throw new IllegalStateException("cached blob document does not have the [data] object"); + } + final String encodedContent = (String) data.get("content"); + if (encodedContent == null) { + throw new IllegalStateException("cached blob document does not have the [data.content] field"); + } + final Integer length = (Integer) data.get("length"); + if (length == null) { + throw new IllegalStateException("cached blob document does not have the [data.length] field"); + } + final byte[] content = Base64.getDecoder().decode(encodedContent); + if (content.length != length) { + throw new IllegalStateException("cached blob document content length does not match [data.length] field"); + } + final Number from = (Number) data.get("from"); + if (from == null) { + throw new IllegalStateException("cached blob document does not have the [data.from] field"); + } + final Number to = (Number) data.get("to"); + if (to == null) { + throw new IllegalStateException("cached blob document does not have the [data.to] field"); + } + // TODO add exhaustive verifications (from/to/content.length, version supported, id == recomputed id etc) + return new CachedBlob( + Instant.ofEpochMilli(creationTimeEpochMillis), + version, + repository, + name, + path, + new BytesArray(content), + from.longValue(), + to.longValue() + ); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java index 9b4ed2db5981e..45e57e40cd855 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java @@ -9,12 +9,10 @@ import org.apache.lucene.store.IOContext; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; -import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; import java.io.IOException; -import java.io.InputStream; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; @@ -101,34 +99,6 @@ public final void close() throws IOException { public abstract void innerClose() throws IOException; - protected InputStream openInputStream(final long position, final long length) throws IOException { - assert assertCurrentThreadMayAccessBlobStore(); - if (fileInfo.numberOfParts() == 1L) { - assert position + length <= fileInfo.partBytes(0) : "cannot read [" - + position - + "-" - + (position + length) - + "] from [" - + fileInfo - + "]"; - return blobContainer.readBlob(fileInfo.partName(0L), position, length); - } else { - final long startPart = getPartNumberForPosition(position); - final long endPart = getPartNumberForPosition(position + length); - return new SlicedInputStream(endPart - startPart + 1L) { - @Override - protected InputStream openSlice(long slice) throws IOException { - final long currentPart = startPart + slice; - final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L; - final long endInPart = (currentPart == endPart) - ?
getRelativePositionInPart(position + length) - : getLengthOfPart(currentPart); - return blobContainer.readBlob(fileInfo.partName(currentPart), startInPart, endInPart - startInPart); - } - }; - } - } - protected final boolean assertCurrentThreadMayAccessBlobStore() { final String threadName = Thread.currentThread().getName(); assert threadName.contains('[' + ThreadPool.Names.SNAPSHOT + ']') @@ -149,29 +119,4 @@ protected final boolean assertCurrentThreadMayAccessBlobStore() { return true; } - private long getPartNumberForPosition(long position) { - ensureValidPosition(position); - final long part = position / fileInfo.partSize().getBytes(); - assert part <= fileInfo.numberOfParts() : "part number [" + part + "] exceeds number of parts: " + fileInfo.numberOfParts(); - assert part >= 0L : "part number [" + part + "] is negative"; - return part; - } - - private long getRelativePositionInPart(long position) { - ensureValidPosition(position); - final long pos = position % fileInfo.partSize().getBytes(); - assert pos < fileInfo.partBytes((int) getPartNumberForPosition(pos)) : "position in part [" + pos + "] exceeds part's length"; - assert pos >= 0L : "position in part [" + pos + "] is negative"; - return pos; - } - - private long getLengthOfPart(long part) { - return fileInfo.partBytes(Math.toIntExact(part)); - } - - private void ensureValidPosition(long position) { - if (position < 0L || position > fileInfo.length()) { - throw new IllegalArgumentException("Position [" + position + "] is invalid"); - } - } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/IndexInputStats.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/IndexInputStats.java index 8cc4b0e4f9d8f..0f652d604ba5d 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/IndexInputStats.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/IndexInputStats.java @@ -6,6 +6,7 @@ package org.elasticsearch.index.store; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.store.cache.CachedBlobContainerIndexInput; @@ -43,8 +44,12 @@ public class IndexInputStats { private final TimedCounter optimizedBytesRead = new TimedCounter(); private final Counter cachedBytesRead = new Counter(); + private final Counter indexCacheBytesRead = new Counter(); private final TimedCounter cachedBytesWritten = new TimedCounter(); + private final Counter blobStoreBytesRequested = new Counter(); + private final AtomicLong currentIndexCacheFills = new AtomicLong(); + public IndexInputStats(long fileLength, LongSupplier currentTimeNanos) { this(fileLength, SEEKING_THRESHOLD.getBytes(), currentTimeNanos); } @@ -74,6 +79,10 @@ public void addCachedBytesRead(int bytesRead) { cachedBytesRead.add(bytesRead); } + public void addIndexCacheBytesRead(int bytesRead) { + indexCacheBytesRead.add(bytesRead); + } + public void addCachedBytesWritten(long bytesWritten, long nanoseconds) { cachedBytesWritten.add(bytesWritten, nanoseconds); } @@ -112,6 +121,19 @@ public void incrementSeeks(long currentPosition, long newPosition) { } } + public void addBlobStoreBytesRequested(long bytesRequested) { + blobStoreBytesRequested.add(bytesRequested); + } + + public Releasable addIndexCacheFill() { + final long openValue = 
currentIndexCacheFills.incrementAndGet(); + assert openValue > 0 : openValue; + return () -> { + final long closeValue = currentIndexCacheFills.decrementAndGet(); + assert closeValue >= 0 : closeValue; + }; + } + public long getFileLength() { return fileLength; } @@ -160,15 +182,27 @@ public Counter getCachedBytesRead() { return cachedBytesRead; } + public Counter getIndexCacheBytesRead() { + return indexCacheBytesRead; + } + public TimedCounter getCachedBytesWritten() { return cachedBytesWritten; } + public Counter getBlobStoreBytesRequested() { + return blobStoreBytesRequested; + } + @SuppressForbidden(reason = "Handles Long.MIN_VALUE before using Math.abs()") public boolean isLargeSeek(long delta) { return delta != Long.MIN_VALUE && Math.abs(delta) > seekingThreshold; } + public long getCurrentIndexCacheFills() { + return currentIndexCacheFills.get(); + } + public static class Counter { private final LongAdder count = new LongAdder(); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index 14fdb229adff9..cf43474817d38 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -21,11 +21,14 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.GroupedActionListener; +import org.elasticsearch.blobstore.cache.BlobStoreCacheService; +import org.elasticsearch.blobstore.cache.CachedBlob; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.support.FilterBlobContainer; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; import org.elasticsearch.common.settings.Settings; @@ -105,6 +108,9 @@ public class SearchableSnapshotDirectory extends BaseDirectory { private final Supplier blobContainerSupplier; private final Supplier snapshotSupplier; + private final BlobStoreCacheService blobStoreCacheService; + private final String blobStoreCachePath; + private final String repository; private final SnapshotId snapshotId; private final IndexId indexId; private final ShardId shardId; @@ -129,6 +135,8 @@ public class SearchableSnapshotDirectory extends BaseDirectory { public SearchableSnapshotDirectory( Supplier blobContainer, Supplier snapshot, + BlobStoreCacheService blobStoreCacheService, + String repository, SnapshotId snapshotId, IndexId indexId, ShardId shardId, @@ -142,6 +150,8 @@ public SearchableSnapshotDirectory( super(new SingleInstanceLockFactory()); this.snapshotSupplier = Objects.requireNonNull(snapshot); this.blobContainerSupplier = Objects.requireNonNull(blobContainer); + this.blobStoreCacheService = Objects.requireNonNull(blobStoreCacheService); + this.repository = Objects.requireNonNull(repository); this.snapshotId = Objects.requireNonNull(snapshotId); this.indexId = Objects.requireNonNull(indexId); this.shardId = Objects.requireNonNull(shardId); @@ -155,6 +165,7 @@ public SearchableSnapshotDirectory( this.prewarmCache = useCache ? 
SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.get(indexSettings) : false; this.excludedFileTypes = new HashSet<>(SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.get(indexSettings)); this.uncachedChunkSize = SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.get(indexSettings).getBytes(); + this.blobStoreCachePath = String.join("/", snapshotId.getUUID(), indexId.getId(), String.valueOf(shardId.id())); this.threadPool = threadPool; this.loaded = false; assert invariant(); @@ -163,6 +174,7 @@ public SearchableSnapshotDirectory( private synchronized boolean invariant() { assert loaded != (snapshot == null); assert loaded != (blobContainer == null); + assert loaded != (recoveryState == null); return true; } @@ -184,6 +196,7 @@ public boolean loadSnapshot(RecoveryState recoveryState) { assert recoveryState != null; assert recoveryState instanceof SearchableSnapshotRecoveryState; assert assertCurrentThreadMayLoadSnapshot(); + // noinspection ConstantConditions in case assertions are disabled if (recoveryState instanceof SearchableSnapshotRecoveryState == false) { throw new IllegalArgumentException("A SearchableSnapshotRecoveryState instance was expected"); } @@ -386,7 +399,7 @@ private boolean isExcludedFromCache(String name) { @Override public String toString() { - return this.getClass().getSimpleName() + "@snapshotId=" + snapshotId + " lockFactory=" + lockFactory; + return this.getClass().getSimpleName() + "@snapshotId=" + snapshotId + " lockFactory=" + lockFactory + " shard=" + shardId; } private void cleanExistingRegularShardFiles() { @@ -488,7 +501,8 @@ public static Directory create( IndexSettings indexSettings, ShardPath shardPath, LongSupplier currentTimeNanosSupplier, - ThreadPool threadPool + ThreadPool threadPool, + BlobStoreCacheService blobStoreCacheService ) throws IOException { if (SNAPSHOT_REPOSITORY_SETTING.exists(indexSettings.getSettings()) == false @@ -516,7 +530,8 @@ public static Directory create( ); } - final Repository repository = repositories.repository(SNAPSHOT_REPOSITORY_SETTING.get(indexSettings.getSettings())); + final String repositoryName = SNAPSHOT_REPOSITORY_SETTING.get(indexSettings.getSettings()); + final Repository repository = repositories.repository(repositoryName); if (repository instanceof BlobStoreRepository == false) { throw new IllegalArgumentException("Repository [" + repository + "] is not searchable"); } @@ -546,6 +561,8 @@ public static Directory create( new SearchableSnapshotDirectory( lazyBlobContainer::getOrCompute, lazySnapshot::getOrCompute, + blobStoreCacheService, + repositoryName, snapshotId, indexId, shardPath.getShardId(), @@ -585,6 +602,17 @@ public static SearchableSnapshotDirectory unwrapDirectory(Directory dir) { return null; } + public CachedBlob getCachedBlob(String name, long offset, int length) { + final CachedBlob cachedBlob = blobStoreCacheService.get(repository, name, blobStoreCachePath, offset); + assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || cachedBlob.from() <= offset; + assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || offset + length <= cachedBlob.to(); + return cachedBlob; + } + + public void putCachedBlob(String name, long offset, BytesReference content, ActionListener listener) { + blobStoreCacheService.putAsync(repository, name, blobStoreCachePath, offset, content, listener); + } + /** * A {@link FilterBlobContainer} that uses {@link BlobStoreRepository#maybeRateLimitRestores(InputStream)} to limit the rate at which * blobs are read from the repository. 
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java index c6e0557bb483d..e4bc6ac8bb613 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java @@ -270,7 +270,15 @@ interface RangeMissingHandler { void fillCacheRange(FileChannel channel, long from, long to, Consumer progressUpdater) throws IOException; } - CompletableFuture fetchAsync( + /** + * Populates any missing ranges within {@code rangeToWrite} using the {@link RangeMissingHandler}, and notifies the + * {@link RangeAvailableHandler} when {@code rangeToRead} is available to read from the file. If {@code rangeToRead} is already + * available then the {@link RangeAvailableHandler} is called synchronously by this method; if not then the given {@link Executor} + * processes the missing ranges and notifies the {@link RangeAvailableHandler}. + * + * @return a future which returns the result of the {@link RangeAvailableHandler} once it has completed. + */ + CompletableFuture populateAndRead( final Tuple rangeToWrite, final Tuple rangeToRead, final RangeAvailableHandler reader, @@ -331,6 +339,41 @@ public void onFailure(Exception e) { return future; } + /** + * Notifies the {@link RangeAvailableHandler} when {@code rangeToRead} is available to read from the file. If {@code rangeToRead} is + * already available then the {@link RangeAvailableHandler} is called synchronously by this method; if not, but it is pending, then the + * {@link RangeAvailableHandler} is notified when the pending ranges have completed. If it contains gaps that are not currently pending + * then no listeners are registered and this method returns {@code null}. + * + * @return a future which returns the result of the {@link RangeAvailableHandler} once it has completed, or {@code null} if the + * target range is neither available nor pending. 
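+ * A {@code null} return lets the caller fall back to another source, such as the cache index or the blob store, instead of waiting.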
+ */ + @Nullable + CompletableFuture readIfAvailableOrPending(final Tuple rangeToRead, final RangeAvailableHandler reader) { + final CompletableFuture future = new CompletableFuture<>(); + try { + ensureOpen(); + if (tracker.waitForRangeIfPending(rangeToRead, ActionListener.wrap(success -> { + final int read = reader.onRangeAvailable(channel); + assert read == rangeToRead.v2() - rangeToRead.v1() : "partial read [" + + read + + "] does not match the range to read [" + + rangeToRead.v2() + + '-' + + rangeToRead.v1() + + ']'; + future.complete(read); + }, future::completeExceptionally))) { + return future; + } else { + return null; + } + } catch (Exception e) { + future.completeExceptionally(e); + return future; + } + } + public Tuple getAbsentRangeWithin(long start, long end) { ensureOpen(); return tracker.getAbsentRangeWithin(start, end); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java index 4ee8a191fd5d7..4a4eebfce8d46 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java @@ -9,15 +9,23 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefIterator; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.blobstore.cache.BlobStoreCacheService; +import org.elasticsearch.blobstore.cache.CachedBlob; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; +import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; import org.elasticsearch.index.store.BaseSearchableSnapshotIndexInput; import org.elasticsearch.index.store.IndexInputStats; import org.elasticsearch.index.store.SearchableSnapshotDirectory; @@ -29,12 +37,15 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.Locale; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.IntStream; +import static org.elasticsearch.index.store.checksum.ChecksumBlobContainerIndexInput.checksumToBytesArray; + public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput { /** @@ -74,6 +85,7 @@ public CachedBlobContainerIndexInput( new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()), rangeSize ); + assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth stats.incrementOpenCount(); } @@ -136,53 +148,296 @@ protected 
void readInternal(ByteBuffer b) throws IOException { final long position = getFilePointer() + this.offset; final int length = b.remaining(); - int totalBytesRead = 0; - while (totalBytesRead < length) { - final long pos = position + totalBytesRead; - final int len = length - totalBytesRead; - int bytesRead = 0; - try { - final CacheFile cacheFile = getCacheFileSafe(); - try (Releasable ignored = cacheFile.fileLock()) { - final Tuple rangeToWrite = computeRange(pos); - final Tuple rangeToRead = Tuple.tuple(pos, Math.min(pos + len, rangeToWrite.v2())); - - bytesRead = cacheFile.fetchAsync(rangeToWrite, rangeToRead, (channel) -> { - final int read; - if ((rangeToRead.v2() - rangeToRead.v1()) < b.remaining()) { - final ByteBuffer duplicate = b.duplicate(); - duplicate.limit(duplicate.position() + Math.toIntExact(rangeToRead.v2() - rangeToRead.v1())); - read = readCacheFile(channel, pos, duplicate); - assert duplicate.position() <= b.limit(); - b.position(duplicate.position()); + // We can detect that we're going to read the last 16 bytes (which contain the footer checksum) of the file. Such reads are often + // executed when opening a Directory, and since we have the checksum in the snapshot metadata we can use it to fill the ByteBuffer. + if (length == CodecUtil.footerLength() && isClone == false && position == fileInfo.length() - length) { + if (readChecksumFromFileInfo(b)) { + logger.trace("read footer of file [{}] at position [{}], bypassing all caches", fileInfo.physicalName(), position); + return; + } + assert b.remaining() == length; + } + + logger.trace("readInternal: read [{}-{}] ([{}] bytes) from [{}]", position, position + length, length, this); + + try { + final CacheFile cacheFile = getCacheFileSafe(); + try (Releasable ignored = cacheFile.fileLock()) { + + // Can we serve the read directly from disk? If so, do so and don't worry about anything else. + + final CompletableFuture waitingForRead = cacheFile.readIfAvailableOrPending( + Tuple.tuple(position, position + length), + channel -> { + final int read = readCacheFile(channel, position, b); + assert read == length : read + " vs " + length; + return read; + } + ); + + if (waitingForRead != null) { + final Integer read = waitingForRead.get(); + assert read == length; + readComplete(position, length); + return; + } + + // Requested data is not on disk, so try the cache index next. + + final Tuple indexCacheMiss; // null if not a miss + + // We try to use the cache index if: + // - the file is small enough to be fully cached + final boolean canBeFullyCached = fileInfo.length() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2; + // - we're reading the first N bytes of the file + final boolean isStartOfFile = (position + length <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); + + if (canBeFullyCached || isStartOfFile) { + final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), 0L, length); + + if (cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY) { + // We would have liked to find a cached entry but we did not find anything: the cache on the disk will be requested + // so we compute the region of the file we would like to have the next time. The region is expressed as a tuple of + // {start, end} where positions are relative to the whole file.
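+ // For example, with the default 4 KiB cached blob size: a 6 KiB file yields the region {0, 6144} (fully indexed), while a 1 MiB file only yields its header region {0, 4096}.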
+ + if (canBeFullyCached) { + // if the index input is smaller than twice the size of the blob cache, it will be fully indexed + indexCacheMiss = Tuple.tuple(0L, fileInfo.length()); } else { - read = readCacheFile(channel, pos, b); + // the index input is too large to fully cache, so just cache the initial range + indexCacheMiss = Tuple.tuple(0L, (long) BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); } - return read; - }, this::writeCacheFile, directory.cacheFetchAsyncExecutor()).get(); + + // We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put. + // TODO TBD use a different trigger for creating the cache index and avoid a put in the CACHE_NOT_READY case. + } else { + logger.trace( + "reading [{}] bytes of file [{}] at position [{}] using cache index", + length, + fileInfo.physicalName(), + position + ); + stats.addIndexCacheBytesRead(cachedBlob.length()); + + final BytesRefIterator cachedBytesIterator = cachedBlob.bytes().slice(Math.toIntExact(position), length).iterator(); + BytesRef bytesRef; + while ((bytesRef = cachedBytesIterator.next()) != null) { + b.put(bytesRef.bytes, bytesRef.offset, bytesRef.length); + } + assert b.position() == length : "copied " + b.position() + " but expected " + length; + + try { + final Tuple cachedRange = Tuple.tuple(cachedBlob.from(), cachedBlob.to()); + cacheFile.populateAndRead( + cachedRange, + cachedRange, + channel -> cachedBlob.length(), + (channel, from, to, progressUpdater) -> { + final long startTimeNanos = stats.currentTimeNanos(); + final BytesRefIterator iterator = cachedBlob.bytes() + .slice(Math.toIntExact(from - cachedBlob.from()), Math.toIntExact(to - from)) + .iterator(); + long writePosition = from; + BytesRef current; + while ((current = iterator.next()) != null) { + final ByteBuffer byteBuffer = ByteBuffer.wrap(current.bytes, current.offset, current.length); + while (byteBuffer.remaining() > 0) { + writePosition += positionalWrite(channel, writePosition, byteBuffer); + progressUpdater.accept(writePosition); + } + } + assert writePosition == to : writePosition + " vs " + to; + final long endTimeNanos = stats.currentTimeNanos(); + stats.addCachedBytesWritten(to - from, endTimeNanos - startTimeNanos); + logger.trace("copied bytes [{}-{}] of file [{}] from cache index to disk", from, to, fileInfo); + }, + directory.cacheFetchAsyncExecutor() + ); + } catch (Exception e) { + logger.debug( + new ParameterizedMessage( + "failed to store bytes [{}-{}] of file [{}] obtained from index cache", + cachedBlob.from(), + cachedBlob.to(), + fileInfo + ), + e + ); + // oh well, no big deal, at least we can return them to the caller. + } + + readComplete(position, length); + + return; + } + } else { + // requested range is not eligible for caching + indexCacheMiss = null; } - } catch (final Exception e) { - if (e instanceof AlreadyClosedException || (e.getCause() != null && e.getCause() instanceof AlreadyClosedException)) { - try { - // cache file was evicted during the range fetching, read bytes directly from source - bytesRead = readDirectly(pos, pos + len, b); - continue; - } catch (Exception inner) { - e.addSuppressed(inner); + + // Requested data is also not in the cache index, so we must visit the blob store to satisfy both the target range and any + // miss in the cache index. 
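+ // The combined range below covers both the read and the miss; the Long.MAX_VALUE/Long.MIN_VALUE sentinels make the min/max a no-op when there is no index cache miss.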
+ + final Tuple startRangeToWrite = computeRange(position); + final Tuple endRangeToWrite = computeRange(position + length - 1); + assert startRangeToWrite.v2() <= endRangeToWrite.v2() : startRangeToWrite + " vs " + endRangeToWrite; + final Tuple rangeToWrite = Tuple.tuple( + Math.min(startRangeToWrite.v1(), indexCacheMiss == null ? Long.MAX_VALUE : indexCacheMiss.v1()), + Math.max(endRangeToWrite.v2(), indexCacheMiss == null ? Long.MIN_VALUE : indexCacheMiss.v2()) + ); + + assert rangeToWrite.v1() <= position && position + length <= rangeToWrite.v2() : "[" + + position + + "-" + + (position + length) + + "] vs " + + rangeToWrite; + final Tuple rangeToRead = Tuple.tuple(position, position + length); + + final CompletableFuture populateCacheFuture = cacheFile.populateAndRead(rangeToWrite, rangeToRead, channel -> { + final int read; + if ((rangeToRead.v2() - rangeToRead.v1()) < b.remaining()) { + final ByteBuffer duplicate = b.duplicate(); + duplicate.limit(duplicate.position() + Math.toIntExact(rangeToRead.v2() - rangeToRead.v1())); + read = readCacheFile(channel, position, duplicate); + assert duplicate.position() <= b.limit(); + b.position(duplicate.position()); + } else { + read = readCacheFile(channel, position, b); + } + return read; + }, this::writeCacheFile, directory.cacheFetchAsyncExecutor()); + + if (indexCacheMiss != null) { + final Releasable onCacheFillComplete = stats.addIndexCacheFill(); + final CompletableFuture readFuture = cacheFile.readIfAvailableOrPending(indexCacheMiss, channel -> { + final int indexCacheMissLength = Math.toIntExact(indexCacheMiss.v2() - indexCacheMiss.v1()); + + // We assume that we only cache small portions of blobs so that we do not need to: + // - use a BigArrays for allocation + // - use an intermediate copy buffer to read the file in sensibly-sized chunks + // - release the buffer once the indexing operation is complete + assert indexCacheMissLength <= COPY_BUFFER_SIZE : indexCacheMiss; + + final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength); + Channels.readFromFileChannelWithEofException(channel, indexCacheMiss.v1(), byteBuffer); + // NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats + byteBuffer.flip(); + final BytesReference content = BytesReference.fromByteBuffer(byteBuffer); + directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.v1(), content, new ActionListener<>() { + @Override + public void onResponse(Void response) { + onCacheFillComplete.close(); + } + + @Override + public void onFailure(Exception e1) { + onCacheFillComplete.close(); + } + }); + return indexCacheMissLength; + }); + + if (readFuture == null) { + // Normally doesn't happen, we're already obtaining a range covering all cache misses above, but theoretically + // possible in the case that the real populateAndRead call already failed to obtain this range of the file. In that + // case, simply move on. + onCacheFillComplete.close(); } } - throw new IOException("Fail to read data from cache", e); - } finally { - totalBytesRead += bytesRead; + final int bytesRead = populateCacheFuture.get(); + assert bytesRead == length : bytesRead + " vs " + length; } + } catch (final Exception e) { + // may have partially filled the buffer before the exception was thrown, so try and get the remainder directly. 
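+ // Note that only an AlreadyClosedException (cache file evicted) is recoverable here; readDirectlyIfAlreadyClosed rethrows anything else.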
+ final int alreadyRead = length - b.remaining(); + final int bytesRead = readDirectlyIfAlreadyClosed(position + alreadyRead, b, e); + assert alreadyRead + bytesRead == length : alreadyRead + " + " + bytesRead + " vs " + length; + + // In principle we could handle an index cache miss here too, ensuring that the direct read was large enough, but this is + // already a rare case caused by an overfull/undersized cache. } - assert totalBytesRead == length : "partial read operation, read [" + totalBytesRead + "] bytes of [" + length + "]"; - stats.incrementBytesRead(lastReadPosition, position, totalBytesRead); - lastReadPosition = position + totalBytesRead; + + readComplete(position, length); + } + + private void readComplete(long position, int length) { + stats.incrementBytesRead(lastReadPosition, position, length); + lastReadPosition = position + length; lastSeekPosition = lastReadPosition; } + private int readDirectlyIfAlreadyClosed(long position, ByteBuffer b, Exception e) throws IOException { + if (e instanceof AlreadyClosedException || (e.getCause() != null && e.getCause() instanceof AlreadyClosedException)) { + try { + // cache file was evicted during the range fetching, read bytes directly from blob container + final long length = b.remaining(); + final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))]; + logger.trace( + () -> new ParameterizedMessage( + "direct reading of range [{}-{}] for cache file [{}]", + position, + position + length, + cacheFileReference + ) + ); + + int bytesCopied = 0; + final long startTimeNanos = stats.currentTimeNanos(); + try (InputStream input = openInputStreamFromBlobStore(position, length)) { + long remaining = length; + while (remaining > 0) { + final int len = (remaining < copyBuffer.length) ? (int) remaining : copyBuffer.length; + int bytesRead = input.read(copyBuffer, 0, len); + if (bytesRead == -1) { + throw new EOFException( + String.format( + Locale.ROOT, + "unexpected EOF reading [%d-%d] ([%d] bytes remaining) from %s", + position, + position + length, + remaining, + cacheFileReference + ) + ); + } + b.put(copyBuffer, 0, bytesRead); + bytesCopied += bytesRead; + remaining -= bytesRead; + assert remaining == b.remaining() : remaining + " vs " + b.remaining(); + } + final long endTimeNanos = stats.currentTimeNanos(); + stats.addDirectBytesRead(bytesCopied, endTimeNanos - startTimeNanos); + } + return bytesCopied; + } catch (Exception inner) { + e.addSuppressed(inner); + } + } + throw new IOException("failed to read data from cache", e); + } + + private boolean readChecksumFromFileInfo(ByteBuffer b) throws IOException { + assert isClone == false; + byte[] footer; + try { + footer = checksumToBytesArray(fileInfo.checksum()); + } catch (NumberFormatException e) { + // tests disable this optimisation by passing an invalid checksum + footer = null; + } + if (footer == null) { + return false; + } + + b.put(footer); + assert b.remaining() == 0L; + return true; + + // TODO we should add this to DirectBlobContainerIndexInput too. + } + /** * Prefetches a complete part and writes it in cache. This method is used to prewarm the cache. 
     */
@@ -232,7 +487,7 @@ public void prefetchPart(final int part) throws IOException {
         final AtomicLong totalBytesWritten = new AtomicLong();
         long remainingBytes = rangeEnd - rangeStart;
         final long startTimeNanos = stats.currentTimeNanos();
-        try (InputStream input = openInputStream(rangeStart, rangeLength)) {
+        try (InputStream input = openInputStreamFromBlobStore(rangeStart, rangeLength)) {
             while (remainingBytes > 0L) {
                 assert totalBytesRead + remainingBytes == rangeLength;
                 final int bytesRead = readSafe(input, copyBuffer, rangeStart, rangeEnd, remainingBytes, cacheFileReference);
@@ -241,23 +496,33 @@ public void prefetchPart(final int part) throws IOException {
                 final long readStart = rangeStart + totalBytesRead;
                 final Tuple<Long, Long> rangeToWrite = Tuple.tuple(readStart, readStart + bytesRead);
 
-                cacheFile.fetchAsync(rangeToWrite, rangeToWrite, (channel) -> bytesRead, (channel, start, end, progressUpdater) -> {
-                    final ByteBuffer byteBuffer = ByteBuffer.wrap(
-                        copyBuffer,
-                        Math.toIntExact(start - readStart),
-                        Math.toIntExact(end - start)
-                    );
-                    final int writtenBytes = positionalWrite(channel, start, byteBuffer);
-                    logger.trace(
-                        "prefetchPart: writing range [{}-{}] of file [{}], [{}] bytes written",
-                        start,
-                        end,
-                        fileInfo.physicalName(),
-                        writtenBytes
-                    );
-                    totalBytesWritten.addAndGet(writtenBytes);
-                    progressUpdater.accept(start + writtenBytes);
-                }, directory.cacheFetchAsyncExecutor()).get();
+                // We do not actually read anything, but we want to wait for the write to complete before proceeding.
+                // noinspection UnnecessaryLocalVariable
+                final Tuple<Long, Long> rangeToRead = rangeToWrite;
+
+                cacheFile.populateAndRead(
+                    rangeToWrite,
+                    rangeToRead,
+                    (channel) -> bytesRead,
+                    (channel, start, end, progressUpdater) -> {
+                        final ByteBuffer byteBuffer = ByteBuffer.wrap(
+                            copyBuffer,
+                            Math.toIntExact(start - readStart),
+                            Math.toIntExact(end - start)
+                        );
+                        final int writtenBytes = positionalWrite(channel, start, byteBuffer);
+                        logger.trace(
+                            "prefetchPart: writing range [{}-{}] of file [{}], [{}] bytes written",
+                            start,
+                            end,
+                            fileInfo.physicalName(),
+                            writtenBytes
+                        );
+                        totalBytesWritten.addAndGet(writtenBytes);
+                        progressUpdater.accept(start + writtenBytes);
+                    },
+                    directory.cacheFetchAsyncExecutor()
+                ).get();
                 totalBytesRead += bytesRead;
                 remainingBytes -= bytesRead;
             }
@@ -357,7 +622,7 @@ private void writeCacheFile(final FileChannel fc, final long start, final long e
         long bytesCopied = 0L;
         long remaining = end - start;
         final long startTimeNanos = stats.currentTimeNanos();
-        try (InputStream input = openInputStream(start, length)) {
+        try (InputStream input = openInputStreamFromBlobStore(start, length)) {
             while (remaining > 0L) {
                 final int bytesRead = readSafe(input, copyBuffer, start, end, remaining, cacheFileReference);
                 positionalWrite(fc, start + bytesCopied, ByteBuffer.wrap(copyBuffer, 0, bytesRead));
@@ -370,6 +635,86 @@ private void writeCacheFile(final FileChannel fc, final long start, final long e
         }
     }
 
+    /**
+     * Opens an {@link InputStream} for the given range of bytes which reads the data directly from the blob store. If the requested range
+     * spans multiple blobs then this stream will request them in turn.
+     *
+     * @param position The start of the range of bytes to read, relative to the start of the corresponding Lucene file.
+     * @param length   The number of bytes to read
+     */
+    private InputStream openInputStreamFromBlobStore(final long position, final long length) throws IOException {
+        assert assertCurrentThreadMayAccessBlobStore();
+        if (fileInfo.numberOfParts() == 1L) {
+            assert position + length <= fileInfo.partBytes(0) : "cannot read ["
+                + position
+                + "-"
+                + (position + length)
+                + "] from ["
+                + fileInfo
+                + "]";
+            stats.addBlobStoreBytesRequested(length);
+            return blobContainer.readBlob(fileInfo.partName(0L), position, length);
+        } else {
+            final long startPart = getPartNumberForPosition(position);
+            final long endPart = getPartNumberForPosition(position + length - 1);
+
+            for (long currentPart = startPart; currentPart <= endPart; currentPart++) {
+                final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L;
+                final long endInPart = (currentPart == endPart)
+                    ? getRelativePositionInPart(position + length - 1) + 1
+                    : getLengthOfPart(currentPart);
+                stats.addBlobStoreBytesRequested(endInPart - startInPart);
+            }
+
+            return new SlicedInputStream(endPart - startPart + 1L) {
+                @Override
+                protected InputStream openSlice(long slice) throws IOException {
+                    final long currentPart = startPart + slice;
+                    final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L;
+                    final long endInPart = (currentPart == endPart)
+                        ? getRelativePositionInPart(position + length - 1) + 1
+                        : getLengthOfPart(currentPart);
+                    return blobContainer.readBlob(fileInfo.partName(currentPart), startInPart, endInPart - startInPart);
+                }
+            };
+        }
+    }
+
+    /**
+     * Compute the part number that contains the byte at the given position in the corresponding Lucene file.
+     */
+    private long getPartNumberForPosition(long position) {
+        ensureValidPosition(position);
+        final long part = position / fileInfo.partSize().getBytes();
+        assert part < fileInfo.numberOfParts() : "part number [" + part + "] exceeds number of parts: " + fileInfo.numberOfParts();
+        assert part >= 0L : "part number [" + part + "] is negative";
+        return part;
+    }
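// ------------------------------------------------------------------------------------------------------------------
// Illustrative sketch (editorial addition, not part of this change). It mirrors the part arithmetic used by
// openInputStreamFromBlobStore() above for multi-part blobs: a Lucene file snapshotted as fixed-size parts maps a
// logical position to a part number (position / partSize) and an in-part offset (position % partSize), and a read that
// spans parts becomes one readBlob() per part. The real method special-cases single-part files; names here are
// hypothetical.
final class PartArithmeticSketch {
    public static void main(String[] args) {
        final long partSize = 1024L;
        final long position = 1500L; // logical offset within the Lucene file
        final long length = 1000L;   // read [1500-2500), which spans parts 1 and 2

        final long startPart = position / partSize;              // -> 1
        final long endPart = (position + length - 1) / partSize; // -> 2
        for (long part = startPart; part <= endPart; part++) {
            final long startInPart = (part == startPart) ? position % partSize : 0L;
            final long endInPart = (part == endPart) ? (position + length - 1) % partSize + 1 : partSize;
            // prints: readBlob(part 1, [476-1024)) then readBlob(part 2, [0-452)) -> 548 + 452 = 1000 bytes
            System.out.println("readBlob(part " + part + ", [" + startInPart + "-" + endInPart + "))");
        }
    }
}
// ------------------------------------------------------------------------------------------------------------------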
+    /**
+     * Compute the position of the given byte relative to the start of its part.
+     *
+     * @param position the position of the required byte (within the corresponding Lucene file)
+     */
+    private long getRelativePositionInPart(long position) {
+        ensureValidPosition(position);
+        final long pos = position % fileInfo.partSize().getBytes();
+        assert pos < fileInfo.partBytes((int) getPartNumberForPosition(position)) : "position in part [" + pos + "] exceeds part's length";
+        assert pos >= 0L : "position in part [" + pos + "] is negative";
+        return pos;
+    }
+
+    private long getLengthOfPart(long part) {
+        return fileInfo.partBytes(Math.toIntExact(part));
+    }
+
+    private void ensureValidPosition(long position) {
+        assert position >= 0L && position < fileInfo.length() : position + " vs " + fileInfo.length();
+        // noinspection ConstantConditions in case assertions are disabled
+        if (position < 0L || position >= fileInfo.length()) {
+            throw new IllegalArgumentException("Position [" + position + "] is invalid for a file of length [" + fileInfo.length() + "]");
+        }
+    }
+
     @Override
     protected void seekInternal(long pos) throws IOException {
         if (pos > length()) {
@@ -431,43 +776,11 @@ public String toString() {
             + getFilePointer()
             + ", rangeSize="
             + getDefaultRangeSize()
+            + ", directory="
+            + directory
             + '}';
     }
 
-    private int readDirectly(long start, long end, ByteBuffer b) throws IOException {
-        final long length = end - start;
-        final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))];
-        logger.trace(() -> new ParameterizedMessage("direct reading of range [{}-{}] for cache file [{}]", start, end, cacheFileReference));
-
-        int bytesCopied = 0;
-        final long startTimeNanos = stats.currentTimeNanos();
-        try (InputStream input = openInputStream(start, length)) {
-            long remaining = end - start;
-            while (remaining > 0) {
-                final int len = (remaining < copyBuffer.length) ? (int) remaining : copyBuffer.length;
-                int bytesRead = input.read(copyBuffer, 0, len);
-                if (bytesRead == -1) {
-                    throw new EOFException(
-                        String.format(
-                            Locale.ROOT,
-                            "unexpected EOF reading [%d-%d] ([%d] bytes remaining) from %s",
-                            start,
-                            end,
-                            remaining,
-                            cacheFileReference
-                        )
-                    );
-                }
-                b.put(copyBuffer, 0, bytesRead);
-                bytesCopied += bytesRead;
-                remaining -= bytesRead;
-            }
-            final long endTimeNanos = stats.currentTimeNanos();
-            stats.addDirectBytesRead(bytesCopied, endTimeNanos - startTimeNanos);
-        }
-        return bytesCopied;
-    }
-
     private static class CacheFileReference implements CacheFile.EvictionListener {
 
         private final long fileLength;
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java
index feeb55152e314..9e9217688b66a 100644
--- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java
+++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java
@@ -178,7 +178,7 @@ public List<Gap> waitForRange(final Tuple<Long, Long> range, final Tuple<Long,
                 requiredRange.completionListener.addListener(
                     ActionListener.map(listener, progress -> null),
-                    Math.min(requiredRange.end, subRange != null ? subRange.v2() : Long.MAX_VALUE)
+                    Math.min(requiredRange.end, subRange.v2())
                 );
                 break;
             default:
@@ -187,10 +187,7 @@ public List<Gap> waitForRange(final Tuple<Long, Long> range, final Tuple<Long,
             pendingRanges.forEach(
-                r -> r.completionListener.addListener(
-                    groupedActionListener,
-                    Math.min(r.end, subRange != null ? subRange.v2() : Long.MAX_VALUE)
-                )
+                r -> r.completionListener.addListener(groupedActionListener, Math.min(r.end, subRange.v2()))
             );
         }
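// ------------------------------------------------------------------------------------------------------------------
// Illustrative sketch (editorial addition, not part of this change). It models the contract of the
// SparseFileTracker#waitForRangeIfPending method added below: a listener may only be registered if every byte of the
// requested range is already present or has a fill in flight; if any part of the range is untracked the method
// declines (returns false) instead of creating new gaps the way waitForRange does. The TreeMap-based tracker here is a
// deliberate simplification with hypothetical names.
import java.util.Map;
import java.util.TreeMap;

final class IfPendingSketch {
    // start -> end of ranges that are present or already being filled
    private final TreeMap<Long, Long> trackedRanges = new TreeMap<>();

    void track(long start, long end) {
        trackedRanges.put(start, end);
    }

    boolean waitForRangeIfPending(long start, long end) {
        long pos = start;
        while (pos < end) {
            final Map.Entry<Long, Long> covering = trackedRanges.floorEntry(pos);
            if (covering == null || covering.getValue() <= pos) {
                return false; // untracked gap: the caller must use waitForRange instead
            }
            pos = covering.getValue();
        }
        return true; // in the real tracker the listener is registered at this point
    }

    public static void main(String[] args) {
        final IfPendingSketch tracker = new IfPendingSketch();
        tracker.track(0L, 100L);
        tracker.track(100L, 250L);
        System.out.println(tracker.waitForRangeIfPending(50L, 200L));  // true: fully present or pending
        System.out.println(tracker.waitForRangeIfPending(300L, 400L)); // false: untracked gap beyond 250
    }
}
// ------------------------------------------------------------------------------------------------------------------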
@@ -203,6 +200,90 @@ public List<Gap> waitForRange(final Tuple<Long, Long> range, final Tuple<Long,
+    public boolean waitForRangeIfPending(final Tuple<Long, Long> range, final ActionListener<Void> listener) {
+        final long start = range.v1();
+        final long end = range.v2();
+        if (end < start || start < 0L || length < end) {
+            throw new IllegalArgumentException("invalid range [start=" + start + ", end=" + end + ", length=" + length + "]");
+        }
+
+        synchronized (mutex) {
+            assert invariant();
+
+            final List<Range> pendingRanges = new ArrayList<>();
+
+            final Range targetRange = new Range(start, end, null);
+            final SortedSet<Range> earlierRanges = ranges.headSet(targetRange, false); // ranges with strictly earlier starts
+            if (earlierRanges.isEmpty() == false) {
+                final Range lastEarlierRange = earlierRanges.last();
+                if (start < lastEarlierRange.end) {
+                    if (lastEarlierRange.isPending()) {
+                        pendingRanges.add(lastEarlierRange);
+                    }
+                    targetRange.start = Math.min(end, lastEarlierRange.end);
+                }
+            }
+
+            while (targetRange.start < end) {
+                assert 0 <= targetRange.start : targetRange;
+                assert invariant();
+
+                final SortedSet<Range> existingRanges = ranges.tailSet(targetRange);
+                if (existingRanges.isEmpty()) {
+                    return false;
+                } else {
+                    final Range firstExistingRange = existingRanges.first();
+                    assert targetRange.start <= firstExistingRange.start : targetRange + " vs " + firstExistingRange;
+
+                    if (targetRange.start == firstExistingRange.start) {
+                        if (firstExistingRange.isPending()) {
+                            pendingRanges.add(firstExistingRange);
+                        }
+                        targetRange.start = Math.min(end, firstExistingRange.end);
+                    } else {
+                        return false;
+                    }
+                }
+            }
+            assert targetRange.start == targetRange.end : targetRange;
+            assert targetRange.start == end : targetRange;
+            assert invariant();
+
+            switch (pendingRanges.size()) {
+                case 0:
+                    break;
+                case 1:
+                    final Range pendingRange = pendingRanges.get(0);
+                    pendingRange.completionListener.addListener(
+                        ActionListener.map(listener, progress -> null),
+                        Math.min(pendingRange.end, end)
+                    );
+                    return true;
+                default:
+                    final GroupedActionListener<Long> groupedActionListener = new GroupedActionListener<>(
+                        ActionListener.map(listener, progress -> null),
+                        pendingRanges.size()
+                    );
+                    pendingRanges.forEach(r -> r.completionListener.addListener(groupedActionListener, Math.min(r.end, end)));
+                    return true;
+            }
+        }
+
+        listener.onResponse(null);
+        return true;
+    }
+
     /**
      * Returns a range that contains all bytes of the target range which are absent (possibly pending). The returned range may include
      * some ranges of present bytes. It tries to return the smallest possible range, but does so on a best-effort basis.
This method does diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/checksum/ChecksumBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/checksum/ChecksumBlobContainerIndexInput.java index 76c9017ba84e1..bb72e354b2dfc 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/checksum/ChecksumBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/checksum/ChecksumBlobContainerIndexInput.java @@ -7,11 +7,9 @@ package org.elasticsearch.index.store.checksum; import org.apache.lucene.codecs.CodecUtil; -import org.apache.lucene.store.ByteBuffersDataOutput; -import org.apache.lucene.store.ByteBuffersIndexOutput; +import org.apache.lucene.store.ByteArrayDataOutput; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.IndexOutput; import org.elasticsearch.index.store.Store; import java.io.EOFException; @@ -131,14 +129,18 @@ private static void ensureReadOnceChecksumContext(IOContext context) { * @throws IOException if something goes wrong when creating the {@link ChecksumBlobContainerIndexInput} */ public static ChecksumBlobContainerIndexInput create(String name, long length, String checksum, IOContext context) throws IOException { - final ByteBuffersDataOutput out = new ByteBuffersDataOutput(); - try (IndexOutput output = new ByteBuffersIndexOutput(out, "tmp", name)) { - // reverse CodecUtil.writeFooter() - output.writeInt(CodecUtil.FOOTER_MAGIC); - output.writeInt(0); - output.writeLong(Long.parseLong(checksum, Character.MAX_RADIX)); - output.close(); - return new ChecksumBlobContainerIndexInput(name, length, out.toArrayCopy(), context); - } + return new ChecksumBlobContainerIndexInput(name, length, checksumToBytesArray(checksum), context); + } + + public static byte[] checksumToBytesArray(String checksum) throws IOException { + final byte[] result = new byte[CodecUtil.footerLength()]; + assert result.length >= Integer.BYTES + Integer.BYTES + Long.BYTES; // ensure that nobody changed the file format under us + final ByteArrayDataOutput output = new ByteArrayDataOutput(result); + // reverse CodecUtil.writeFooter() + output.writeInt(CodecUtil.FOOTER_MAGIC); + output.writeInt(0); + output.writeLong(Long.parseLong(checksum, Character.MAX_RADIX)); + assert output.getPosition() == result.length; + return result; } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java index 18e779578a286..1386c67de671a 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java @@ -338,6 +338,7 @@ public String toString() { private InputStream openBlobStream(int part, long pos, long length) throws IOException { assert assertCurrentThreadMayAccessBlobStore(); + stats.addBlobStoreBytesRequested(length); return blobContainer.readBlob(fileInfo.partName(part), pos, length); } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java 
b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java
index d98f633a9c709..39e84e627a46c 100644
--- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java
+++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java
@@ -8,6 +8,7 @@
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.blobstore.cache.BlobStoreCacheService;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -32,6 +33,7 @@
 import org.elasticsearch.index.engine.ReadOnlyEngine;
 import org.elasticsearch.index.store.SearchableSnapshotDirectory;
 import org.elasticsearch.index.translog.TranslogStats;
+import org.elasticsearch.indices.SystemIndexDescriptor;
 import org.elasticsearch.indices.recovery.SearchableSnapshotRecoveryState;
 import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.license.XPackLicenseState;
@@ -40,6 +42,7 @@
 import org.elasticsearch.plugins.EnginePlugin;
 import org.elasticsearch.plugins.IndexStorePlugin;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.SystemIndexPlugin;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestHandler;
@@ -79,13 +82,15 @@
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_NAME;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_BLOB_CACHE_INDEX;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_RECOVERY_STATE_FACTORY_KEY;
 
 /**
  * Plugin for Searchable Snapshots feature
  */
-public class SearchableSnapshots extends Plugin implements IndexStorePlugin, EnginePlugin, ActionPlugin, ClusterPlugin {
+public class SearchableSnapshots extends Plugin implements IndexStorePlugin, EnginePlugin, ActionPlugin, ClusterPlugin, SystemIndexPlugin {
 
     public static final Setting<String> SNAPSHOT_REPOSITORY_SETTING = Setting.simpleString(
         "index.store.snapshot.repository_name",
@@ -141,6 +146,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
     );
 
     private volatile Supplier<RepositoriesService> repositoriesServiceSupplier;
+    private final SetOnce<BlobStoreCacheService> blobStoreCacheService = new SetOnce<>();
     private final SetOnce<CacheService> cacheService = new SetOnce<>();
     private final SetOnce<ThreadPool> threadPool = new SetOnce<>();
     private final SetOnce<FailShardsOnInvalidLicenseClusterListener> failShardsListener = new SetOnce<>();
@@ -195,10 +201,17 @@ public Collection<Object> createComponents(
         this.cacheService.set(cacheService);
         this.repositoriesServiceSupplier = repositoriesServiceSupplier;
         this.threadPool.set(threadPool);
+        final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService(
+            clusterService,
+            threadPool,
+            client,
+            SNAPSHOT_BLOB_CACHE_INDEX
+        );
+        this.blobStoreCacheService.set(blobStoreCacheService);
         this.failShardsListener.set(
             new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), clusterService.getRerouteService())
         );
-        return List.of(cacheService);
+        return List.of(cacheService, blobStoreCacheService);
     } else {
         this.repositoriesServiceSupplier = () -> {
             assert false : "searchable snapshots are disabled";
@@ -216,6 +229,11 @@ public void onIndexModule(IndexModule indexModule) {
         }
     }
 
+    @Override
+    public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
+        return List.of(new SystemIndexDescriptor(SNAPSHOT_BLOB_CACHE_INDEX, "Contains cached data of blob store repositories"));
+    }
+
     @Override
     public Map<String, DirectoryFactory> getDirectoryFactories() {
         if (SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED) {
@@ -226,7 +244,17 @@ public Map<String, DirectoryFactory> getDirectoryFactories() {
                 assert cache != null;
                 final ThreadPool threadPool = this.threadPool.get();
                 assert threadPool != null;
-                return SearchableSnapshotDirectory.create(repositories, cache, indexSettings, shardPath, System::nanoTime, threadPool);
+                final BlobStoreCacheService blobCache = blobStoreCacheService.get();
+                assert blobCache != null;
+                return SearchableSnapshotDirectory.create(
+                    repositories,
+                    cache,
+                    indexSettings,
+                    shardPath,
+                    System::nanoTime,
+                    threadPool,
+                    blobCache
+                );
             });
         } else {
             return Map.of();
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java
index a2e9ac0c1f478..8d0c3eac5fe02 100644
--- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java
+++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java
@@ -108,9 +108,12 @@ private static CacheIndexInputStats toCacheIndexInputStats(final String fileName
             toCounter(inputStats.getContiguousReads()),
             toCounter(inputStats.getNonContiguousReads()),
             toCounter(inputStats.getCachedBytesRead()),
+            toCounter(inputStats.getIndexCacheBytesRead()),
             toTimedCounter(inputStats.getCachedBytesWritten()),
             toTimedCounter(inputStats.getDirectBytesRead()),
-            toTimedCounter(inputStats.getOptimizedBytesRead())
+            toTimedCounter(inputStats.getOptimizedBytesRead()),
+            toCounter(inputStats.getBlobStoreBytesRequested()),
+            inputStats.getCurrentIndexCacheFills()
         );
     }
 
diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java
new file mode 100644
index 0000000000000..2b22fef67e3ef
--- /dev/null
+++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java
@@ -0,0 +1,450 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.blobstore.cache;
+
+import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
+import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.OriginSettingClient;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
+import org.elasticsearch.cluster.routing.allocation.decider.Decision;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
+import org.elasticsearch.plugins.ClusterPlugin;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.snapshots.SnapshotId;
+import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats;
+import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase;
+import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
+import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
+import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsAction;
+import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsRequest;
+import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
+
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.INDEX_SHARD_SNAPSHOT_FORMAT;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_BLOB_CACHE_INDEX;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+
+public class SearchableSnapshotsBlobStoreCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        final List<Class<? extends Plugin>> plugins = new ArrayList<>();
+        plugins.add(WaitForSnapshotBlobCacheShardsActivePlugin.class);
+        plugins.addAll(super.nodePlugins());
+        return List.copyOf(plugins);
+    }
+
+    @Override
+    protected int numberOfReplicas() {
+        return 0;
+    }
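// ------------------------------------------------------------------------------------------------------------------
// Illustrative sketch (editorial addition, not part of this change). The integration test below looks up cached blobs
// in the .snapshot-blob-cache system index by document id; as the assertions later in this file show, that id is the
// slash-joined path of repository name, snapshot UUID, index directory, shard directory and physical file name,
// suffixed with "/@<offset>". A minimal reconstruction with hypothetical names:
final class CachedBlobIdSketch {
    static String cachedBlobId(String repository, String snapshotUuid, String indexDir, String shardDir, String fileName, long offset) {
        return String.join("/", repository, snapshotUuid, indexDir, shardDir, fileName) + "/@" + offset;
    }

    public static void main(String[] args) {
        // -> my_repo/snap-uuid/index-dir/0/__data_blob/@0
        System.out.println(cachedBlobId("my_repo", "snap-uuid", "index-dir", "0", "__data_blob", 0L));
    }
}
// ------------------------------------------------------------------------------------------------------------------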
+ + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put( + CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING.getKey(), + randomLongBetween(new ByteSizeValue(4, ByteSizeUnit.KB).getBytes(), new ByteSizeValue(20, ByteSizeUnit.KB).getBytes()) + "b" + ) + .put(CacheService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES)) + .build(); + } + + public void testBlobStoreCache() throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName); + + final List indexRequestBuilders = new ArrayList<>(); + for (int i = scaledRandomIntBetween(0, 10_000); i >= 0; i--) { + indexRequestBuilders.add(client().prepareIndex(indexName).setSource("text", randomUnicodeOfLength(10), "num", i)); + } + indexRandom(true, false, true, indexRequestBuilders); + final long numberOfDocs = indexRequestBuilders.size(); + final NumShards numberOfShards = getNumShards(indexName); + + if (randomBoolean()) { + logger.info("--> force-merging index before snapshotting"); + final ForceMergeResponse forceMergeResponse = client().admin() + .indices() + .prepareForceMerge(indexName) + .setMaxNumSegments(1) + .get(); + assertThat(forceMergeResponse.getSuccessfulShards(), equalTo(numberOfShards.totalNumShards)); + assertThat(forceMergeResponse.getFailedShards(), equalTo(0)); + } + + final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final Path repositoryLocation = randomRepoPath(); + createFsRepository(repositoryName, repositoryLocation); + + final SnapshotId snapshot = createSnapshot(repositoryName, List.of(indexName)); + assertAcked(client().admin().indices().prepareDelete(indexName)); + + // extract the list of blobs per shard from the snapshot directory on disk + final Map blobsInSnapshot = blobsInSnapshot(repositoryLocation, snapshot.getUUID()); + assertThat("Failed to load all shard snapshot metadata files", blobsInSnapshot.size(), equalTo(numberOfShards.numPrimaries)); + + expectThrows( + IndexNotFoundException.class, + ".snapshot-blob-cache system index should not be created yet", + () -> systemClient().admin().indices().prepareGetIndex().addIndices(SNAPSHOT_BLOB_CACHE_INDEX).get() + ); + + logger.info("--> mount snapshot [{}] as an index for the first time", snapshot); + final String restoredIndex = mountSnapshot( + repositoryName, + snapshot.getName(), + indexName, + Settings.builder() + .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true) + .put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), false) + .build() + ); + ensureGreen(restoredIndex); + + // wait for all async cache fills to complete + assertBusy(() -> { + for (final SearchableSnapshotShardStats shardStats : client().execute( + SearchableSnapshotsStatsAction.INSTANCE, + new SearchableSnapshotsStatsRequest() + ).actionGet().getStats()) { + for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { + assertThat(Strings.toString(indexInputStats), indexInputStats.getCurrentIndexCacheFills(), equalTo(0L)); + } + } + }); + + for (final SearchableSnapshotShardStats shardStats : client().execute( + SearchableSnapshotsStatsAction.INSTANCE, + new SearchableSnapshotsStatsRequest() + ).actionGet().getStats()) { + for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { + assertThat(Strings.toString(indexInputStats), 
indexInputStats.getBlobStoreBytesRequested().getCount(), greaterThan(0L)); + } + } + + logger.info("--> verifying cached documents in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); + assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); + + final long numberOfCachedBlobs = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).get().getHits().getTotalHits().value; + final long numberOfCacheWrites = systemClient().admin() + .indices() + .prepareStats(SNAPSHOT_BLOB_CACHE_INDEX) + .clear() + .setIndexing(true) + .get() + .getTotal().indexing.getTotal().getIndexCount(); + + logger.info("--> verifying documents in index [{}]", restoredIndex); + assertHitCount(client().prepareSearch(restoredIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); + assertHitCount( + client().prepareSearch(restoredIndex) + .setQuery(QueryBuilders.rangeQuery("num").lte(numberOfDocs)) + .setSize(0) + .setTrackTotalHits(true) + .get(), + numberOfDocs + ); + assertHitCount( + client().prepareSearch(restoredIndex) + .setQuery(QueryBuilders.rangeQuery("num").gt(numberOfDocs + 1)) + .setSize(0) + .setTrackTotalHits(true) + .get(), + 0L + ); + + assertAcked(client().admin().indices().prepareDelete(restoredIndex)); + + logger.info("--> mount snapshot [{}] as an index for the second time", snapshot); + final String restoredAgainIndex = mountSnapshot( + repositoryName, + snapshot.getName(), + indexName, + Settings.builder() + .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true) + .put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), false) + .build() + ); + ensureGreen(restoredAgainIndex); + + logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredAgainIndex); + for (final SearchableSnapshotShardStats shardStats : client().execute( + SearchableSnapshotsStatsAction.INSTANCE, + new SearchableSnapshotsStatsRequest() + ).actionGet().getStats()) { + for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { + final boolean mayReadMoreThanHeader + // we read the header of each file contained within the .cfs file, which could be anywhere + = indexInputStats.getFileName().endsWith(".cfs") + // we read a couple of longs at the end of the .fdt file (see https://issues.apache.org/jira/browse/LUCENE-9456) + // TODO revisit this when this issue is addressed in Lucene + || indexInputStats.getFileName().endsWith(".fdt"); + if (indexInputStats.getFileLength() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2 + || mayReadMoreThanHeader == false) { + assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L)); + } + } + } + + logger.info("--> verifying documents in index [{}]", restoredAgainIndex); + assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); + assertHitCount( + client().prepareSearch(restoredAgainIndex) + .setQuery(QueryBuilders.rangeQuery("num").lte(numberOfDocs)) + .setSize(0) + .setTrackTotalHits(true) + .get(), + numberOfDocs + ); + assertHitCount( + client().prepareSearch(restoredAgainIndex) + .setQuery(QueryBuilders.rangeQuery("num").gt(numberOfDocs + 1)) + .setSize(0) + .setTrackTotalHits(true) + .get(), + 0L + ); + + logger.info("--> verifying cached documents (again) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); + assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); + + logger.info("--> verifying that no extra cached blobs 
were indexed [{}]", SNAPSHOT_BLOB_CACHE_INDEX); + refreshSystemIndex(); + assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), numberOfCachedBlobs); + assertThat( + systemClient().admin().indices().prepareStats(SNAPSHOT_BLOB_CACHE_INDEX).clear().setIndexing(true).get().getTotal().indexing + .getTotal() + .getIndexCount(), + equalTo(numberOfCacheWrites) + ); + + logger.info("--> restarting cluster"); + internalCluster().fullRestart(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + return Settings.builder() + .put(super.onNodeStopped(nodeName)) + .put(WaitForSnapshotBlobCacheShardsActivePlugin.ENABLED.getKey(), true) + .build(); + } + }); + ensureGreen(restoredAgainIndex); + + logger.info("--> verifying documents in index [{}]", restoredAgainIndex); + assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); + assertHitCount( + client().prepareSearch(restoredAgainIndex) + .setQuery(QueryBuilders.rangeQuery("num").lte(numberOfDocs)) + .setSize(0) + .setTrackTotalHits(true) + .get(), + numberOfDocs + ); + assertHitCount( + client().prepareSearch(restoredAgainIndex) + .setQuery(QueryBuilders.rangeQuery("num").gt(numberOfDocs + 1)) + .setSize(0) + .setTrackTotalHits(true) + .get(), + 0L + ); + + logger.info("--> verifying cached documents (after restart) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); + assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); + + logger.info("--> verifying that no cached blobs were indexed in system index [{}] after restart", SNAPSHOT_BLOB_CACHE_INDEX); + assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), numberOfCachedBlobs); + assertThat( + systemClient().admin().indices().prepareStats(SNAPSHOT_BLOB_CACHE_INDEX).clear().setIndexing(true).get().getTotal().indexing + .getTotal() + .getIndexCount(), + equalTo(0L) + ); + + // TODO also test when the index is frozen + // TODO also test when prewarming is enabled + } + + /** + * @return a {@link Client} that can be used to query the blob store cache system index + */ + private Client systemClient() { + return new OriginSettingClient(client(), ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN); + } + + private void refreshSystemIndex() { + try { + final RefreshResponse refreshResponse = systemClient().admin().indices().prepareRefresh(SNAPSHOT_BLOB_CACHE_INDEX).get(); + assertThat(refreshResponse.getSuccessfulShards(), greaterThan(0)); + assertThat(refreshResponse.getFailedShards(), equalTo(0)); + } catch (IndexNotFoundException indexNotFoundException) { + throw new AssertionError("unexpected", indexNotFoundException); + } + } + + /** + * Reads a repository location on disk and extracts the list of blobs for each shards + */ + private Map blobsInSnapshot(Path repositoryLocation, String snapshotId) throws IOException { + final Map blobsPerShard = new HashMap<>(); + Files.walkFileTree(repositoryLocation.resolve("indices"), new SimpleFileVisitor<>() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + final String fileName = file.getFileName().toString(); + if (fileName.equals("snap-" + snapshotId + ".dat")) { + blobsPerShard.put( + String.join( + "/", + snapshotId, + file.getParent().getParent().getFileName().toString(), + file.getParent().getFileName().toString() + ), + INDEX_SHARD_SNAPSHOT_FORMAT.deserialize(fileName, xContentRegistry(), 
Streams.readFully(Files.newInputStream(file))) + ); + } + return FileVisitResult.CONTINUE; + } + }); + return Map.copyOf(blobsPerShard); + } + + private void assertCachedBlobsInSystemIndex(final String repositoryName, final Map blobsInSnapshot) + throws Exception { + assertBusy(() -> { + refreshSystemIndex(); + + long numberOfCachedBlobs = 0L; + for (Map.Entry blob : blobsInSnapshot.entrySet()) { + for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : blob.getValue().indexFiles()) { + if (fileInfo.name().startsWith("__") == false) { + continue; + } + + final String path = String.join("/", repositoryName, blob.getKey(), fileInfo.physicalName()); + if (fileInfo.length() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2) { + // file has been fully cached + final GetResponse getResponse = systemClient().prepareGet(SNAPSHOT_BLOB_CACHE_INDEX, path + "/@0").get(); + assertThat("not cached: [" + path + "/@0] for blob [" + fileInfo + "]", getResponse.isExists(), is(true)); + final CachedBlob cachedBlob = CachedBlob.fromSource(getResponse.getSourceAsMap()); + assertThat(cachedBlob.from(), equalTo(0L)); + assertThat(cachedBlob.to(), equalTo(fileInfo.length())); + assertThat((long) cachedBlob.length(), equalTo(fileInfo.length())); + numberOfCachedBlobs += 1; + + } else { + // first region of file has been cached + GetResponse getResponse = systemClient().prepareGet(SNAPSHOT_BLOB_CACHE_INDEX, path + "/@0").get(); + assertThat( + "not cached: [" + path + "/@0] for first region of blob [" + fileInfo + "]", + getResponse.isExists(), + is(true) + ); + + CachedBlob cachedBlob = CachedBlob.fromSource(getResponse.getSourceAsMap()); + assertThat(cachedBlob.from(), equalTo(0L)); + assertThat(cachedBlob.to(), equalTo((long) BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE)); + assertThat(cachedBlob.length(), equalTo(BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE)); + numberOfCachedBlobs += 1; + } + } + } + + refreshSystemIndex(); + assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), numberOfCachedBlobs); + }); + } + + /** + * This plugin declares an {@link AllocationDecider} that forces searchable snapshot shards to be allocated after + * the primary shards of the snapshot blob cache index are started. This way we can ensure that searchable snapshot + * shards can use the snapshot blob cache index after the cluster restarted. 
+ */ + public static class WaitForSnapshotBlobCacheShardsActivePlugin extends Plugin implements ClusterPlugin { + + public static Setting ENABLED = Setting.boolSetting( + "wait_for_snapshot_blob_cache_shards_active.enabled", + false, + Setting.Property.NodeScope + ); + + @Override + public List> getSettings() { + return List.of(ENABLED); + } + + @Override + public Collection createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { + if (ENABLED.get(settings) == false) { + return List.of(); + } + final String name = "wait_for_snapshot_blob_cache_shards_active"; + return List.of(new AllocationDecider() { + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return canAllocate(shardRouting, allocation); + } + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) { + final IndexMetadata indexMetadata = allocation.metadata().index(shardRouting.index()); + if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexMetadata.getSettings()) == false) { + return allocation.decision(Decision.YES, name, "index is not a searchable snapshot shard - can allocate"); + } + if (allocation.metadata().hasIndex(SNAPSHOT_BLOB_CACHE_INDEX) == false) { + return allocation.decision(Decision.YES, name, SNAPSHOT_BLOB_CACHE_INDEX + " is not created yet"); + } + if (allocation.routingTable().hasIndex(SNAPSHOT_BLOB_CACHE_INDEX) == false) { + return allocation.decision(Decision.THROTTLE, name, SNAPSHOT_BLOB_CACHE_INDEX + " is not active yet"); + } + final IndexRoutingTable indexRoutingTable = allocation.routingTable().index(SNAPSHOT_BLOB_CACHE_INDEX); + if (indexRoutingTable.allPrimaryShardsActive() == false) { + return allocation.decision(Decision.THROTTLE, name, SNAPSHOT_BLOB_CACHE_INDEX + " is not active yet"); + } + return allocation.decision(Decision.YES, name, "primary shard for this replica is already active"); + } + }); + } + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java index 78c5bbb4fb909..0fea6616c6c2e 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java @@ -26,6 +26,8 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.store.cache.TestUtils; +import org.elasticsearch.index.store.cache.TestUtils.NoopBlobStoreCacheService; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.SearchableSnapshotRecoveryState; import org.elasticsearch.repositories.IndexId; @@ -125,20 +127,18 @@ public void testCachedBytesReadsAndWrites() { assertBusy(() -> { assertThat(inputStats.getCachedBytesWritten(), notNullValue()); assertThat(inputStats.getCachedBytesWritten().total(), equalTo(length)); - assertThat(inputStats.getCachedBytesWritten().count(), equalTo(cachedBytesWriteCount)); + final long actualWriteCount = inputStats.getCachedBytesWritten().count(); + assertThat(actualWriteCount, 
lessThanOrEqualTo(cachedBytesWriteCount)); assertThat(inputStats.getCachedBytesWritten().min(), greaterThan(0L)); - assertThat( - inputStats.getCachedBytesWritten().max(), - (length < rangeSize.getBytes()) ? equalTo(length) : equalTo(rangeSize.getBytes()) - ); + assertThat(inputStats.getCachedBytesWritten().max(), lessThanOrEqualTo(length)); assertThat( inputStats.getCachedBytesWritten().totalNanoseconds(), allOf( // each read takes at least FAKE_CLOCK_ADVANCE_NANOS time - greaterThanOrEqualTo(FAKE_CLOCK_ADVANCE_NANOS * cachedBytesWriteCount), + greaterThanOrEqualTo(FAKE_CLOCK_ADVANCE_NANOS * actualWriteCount), // worst case: we start all reads before finishing any of them - lessThanOrEqualTo(FAKE_CLOCK_ADVANCE_NANOS * cachedBytesWriteCount * cachedBytesWriteCount) + lessThanOrEqualTo(FAKE_CLOCK_ADVANCE_NANOS * actualWriteCount * actualWriteCount) ) ); }); @@ -147,10 +147,7 @@ public void testCachedBytesReadsAndWrites() { assertThat(inputStats.getCachedBytesRead().total(), greaterThanOrEqualTo(length)); assertThat(inputStats.getCachedBytesRead().count(), greaterThan(0L)); assertThat(inputStats.getCachedBytesRead().min(), greaterThan(0L)); - assertThat( - inputStats.getCachedBytesRead().max(), - (length < rangeSize.getBytes()) ? lessThanOrEqualTo(length) : lessThanOrEqualTo(rangeSize.getBytes()) - ); + assertThat(inputStats.getCachedBytesRead().max(), lessThanOrEqualTo(length)); assertCounter(inputStats.getDirectBytesRead(), 0L, 0L, 0L, 0L); assertThat(inputStats.getDirectBytesRead().totalNanoseconds(), equalTo(0L)); @@ -322,7 +319,7 @@ public void testReadBytesContiguously() { final IndexInputStats inputStats = cacheDirectory.getStats(fileName); // account for the CacheBufferedIndexInput internal buffer - final long bufferSize = (long) BufferedIndexInput.bufferSize(ioContext); + final long bufferSize = BufferedIndexInput.bufferSize(ioContext); final long remaining = input.length() % bufferSize; final long expectedTotal = input.length(); final long expectedCount = input.length() / bufferSize + (remaining > 0L ? 
1L : 0L); @@ -614,6 +611,7 @@ private static void executeTestCase( throw new UncheckedIOException(e); } final ShardPath shardPath = new ShardPath(false, shardDir, shardDir, shardId); + final DiscoveryNode discoveryNode = new DiscoveryNode("_id", buildNewFakeTransportAddress(), Version.CURRENT); final Path cacheDir = createTempDir(); try ( @@ -621,6 +619,8 @@ private static void executeTestCase( SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory( () -> blobContainer, () -> snapshot, + new NoopBlobStoreCacheService(), + "_repo", snapshotId, indexId, shardId, diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java index ae982ad9525cc..17bdb787a5dea 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java @@ -596,6 +596,8 @@ protected void assertSnapshotOrGenericThread() { SearchableSnapshotDirectory snapshotDirectory = new SearchableSnapshotDirectory( () -> blobContainer, () -> snapshot, + new TestUtils.NoopBlobStoreCacheService(), + "_repo", snapshotId, indexId, shardId, @@ -691,6 +693,8 @@ public void testClearCache() throws Exception { SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory( () -> blobContainer, () -> snapshot, + new TestUtils.NoopBlobStoreCacheService(), + "_repo", snapshotId, indexId, shardId, @@ -761,7 +765,7 @@ public void testRequiresAdditionalSettings() { final IndexSettings indexSettings = new IndexSettings(IndexMetadata.builder("test").settings(settings).build(), Settings.EMPTY); expectThrows( IllegalArgumentException.class, - () -> SearchableSnapshotDirectory.create(null, null, indexSettings, null, null, null) + () -> SearchableSnapshotDirectory.create(null, null, indexSettings, null, null, null, null) ); } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java index 0fd9b7f4b7501..838ce10aba21c 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java @@ -8,9 +8,9 @@ import org.apache.lucene.store.IndexInput; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.support.FilterBlobContainer; import org.elasticsearch.common.lucene.store.ESIndexInputTestCase; @@ -22,6 +22,7 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.index.store.SearchableSnapshotDirectory; import org.elasticsearch.index.store.StoreFileMetadata; +import org.elasticsearch.index.store.cache.TestUtils.NoopBlobStoreCacheService; import 
org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.SearchableSnapshotRecoveryState; import org.elasticsearch.repositories.IndexId; @@ -51,6 +52,7 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase { @@ -66,6 +68,7 @@ public void testRandomReads() throws IOException { for (int i = 0; i < 5; i++) { final String fileName = randomAlphaOfLength(10); + final byte[] input = randomUnicodeOfLength(randomIntBetween(1, 100_000)).getBytes(StandardCharsets.UTF_8); final String blobName = randomUnicodeOfLength(10); @@ -104,6 +107,8 @@ public void testRandomReads() throws IOException { SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory( () -> blobContainer, () -> snapshot, + new NoopBlobStoreCacheService(), + "_repo", snapshotId, indexId, shardId, @@ -118,13 +123,6 @@ public void testRandomReads() throws IOException { threadPool ) ) { - ShardRouting shardRouting = TestShardRouting.newShardRouting( - randomAlphaOfLength(10), - 0, - randomAlphaOfLength(10), - true, - ShardRoutingState.INITIALIZING - ); RecoveryState recoveryState = createRecoveryState(); final boolean loaded = directory.loadSnapshot(recoveryState); assertThat("Failed to load snapshot", loaded, is(true)); @@ -142,9 +140,9 @@ public void testRandomReads() throws IOException { if (blobContainer instanceof CountingBlobContainer) { long numberOfRanges = TestUtils.numberOfRanges(input.length, cacheService.getRangeSize()); assertThat( - "Expected " + numberOfRanges + " ranges fetched from the source", + "Expected at most " + numberOfRanges + " ranges fetched from the source", ((CountingBlobContainer) blobContainer).totalOpens.sum(), - equalTo(numberOfRanges) + lessThanOrEqualTo(numberOfRanges) ); assertThat( "All bytes should have been read from source", @@ -195,6 +193,8 @@ public void testThrowsEOFException() throws IOException { SearchableSnapshotDirectory searchableSnapshotDirectory = new SearchableSnapshotDirectory( () -> blobContainer, () -> snapshot, + new NoopBlobStoreCacheService(), + "_repo", snapshotId, indexId, shardId, @@ -270,7 +270,7 @@ private static class CountingBlobContainer extends FilterBlobContainer { @Override public InputStream readBlob(String blobName, long position, long length) throws IOException { - return new CountingInputStream(this, super.readBlob(blobName, position, length), length, rangeSize); + return new CountingInputStream(this, super.readBlob(blobName, position, length)); } @Override @@ -292,19 +292,15 @@ public InputStream readBlob(String name) { private static class CountingInputStream extends FilterInputStream { private final CountingBlobContainer container; - private final int rangeSize; - private final long length; private long bytesRead = 0L; private long position = 0L; private long start = Long.MAX_VALUE; private long end = Long.MIN_VALUE; - CountingInputStream(CountingBlobContainer container, InputStream input, long length, int rangeSize) { + CountingInputStream(CountingBlobContainer container, InputStream input) { super(input); this.container = Objects.requireNonNull(container); - this.rangeSize = rangeSize; - this.length = length; this.container.totalOpens.increment(); } @@ -346,30 +342,6 @@ public int 
read(byte[] b, int offset, int len) throws IOException { @Override public void close() throws IOException { in.close(); - if (start % rangeSize != 0) { - throw new AssertionError("Read operation should start at the beginning of a range"); - } - if (end % rangeSize != 0) { - if (end != length) { - throw new AssertionError("Read operation should finish at the end of a range or the end of the file"); - } - } - if (length <= rangeSize) { - if (bytesRead != length) { - throw new AssertionError("All [" + length + "] bytes should have been read, no more no less but got:" + bytesRead); - } - } else { - if (bytesRead != rangeSize) { - if (end != length) { - throw new AssertionError("Expecting [" + rangeSize + "] bytes to be read but got:" + bytesRead); - - } - final long remaining = length % rangeSize; - if (bytesRead != remaining) { - throw new AssertionError("Expecting [" + remaining + "] bytes to be read but got:" + bytesRead); - } - } - } this.container.totalBytes.add(bytesRead); } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java index e64bea4412ff5..822c07037fa4b 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java @@ -220,17 +220,21 @@ public void testCallsListenerWhenRangeIsAvailable() { if (pending == false) { final AtomicBoolean wasNotified = new AtomicBoolean(); - final List gaps = sparseFileTracker.waitForRange( - range, - subRange, - ActionListener.wrap(ignored -> assertTrue(wasNotified.compareAndSet(false, true)), e -> { throw new AssertionError(e); }) + final ActionListener listener = ActionListener.wrap( + ignored -> assertTrue(wasNotified.compareAndSet(false, true)), + e -> { throw new AssertionError(e); } ); + final List gaps = sparseFileTracker.waitForRange(range, subRange, listener); assertTrue( "All bytes of the sub range " + subRange + " are available, listener must be executed immediately", wasNotified.get() ); + wasNotified.set(false); + assertTrue(sparseFileTracker.waitForRangeIfPending(subRange, listener)); + assertTrue(wasNotified.get()); + for (final SparseFileTracker.Gap gap : gaps) { assertThat(gap.start(), greaterThanOrEqualTo(range.v1())); assertThat(gap.end(), lessThanOrEqualTo(range.v2())); @@ -238,13 +242,19 @@ public void testCallsListenerWhenRangeIsAvailable() { for (long i = gap.start(); i < gap.end(); i++) { assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE)); fileContents[Math.toIntExact(i)] = AVAILABLE; - assertTrue(wasNotified.get()); gap.onProgress(i + 1L); } gap.onCompletion(); } } else { + final AtomicBoolean waitIfPendingWasNotified = new AtomicBoolean(); + final ActionListener waitIfPendingListener = ActionListener.wrap( + ignored -> assertTrue(waitIfPendingWasNotified.compareAndSet(false, true)), + e -> { throw new AssertionError(e); } + ); + assertFalse(sparseFileTracker.waitForRangeIfPending(subRange, waitIfPendingListener)); + final AtomicBoolean wasNotified = new AtomicBoolean(); final AtomicBoolean expectNotification = new AtomicBoolean(); final List gaps = sparseFileTracker.waitForRange(range, subRange, ActionListener.wrap(ignored -> { @@ -254,6 +264,9 @@ public void testCallsListenerWhenRangeIsAvailable() { assertFalse("Listener should not have been 
executed yet", wasNotified.get()); + assertTrue(sparseFileTracker.waitForRangeIfPending(subRange, waitIfPendingListener)); + assertFalse(waitIfPendingWasNotified.get()); + long triggeringProgress = -1L; for (long i = subRange.v1(); i < subRange.v2(); i++) { if (fileContents[Math.toIntExact(i)] == UNAVAILABLE) { @@ -278,7 +291,7 @@ public void testCallsListenerWhenRangeIsAvailable() { + "] is reached, but it was triggered after progress got updated to [" + i + ']', - wasNotified.get(), + wasNotified.get() && waitIfPendingWasNotified.get(), equalTo(triggeringProgress < i) ); @@ -290,7 +303,7 @@ public void testCallsListenerWhenRangeIsAvailable() { + "] is reached, but it was triggered after progress got updated to [" + i + ']', - wasNotified.get(), + wasNotified.get() && waitIfPendingWasNotified.get(), equalTo(triggeringProgress < i + 1L) ); } @@ -305,8 +318,10 @@ public void testCallsListenerWhenRangeIsAvailable() { wasNotified.get(), equalTo(triggeringProgress < gap.end()) ); + assertThat(waitIfPendingWasNotified.get(), equalTo(triggeringProgress < gap.end())); } assertTrue(wasNotified.get()); + assertTrue(waitIfPendingWasNotified.get()); } final AtomicBoolean wasNotified = new AtomicBoolean(); @@ -430,34 +445,44 @@ private static void waitForRandomRange( final AtomicBoolean listenerCalled = new AtomicBoolean(); listenerCalledConsumer.accept(listenerCalled); - final boolean useSubRange = randomBoolean(); + final boolean fillInGaps = randomBoolean(); + final boolean useSubRange = fillInGaps && randomBoolean(); final long subRangeStart = useSubRange ? randomLongBetween(rangeStart, rangeEnd) : rangeStart; final long subRangeEnd = useSubRange ? randomLongBetween(subRangeStart, rangeEnd) : rangeEnd; - final List gaps = sparseFileTracker.waitForRange( - Tuple.tuple(rangeStart, rangeEnd), - Tuple.tuple(subRangeStart, subRangeEnd), - new ActionListener<>() { - @Override - public void onResponse(Void aVoid) { - for (long i = subRangeStart; i < subRangeEnd; i++) { - assertThat(fileContents[Math.toIntExact(i)], equalTo(AVAILABLE)); - } - assertTrue(listenerCalled.compareAndSet(false, true)); + final ActionListener actionListener = new ActionListener<>() { + @Override + public void onResponse(Void aVoid) { + for (long i = subRangeStart; i < subRangeEnd; i++) { + assertThat(fileContents[Math.toIntExact(i)], equalTo(AVAILABLE)); } + assertTrue(listenerCalled.compareAndSet(false, true)); + } - @Override - public void onFailure(Exception e) { - assertTrue(listenerCalled.compareAndSet(false, true)); - } + @Override + public void onFailure(Exception e) { + assertTrue(listenerCalled.compareAndSet(false, true)); } - ); + }; - for (final SparseFileTracker.Gap gap : gaps) { - for (long i = gap.start(); i < gap.end(); i++) { - assertThat(Long.toString(i), fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE)); + if (randomBoolean()) { + final List gaps = sparseFileTracker.waitForRange( + Tuple.tuple(rangeStart, rangeEnd), + Tuple.tuple(subRangeStart, subRangeEnd), + actionListener + ); + + for (final SparseFileTracker.Gap gap : gaps) { + for (long i = gap.start(); i < gap.end(); i++) { + assertThat(Long.toString(i), fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE)); + } + gapConsumer.accept(gap); + } + } else { + final boolean listenerRegistered = sparseFileTracker.waitForRangeIfPending(Tuple.tuple(rangeStart, rangeEnd), actionListener); + if (listenerRegistered == false) { + assertTrue(listenerCalled.compareAndSet(false, true)); } - gapConsumer.accept(gap); } } diff --git 
diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java
index 5d71c085cf1a5..e7d96f7b62474 100644
--- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java
+++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java
@@ -5,10 +5,15 @@
  */
 package org.elasticsearch.index.store.cache;
 
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.blobstore.cache.BlobStoreCacheService;
+import org.elasticsearch.blobstore.cache.CachedBlob;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetadata;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.DeleteResult;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -28,6 +33,7 @@
 import static com.carrotsearch.randomizedtesting.generators.RandomPicks.randomFrom;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
 
 public final class TestUtils {
     private TestUtils() {}
@@ -209,4 +215,28 @@ private UnsupportedOperationException unsupportedException() {
             return new UnsupportedOperationException("This operation is not supported");
         }
     }
+
+    public static class NoopBlobStoreCacheService extends BlobStoreCacheService {
+
+        public NoopBlobStoreCacheService() {
+            super(null, null, mock(Client.class), null);
+        }
+
+        @Override
+        protected void getAsync(String repository, String name, String path, long offset, ActionListener<CachedBlob> listener) {
+            listener.onResponse(CachedBlob.CACHE_NOT_READY);
+        }
+
+        @Override
+        public void putAsync(
+            String repository,
+            String name,
+            String path,
+            long offset,
+            BytesReference content,
+            ActionListener<Void> listener
+        ) {
+            listener.onResponse(null);
+        }
+    }
 }
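Note: `NoopBlobStoreCacheService` gives unit tests a cache that always misses: `getAsync` answers `CachedBlob.CACHE_NOT_READY` and `putAsync` completes immediately, so a `SearchableSnapshotDirectory` built on it always falls back to the repository. A hedged sketch of the caller-side pattern this stub exercises (the helper names and the `CachedBlob` accessor are illustrative assumptions, not the plugin's actual API):

```java
// Hedged sketch, assuming a same-package caller (getAsync is protected);
// readFromRepository() is hypothetical.
void readRegion(ActionListener<BytesReference> listener) {
    blobStoreCacheService.getAsync(repository, fileName, blobPath, offset, ActionListener.wrap(cachedBlob -> {
        if (cachedBlob == CachedBlob.CACHE_NOT_READY) {
            listener.onResponse(readFromRepository()); // hypothetical: fall back to the blob store
        } else {
            listener.onResponse(cachedBlob.bytes()); // assumption: cached-bytes accessor
        }
    }, listener::onFailure));
}
```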
diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java
index a27d3f9c8c628..c368264f1134d 100644
--- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java
+++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java
@@ -24,23 +24,34 @@
  */
 package org.elasticsearch.xpack.searchablesnapshots;
 
+import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.repositories.fs.FsRepository;
+import org.elasticsearch.snapshots.SnapshotId;
+import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
+import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
 import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
 
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Locale;
 
 import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
 import static org.elasticsearch.license.LicenseService.SELF_GENERATED_LICENSE_TYPE;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 
 public abstract class BaseSearchableSnapshotsIntegTestCase extends ESIntegTestCase {
     @Override
@@ -80,6 +91,58 @@ protected Settings nodeSettings(int nodeOrdinal) {
         return builder.build();
     }
 
+    protected void createFsRepository(String repositoryName, Path location) {
+        createRepository(
+            repositoryName,
+            FsRepository.TYPE,
+            Settings.builder().put(FsRepository.LOCATION_SETTING.getKey(), location).build(),
+            true
+        );
+    }
+
+    protected void createRepository(String repositoryName, String repositoryType, Settings repositorySettings, boolean verify) {
+        assertAcked(
+            client().admin()
+                .cluster()
+                .preparePutRepository(repositoryName)
+                .setType(repositoryType)
+                .setSettings(repositorySettings)
+                .setVerify(verify)
+        );
+    }
+
+    protected SnapshotId createSnapshot(String repositoryName, List<String> indices) {
+        final CreateSnapshotResponse response = client().admin()
+            .cluster()
+            .prepareCreateSnapshot(repositoryName, randomAlphaOfLength(10).toLowerCase(Locale.ROOT))
+            .setIndices(indices.toArray(Strings.EMPTY_ARRAY))
+            .setWaitForCompletion(true)
+            .get();
+
+        final SnapshotInfo snapshotInfo = response.getSnapshotInfo();
+        assertThat(snapshotInfo.successfulShards(), greaterThan(0));
+        assertThat(snapshotInfo.failedShards(), equalTo(0));
+        return snapshotInfo.snapshotId();
+    }
+
+    protected String mountSnapshot(String repositoryName, String snapshotName, String indexName, Settings indexSettings) throws Exception {
+        final String restoredIndexName = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        final MountSearchableSnapshotRequest mountRequest = new MountSearchableSnapshotRequest(
+            restoredIndexName,
+            repositoryName,
+            snapshotName,
+            indexName,
+            Settings.builder().put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString()).put(indexSettings).build(),
+            Strings.EMPTY_ARRAY,
+            true
+        );
+
+        final RestoreSnapshotResponse restoreResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, mountRequest).get();
+        assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(getNumShards(restoredIndexName).numPrimaries));
+        assertThat(restoreResponse.getRestoreInfo().failedShards(), equalTo(0));
+        return restoredIndexName;
+    }
+
     protected void createRepo(String fsRepoName) {
         final Path repo = randomRepoPath();
         assertAcked(
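Note: the new helpers give subclasses a compact repository → snapshot → mount flow; `mountSnapshot` also disables `index.check_on_startup`, which would otherwise slow recoveries, and may mount under either the original or a random index name. A hedged sketch of how a subclass test might chain them ("fs-repo" and "docs" are illustrative, and the "docs" index is assumed to already exist with documents):

```java
// Hedged sketch of a test built on the helpers added above.
public void testMountedIndexIsSearchable() throws Exception {
    createFsRepository("fs-repo", randomRepoPath());

    final SnapshotId snapshotId = createSnapshot("fs-repo", List.of("docs"));

    // Mounts the snapshotted index as a searchable snapshot index and waits
    // for all primaries to be restored before returning the index name.
    final String mounted = mountSnapshot("fs-repo", snapshotId.getName(), "docs", Settings.EMPTY);
    ensureGreen(mounted);
}
```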
diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotRecoveryStateIntegrationTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotRecoveryStateIntegrationTests.java
index a95103488a1df..cd4e5e3ce68c0 100644
--- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotRecoveryStateIntegrationTests.java
+++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotRecoveryStateIntegrationTests.java
@@ -122,6 +122,8 @@ public void testRecoveryStateRecoveredBytesMatchPhysicalCacheState() throws Exce
 
         assertThat("Physical cache size doesn't match with recovery state data", physicalCacheSize, equalTo(recoveredBytes));
         assertThat("Expected to recover 100% of files", recoveryState.getIndex().recoveredBytesPercent(), equalTo(100.0f));
+
+        assertAcked(client().admin().indices().prepareDelete(restoredIndexName));
     }
 
     @SuppressForbidden(reason = "Uses FileSystem APIs")
diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java
index 0597c4508b1aa..474729be2e610 100644
--- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java
+++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java
@@ -11,7 +11,6 @@
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
 import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
 import org.elasticsearch.action.admin.indices.shrink.ResizeType;
@@ -425,6 +424,8 @@ public void testCanMountSnapshotTakenWhileConcurrentlyIndexing() throws Exceptio
         final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get();
         assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
         ensureGreen(restoredIndexName);
+
+        assertAcked(client().admin().indices().prepareDelete(restoredIndexName));
     }
 
     public void testMaxRestoreBytesPerSecIsUsed() throws Exception {
@@ -507,6 +508,8 @@ public void testMaxRestoreBytesPerSecIsUsed() throws Exception {
                 );
             }
         }
+
+        assertAcked(client().admin().indices().prepareDelete(restoredIndexName));
     }
 
     private Map<String, Long> getMaxShardSizeByNodeInBytes(String indexName) {
@@ -707,11 +710,11 @@ private void assertRecoveryStats(String indexName, boolean preWarmEnabled) {
         for (List<RecoveryState> recoveryStates : recoveryResponse.shardRecoveryStates().values()) {
             for (RecoveryState recoveryState : recoveryStates) {
-                ByteSizeValue cacheSize = getCacheSizeForShard(recoveryState.getShardId());
+                ByteSizeValue cacheSize = getCacheSizeForNode(recoveryState.getTargetNode().getName());
                 boolean unboundedCache = cacheSize.equals(new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES));
                 RecoveryState.Index index = recoveryState.getIndex();
                 assertThat(
-                    Strings.toString(recoveryState),
+                    Strings.toString(recoveryState, true, true),
                     index.recoveredFileCount(),
                     preWarmEnabled && unboundedCache ? equalTo(index.totalRecoverFiles()) : greaterThanOrEqualTo(0)
                 );
@@ -784,18 +787,22 @@ private void assertSearchableSnapshotStats(String indexName, boolean cacheEnable
                 if (cacheEnabled == false || nonCachedExtensions.contains(IndexFileNames.getExtension(fileName))) {
                     assertThat(
                         "Expected at least 1 optimized or direct read for " + fileName + " of shard " + shardRouting,
-                        Math.max(indexInputStats.getOptimizedBytesRead().getCount(), indexInputStats.getDirectBytesRead().getCount()),
+                        max(indexInputStats.getOptimizedBytesRead().getCount(), indexInputStats.getDirectBytesRead().getCount()),
                         greaterThan(0L)
                     );
                     assertThat(
                         "Expected no cache read or write for " + fileName + " of shard " + shardRouting,
-                        Math.max(indexInputStats.getCachedBytesRead().getCount(), indexInputStats.getCachedBytesWritten().getCount()),
+                        max(indexInputStats.getCachedBytesRead().getCount(), indexInputStats.getCachedBytesWritten().getCount()),
                         equalTo(0L)
                     );
                 } else if (nodeIdsWithLargeEnoughCache.contains(stats.getShardRouting().currentNodeId())) {
                     assertThat(
                         "Expected at least 1 cache read or write for " + fileName + " of shard " + shardRouting,
-                        Math.max(indexInputStats.getCachedBytesRead().getCount(), indexInputStats.getCachedBytesWritten().getCount()),
+                        max(
+                            indexInputStats.getCachedBytesRead().getCount(),
+                            indexInputStats.getCachedBytesWritten().getCount(),
+                            indexInputStats.getIndexCacheBytesRead().getCount()
+                        ),
                         greaterThan(0L)
                     );
                     assertThat(
@@ -811,15 +818,12 @@ private void assertSearchableSnapshotStats(String indexName, boolean cacheEnable
                 } else {
                     assertThat(
                         "Expected at least 1 read or write of any kind for " + fileName + " of shard " + shardRouting,
-                        Math.max(
-                            Math.max(
-                                indexInputStats.getCachedBytesRead().getCount(),
-                                indexInputStats.getCachedBytesWritten().getCount()
-                            ),
-                            Math.max(
-                                indexInputStats.getOptimizedBytesRead().getCount(),
-                                indexInputStats.getDirectBytesRead().getCount()
-                            )
+                        max(
+                            indexInputStats.getCachedBytesRead().getCount(),
+                            indexInputStats.getCachedBytesWritten().getCount(),
+                            indexInputStats.getOptimizedBytesRead().getCount(),
+                            indexInputStats.getDirectBytesRead().getCount(),
+                            indexInputStats.getIndexCacheBytesRead().getCount()
                        ),
                         greaterThan(0L)
                     );
@@ -829,13 +833,11 @@ private void assertSearchableSnapshotStats(String indexName, boolean cacheEnable
         }
     }
 
-    private ByteSizeValue getCacheSizeForShard(ShardId shardId) {
-        ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().setRoutingTable(true).setNodes(true).get();
-        ClusterState clusterStateResponseState = clusterStateResponse.getState();
-        String nodeId = clusterStateResponseState.getRoutingTable().shardRoutingTable(shardId).primaryShard().currentNodeId();
-        DiscoveryNode discoveryNode = clusterStateResponseState.nodes().get(nodeId);
+    private static long max(long... values) {
+        return Arrays.stream(values).max().orElseThrow(() -> new AssertionError("no values"));
+    }
 
-        final Settings nodeSettings = internalCluster().getInstance(Environment.class, discoveryNode.getName()).settings();
-        return CacheService.SNAPSHOT_CACHE_SIZE_SETTING.get(nodeSettings);
+    private ByteSizeValue getCacheSizeForNode(String nodeName) {
+        return CacheService.SNAPSHOT_CACHE_SIZE_SETTING.get(internalCluster().getInstance(Environment.class, nodeName).settings());
     }
 }
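Note: replacing `getCacheSizeForShard` with `getCacheSizeForNode` drops a cluster state round-trip, since the recovery state already names the target node whose settings are wanted. The varargs `max` helper likewise flattens the nested `Math.max` chains now that the new `index_cache_bytes_read` counter joins the comparisons. A quick sanity check of its behaviour:

```java
// The varargs helper behaves like a flattened Math.max chain and fails
// loudly on an empty argument list.
assert max(1L, 5L, 3L) == 5L;
assert max(7L) == 7L;
// max() with no values throws AssertionError("no values") via orElseThrow.
```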
diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/action/SearchableSnapshotsStatsResponseTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/action/SearchableSnapshotsStatsResponseTests.java
index fd04ce4b7f226..ad57c607d1722 100644
--- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/action/SearchableSnapshotsStatsResponseTests.java
+++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/action/SearchableSnapshotsStatsResponseTests.java
@@ -107,9 +107,12 @@ private static SearchableSnapshotShardStats.CacheIndexInputStats randomCacheInde
             randomCounter(),
             randomCounter(),
             randomCounter(),
+            randomCounter(),
+            randomTimedCounter(),
             randomTimedCounter(),
             randomTimedCounter(),
-            randomTimedCounter()
+            randomCounter(),
+            randomNonNegativeLong()
         );
     }
diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java
index c63e086921cbd..7e0911120df1b 100644
--- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java
+++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java
@@ -25,6 +25,7 @@
 import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.IDP_ORIGIN;
+import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.STACK_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.DEPRECATION_ORIGIN;
@@ -121,6 +122,7 @@ public static void switchUserBasedOnActionOriginAndExecute(ThreadContext threadC
             case IDP_ORIGIN:
             case INGEST_ORIGIN:
             case STACK_ORIGIN:
+            case SEARCHABLE_SNAPSHOTS_ORIGIN:
             case TASKS_ORIGIN: // TODO use a more limited user for tasks
                 securityContext.executeAsUser(XPackUser.INSTANCE, consumer, Version.CURRENT);
                 break;
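Note: with `SEARCHABLE_SNAPSHOTS_ORIGIN` added to this switch, internal requests tagged with the new origin (for example reads and writes of cached blob data) execute as `XPackUser` rather than as the calling user. A hedged sketch of how plugin code would tag such a request, assuming the long-standing `ClientHelper.executeAsyncWithOrigin` utility; only the origin constant and the utility come from this PR, the `GetRequest` details are illustrative:

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.core.ClientHelper;

class SnapshotBlobCacheReader {
    private final Client client;

    SnapshotBlobCacheReader(Client client) {
        this.client = client;
    }

    void get(String cacheKey, ActionListener<GetResponse> listener) {
        // Requests tagged with this origin are switched to XPackUser by
        // AuthorizationUtils.switchUserBasedOnActionOriginAndExecute above.
        ClientHelper.executeAsyncWithOrigin(
            client,
            ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN,
            GetAction.INSTANCE,
            new GetRequest(".snapshot-blob-cache", cacheKey), // illustrative id scheme
            listener
        );
    }
}
```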