Skip to content

Commit

Permalink
Wait for blob cache fills to complete before stopping the service (#7…
Browse files Browse the repository at this point in the history
…0220)

Today nothing prevents to use the BlobStoreCacheService after the component 
is stopped. It sometimes happens because the shards are one of the very last 
resources to be closed when a node stops, after components are stopped, and 
the closing of shards also releases more resources that are likely to trigger more 
blobs to be cached by the BlobStoreCacheService.

On CI we can notice this behavior by seeing the blob cache index re-created 
while the after-test clean up logic is running (see #69735). This committ changes 
integration tests so that at stop time they now waits for in-flight index operations 
to be completed. It also prevents any new blob to be cached after the service 
has been stopped.

Closes #69735
  • Loading branch information
tlrx authored Mar 24, 2021
1 parent dd69ae9 commit c11be7d
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.blobstore.cache.BlobStoreCacheService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
Expand All @@ -28,11 +29,13 @@
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest.Storage;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService;
import org.junit.After;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.license.LicenseService.SELF_GENERATED_LICENSE_TYPE;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
Expand Down Expand Up @@ -103,6 +106,13 @@ protected Settings nodeSettings(int nodeOrdinal) {
return builder.build();
}

@After
public void waitForBlobCacheFillsToComplete() {
for (BlobStoreCacheService blobStoreCacheService : internalCluster().getDataNodeInstances(BlobStoreCacheService.class)) {
assertTrue(blobStoreCacheService.waitForInFlightCacheFillsToComplete(30L, TimeUnit.SECONDS));
}
}

protected String mountSnapshot(String repositoryName, String snapshotName, String indexName, Settings restoredIndexSettings)
throws Exception {
final String restoredIndexName = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand All @@ -42,12 +44,15 @@
import java.time.Instant;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN;

public class BlobStoreCacheService {
public class BlobStoreCacheService extends AbstractLifecycleComponent {

private static final Logger logger = LogManager.getLogger(BlobStoreCacheService.class);

Expand All @@ -61,18 +66,59 @@ public class BlobStoreCacheService {
.setExpireAfterAccess(TimeValue.timeValueMinutes(60L))
.build();

static final int MAX_IN_FLIGHT_CACHE_FILLS = Integer.MAX_VALUE;

private final ClusterService clusterService;
private final ThreadPool threadPool;
private final Semaphore inFlightCacheFills;
private final Supplier<Long> timeSupplier;
private final AtomicBoolean closed;
private final Client client;
private final String index;

public BlobStoreCacheService(ClusterService clusterService, ThreadPool threadPool, Client client, String index) {
public BlobStoreCacheService(ClusterService clusterService, Client client, String index, Supplier<Long> timeSupplier) {
this.client = new OriginSettingClient(client, SEARCHABLE_SNAPSHOTS_ORIGIN);
this.inFlightCacheFills = new Semaphore(MAX_IN_FLIGHT_CACHE_FILLS);
this.closed = new AtomicBoolean(false);
this.clusterService = clusterService;
this.threadPool = threadPool;
this.timeSupplier = timeSupplier;
this.index = index;
}

@Override
protected void doStart() {}

@Override
protected void doStop() {
if (closed.compareAndSet(false, true)) {
logger.debug("blob cache service is stopped");
}
}

// public for tests
public boolean waitForInFlightCacheFillsToComplete(long timeout, TimeUnit unit) {
boolean acquired = false;
try {
logger.debug("waiting for in-flight blob cache fills to complete");
acquired = inFlightCacheFills.tryAcquire(MAX_IN_FLIGHT_CACHE_FILLS, timeout, unit);
} catch (InterruptedException e) {
logger.warn("interrupted while waiting for in-flight blob cache fills to complete", e);
Thread.currentThread().interrupt();
} finally {
if (acquired) {
inFlightCacheFills.release(MAX_IN_FLIGHT_CACHE_FILLS);
}
}
return acquired;
}

// pkg private for tests
int getInFlightCacheFills() {
return MAX_IN_FLIGHT_CACHE_FILLS - inFlightCacheFills.availablePermits();
}

@Override
protected void doClose() {}

public CachedBlob get(String repository, String name, String path, long offset) {
assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SYSTEM_READ + ']') == false : "must not block ["
+ Thread.currentThread().getName()
Expand All @@ -84,7 +130,7 @@ public CachedBlob get(String repository, String name, String path, long offset)
return future.actionGet(5, TimeUnit.SECONDS);
} catch (ElasticsearchTimeoutException e) {
if (logger.isDebugEnabled()) {
logger.warn(
logger.debug(
() -> new ParameterizedMessage(
"get from cache index timed out after [5s], retrieving from blob store instead [id={}]",
CachedBlob.generateId(repository, name, path, offset)
Expand All @@ -99,6 +145,11 @@ public CachedBlob get(String repository, String name, String path, long offset)
}

protected void getAsync(String repository, String name, String path, long offset, ActionListener<CachedBlob> listener) {
if (closed.get()) {
logger.debug("failed to retrieve cached blob from system index [{}], service is closed", index);
listener.onResponse(CachedBlob.CACHE_NOT_READY);
return;
}
final GetRequest request = new GetRequest(index).id(CachedBlob.generateId(repository, name, path, offset));
client.get(request, new ActionListener<>() {
@Override
Expand Down Expand Up @@ -144,7 +195,7 @@ private static boolean isExpectedCacheGetException(Exception e) {
public void putAsync(String repository, String name, String path, long offset, BytesReference content, ActionListener<Void> listener) {
try {
final CachedBlob cachedBlob = new CachedBlob(
Instant.ofEpochMilli(threadPool.absoluteTimeInMillis()),
Instant.ofEpochMilli(timeSupplier.get()),
Version.CURRENT,
repository,
name,
Expand All @@ -157,19 +208,39 @@ public void putAsync(String repository, String name, String path, long offset, B
request.source(cachedBlob.toXContent(builder, ToXContent.EMPTY_PARAMS));
}

client.index(request, new ActionListener<>() {
@Override
public void onResponse(IndexResponse indexResponse) {
logger.trace("cache fill ({}): [{}]", indexResponse.status(), request.id());
listener.onResponse(null);
final RunOnce release = new RunOnce(() -> {
final int availablePermits = inFlightCacheFills.availablePermits();
assert availablePermits > 0 : "in-flight available permits should be greater than 0 but got: " + availablePermits;
inFlightCacheFills.release();
});

boolean submitted = false;
inFlightCacheFills.acquire();
try {
if (closed.get()) {
listener.onFailure(new IllegalStateException("Blob cache service is closed"));
return;
}
final ActionListener<Void> wrappedListener = ActionListener.runAfter(listener, release);
client.index(request, new ActionListener<>() {
@Override
public void onResponse(IndexResponse indexResponse) {
logger.trace("cache fill ({}): [{}]", indexResponse.status(), request.id());
wrappedListener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
logger.debug(new ParameterizedMessage("failure in cache fill: [{}]", request.id()), e);
listener.onFailure(e);
@Override
public void onFailure(Exception e) {
logger.debug(new ParameterizedMessage("failure in cache fill: [{}]", request.id()), e);
wrappedListener.onFailure(e);
}
});
submitted = true;
} finally {
if (submitted == false) {
release.run();
}
});
}
} catch (Exception e) {
logger.warn(new ParameterizedMessage("cache fill failure: [{}]", CachedBlob.generateId(repository, name, path, offset)), e);
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,9 @@ public Collection<Object> createComponents(
components.add(cacheService);
final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService(
clusterService,
threadPool,
client,
SNAPSHOT_BLOB_CACHE_INDEX
SNAPSHOT_BLOB_CACHE_INDEX,
threadPool::absoluteTimeInMillis
);
this.blobStoreCacheService.set(blobStoreCacheService);
components.add(blobStoreCacheService);
Expand Down
Loading

0 comments on commit c11be7d

Please sign in to comment.