Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track Repository Gen. in BlobStoreRepository #48944

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -366,7 +367,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea
} else {
try {
final Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
final RepositoryData repositoryData = getRepositoryData(latestGeneration(rootBlobs.keySet()));
final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
// delete an index that was created by another master node after writing this index-N blob.
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
Expand All @@ -377,6 +378,30 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea
}
}

/**
* Loads {@link RepositoryData} ensuring that it is consistent with the given {@code rootBlobs} as well of the assumed generation.
*
* @param repositoryStateId Expected repository generation
* @param rootBlobs Blobs at the repository root
* @return RepositoryData
*/
private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) {
final long generation = latestGeneration(rootBlobs.keySet());
final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
if (genToLoad != generation) {
ywelsch marked this conversation as resolved.
Show resolved Hide resolved
// It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
// debug log it. Any blobs leaked as a result of an inconsistent listing here will be cleaned up in a subsequent cleanup or
// snapshot delete run anyway.
logger.debug("Determined repository's generation from its contents to [" + generation + "] but " +
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be a little controversial:

By tracking the latest gen in the field, we can now identify out-of-sync listings that we would previously have missed and that would just have failed in a subsequent step where the repo gen is compared. With this change, if we miss listing the latest index-N, we can still complete a delete or cleanup just fine (assuming the value in latestKnownRepoGen is correct).

I think it's better user experience to not do a perfect cleanup in this edge case but proceed with the delete/cleanup as if nothing happened. On an eventually consistent repo, the fact that we list out the correct index-N does not guarantee that we didn't miss any other root blobs in the listing anyway.
Also, apart from maybe missing some stale blobs, the delete will work out perfectly fine otherwise.

"current generation is at least [" + genToLoad + "]");
}
if (genToLoad != repositoryStateId) {
ywelsch marked this conversation as resolved.
Show resolved Hide resolved
throw new IllegalStateException("Determined latest repository generation to be [" + genToLoad + "] but this operation " +
ywelsch marked this conversation as resolved.
Show resolved Hide resolved
"assumes generation [" + repositoryStateId + "]");
}
return getRepositoryData(genToLoad);
}

/**
* After updating the {@link RepositoryData} each of the shards directories is individually first moved to the next shard generation
* and then has all now unreferenced blobs in it deleted.
Expand Down Expand Up @@ -604,14 +629,8 @@ public void cleanup(long repositoryStateId, boolean writeShardGens, ActionListen
if (isReadOnly()) {
throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository");
}
final RepositoryData repositoryData = getRepositoryData();
if (repositoryData.getGenId() != repositoryStateId) {
// Check that we are working on the expected repository version before gathering the data to clean up
throw new RepositoryException(metadata.name(), "concurrent modification of the repository before cleanup started, " +
"expected current generation [" + repositoryStateId + "], actual current generation ["
+ repositoryData.getGenId() + "]");
}
Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
final Set<String> survivingIndexIds =
repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
Expand Down Expand Up @@ -897,12 +916,36 @@ public void endVerification(String seed) {
}
}

// Tracks the latest known repository generation in a best-effort way to detect inconsistent listing of root level index-N blobs
// and concurrent modifications.
// Protected for use in MockEventuallyConsistentRepository
protected final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN);

@Override
public RepositoryData getRepositoryData() {
try {
return getRepositoryData(latestIndexBlobId());
} catch (IOException ioe) {
throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
while (true) {
final long generation;
try {
generation = latestIndexBlobId();
} catch (IOException ioe) {
throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
}
final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
if (genToLoad != generation) {
ywelsch marked this conversation as resolved.
Show resolved Hide resolved
logger.warn("Determined repository generation [" + generation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be warn level? In safeRepositoryData you've just logged this as debug.

Also, this warning is confusing to a user. Perhaps we could talk about eventually consistent repositories here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. Let's just make this debug. I wouldn't necessarily start talking about eventual consistency here. It's not the only thing that might lead to this warning, concurrent modifications of the repo will have the same result.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In hindsight, I wonder if we should log this at info level, just so that we get some stats on how often this logic saves the day on Cloud

Copy link
Member Author

@original-brownbear original-brownbear Nov 14, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now I'd assume/hope the answer here is "never" :D (with standard snapshotting ... other functionality/manual action/... may trigger this obviously) but yea. Let's do info and verify :)

+ "] from repository contents but correct generation must be at least [" + genToLoad + "]");
}
try {
return getRepositoryData(genToLoad);
} catch (RepositoryException e) {
if (genToLoad != latestKnownRepoGen.get()) {
logger.warn("Failed to load repository data generation [" + genToLoad +
ywelsch marked this conversation as resolved.
Show resolved Hide resolved
"] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e);
continue;
}
throw e;
}
}
}

Expand All @@ -920,6 +963,13 @@ private RepositoryData getRepositoryData(long indexGen) {
return RepositoryData.snapshotsFromXContent(parser, indexGen);
}
} catch (IOException ioe) {
// If we fail to load the generation we tracked in latestKnownRepoGen we reset it.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if resetting is the right thing to do here. If the content of the repo has been deleted (or bucket/folder moved, or permissions changed etc) maybe we should keep the last generation seen around, and let the user sort the issue and re-register the repository?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We talked about that yesterday and I figured that we decided not to do that (yet). I'm of the same opinion but it's quite the change in behavior if we want to just do this as a short-term fix.
Maybe we should just move to that kind of stricter approach in 7.x once we start tracking the repo generation in the CS permanently but for now not do any big experiments? :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rah, I've already forgot about this discussion, sorry. But I'm good with the plan.

// This is done as a fail-safe in case a user manually deletes the contents of the repository in which case subsequent
// operations must start from the EMPTY_REPO_GEN again
if (RepositoryData.EMPTY_REPO_GEN ==
latestKnownRepoGen.updateAndGet(known -> known == indexGen ? RepositoryData.EMPTY_REPO_GEN : known)) {
ywelsch marked this conversation as resolved.
Show resolved Hide resolved
logger.warn("Resetting repository generation tracker because we failed to read generation [" + indexGen + "]");
ywelsch marked this conversation as resolved.
Show resolved Hide resolved
}
throw new RepositoryException(metadata.name(), "could not read repository data from index blob", ioe);
}
}
Expand All @@ -945,11 +995,21 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long exp
"] - possibly due to simultaneous snapshot deletion requests");
}
final long newGen = currentGen + 1;
if (latestKnownRepoGen.get() >= newGen) {
throw new IllegalArgumentException(
"Tried writing generation [" + newGen + "] but repository is at least at generation [" + newGen + "] already");
}
// write the index file
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
writeAtomic(indexBlob,
BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
final long newLatestGen = latestKnownRepoGen.updateAndGet(known -> Math.max(known, newGen));
if (newGen != newLatestGen) {
ywelsch marked this conversation as resolved.
Show resolved Hide resolved
// Don't mess up the index.latest blob
throw new IllegalStateException(
"Wrote generation [" + newGen + "] but latest known repo gen concurrently changed to [" + newLatestGen + "]");
}
// write the current generation to the index-latest file
final BytesReference genBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ public void testConcurrentSnapshotCreateAndDelete() {
final StepListener<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new StepListener<>();

continueOrDie(deleteSnapshotStepListener, acknowledgedResponse -> masterNode.client.admin().cluster()
.prepareCreateSnapshot(repoName, snapshotName).execute(createAnotherSnapshotResponseStepListener));
.prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute(createAnotherSnapshotResponseStepListener));
continueOrDie(createAnotherSnapshotResponseStepListener, createSnapshotResponse ->
assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS));

Expand Down Expand Up @@ -1133,7 +1133,7 @@ protected void assertSnapshotOrGenericThread() {
} else {
return metaData -> {
final Repository repository = new MockEventuallyConsistentRepository(
metaData, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext);
metaData, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext, random());
repository.start();
return repository;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
Expand All @@ -64,6 +65,8 @@
*/
public class MockEventuallyConsistentRepository extends BlobStoreRepository {

private final Random random;

private final Context context;

private final NamedXContentRegistry namedXContentRegistry;
/**
 * Creates a new mock repository that simulates an eventually consistent blob store.
 *
 * @param metadata              repository metadata
 * @param namedXContentRegistry registry used to parse repository metadata blobs
 * @param threadPool            thread pool backing the repository's executors
 * @param context               shared context holding the simulated blob-store state
 * @param random                randomness source used to simulate inconsistent listings
 */
public MockEventuallyConsistentRepository(
    final RepositoryMetaData metadata,
    final NamedXContentRegistry namedXContentRegistry,
    final ThreadPool threadPool,
    final Context context,
    final Random random) {
    super(metadata, namedXContentRegistry, threadPool, BlobPath.cleanPath());
    this.random = random;
    this.context = context;
    this.namedXContentRegistry = namedXContentRegistry;
}

// Filters out all actions that are superseded by subsequent actions
Expand Down Expand Up @@ -107,6 +112,9 @@ protected BlobStore createBlobStore() {
*/
public static final class Context {

// Eventual consistency is only simulated as long as this flag is false
private boolean consistent;

private final List<BlobStoreAction> actions = new ArrayList<>();

/**
Expand All @@ -117,6 +125,7 @@ public void forceConsistent() {
final List<BlobStoreAction> consistentActions = consistentView(actions);
actions.clear();
actions.addAll(consistentActions);
consistent = true;
}
}
}
Expand Down Expand Up @@ -240,14 +249,14 @@ public Map<String, BlobMetaData> listBlobs() {
ensureNotClosed();
final String thisPath = path.buildAsString();
synchronized (context.actions) {
return consistentView(context.actions).stream()
return maybeMissLatestIndexN(consistentView(context.actions).stream()
.filter(
action -> action.path.startsWith(thisPath) && action.path.substring(thisPath.length()).indexOf('/') == -1
&& action.operation == Operation.PUT)
.collect(
Collectors.toMap(
action -> action.path.substring(thisPath.length()),
action -> new PlainBlobMetaData(action.path.substring(thisPath.length()), action.data.length)));
action -> new PlainBlobMetaData(action.path.substring(thisPath.length()), action.data.length))));
}
}

Expand All @@ -268,9 +277,21 @@ public Map<String, BlobContainer> children() {

@Override
public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) {
    // listBlobs() already applies maybeMissLatestIndexN to simulate an inconsistent listing; re-applying it
    // here would double the simulated miss probability (from 1/2 to 3/4), so only the prefix filter is needed.
    return Maps.ofEntries(
        listBlobs().entrySet().stream().filter(entry -> entry.getKey().startsWith(blobNamePrefix))
            .collect(Collectors.toList()));
}

// Randomly filter out the latest /index-N blob from a listing to test that tracking of it in latestKnownRepoGen
// overrides an inconsistent listing
private Map<String, BlobMetaData> maybeMissLatestIndexN(Map<String, BlobMetaData> listing) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am aware that this does not fully cover all possible inconsistent listing scenarios, but only the scenario of missing a known (in the latestKnownRepoGen field) index-N, but correctly handling this scenario is the only thing fixed here for now. It's the most likely scenario in practice though in my opinion (inconsistent listing after back-to-back operations without master failover).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks sufficient to me

// Only filter out latest index-N at the repo root and only as long as we're not in a forced consistent state
if (path.parent() == null && context.consistent == false && random.nextBoolean()) {
final Map<String, BlobMetaData> filtered = new HashMap<>(listing);
filtered.remove(BlobStoreRepository.INDEX_FILE_PREFIX + latestKnownRepoGen.get());
return Map.copyOf(filtered);
}
return listing;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void testReadAfterWriteConsistently() throws IOException {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
repository.start();
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
Expand All @@ -70,7 +70,7 @@ public void testReadAfterWriteAfterReadThrows() throws IOException {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
repository.start();
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
Expand All @@ -86,7 +86,7 @@ public void testReadAfterDeleteAfterWriteThrows() throws IOException {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
repository.start();
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
Expand All @@ -104,7 +104,7 @@ public void testOverwriteRandomBlobFails() throws IOException {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
repository.start();
final BlobContainer container = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
Expand All @@ -121,7 +121,7 @@ public void testOverwriteShardSnapBlobFails() throws IOException {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
repository.start();
final BlobContainer container =
repository.blobStore().blobContainer(repository.basePath().add("indices").add("someindex").add("0"));
Expand All @@ -143,7 +143,7 @@ public void testOverwriteSnapshotInfoBlob() {
new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10)));
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), threadPool, blobStoreContext)) {
xContentRegistry(), threadPool, blobStoreContext, random())) {
repository.start();

// We create a snap- blob for snapshot "foo" in the first generation
Expand Down