Skip to content

Commit

Permalink
Repo analysis of uncontended register behaviour
Browse files Browse the repository at this point in the history
Today repository analysis verifies that a register behaves correctly
under contention, retrying until successful, but it turns out that some
repository implementations cannot even perform uncontended register
writes correctly which may cause endless retries in the contended case.
This commit adds another repository analyser which verifies that
uncontended register writes work correctly on the first attempt.
  • Loading branch information
DaveCTurner committed Oct 21, 2023
1 parent 9794c6e commit 17566ba
Show file tree
Hide file tree
Showing 9 changed files with 374 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ static TransportVersion def(int id) {
public static final TransportVersion PIPELINES_IN_BULK_RESPONSE_ADDED = def(8_519_00_0);
public static final TransportVersion PLUGIN_DESCRIPTOR_STRING_VERSION = def(8_520_00_0);
public static final TransportVersion TOO_MANY_SCROLL_CONTEXTS_EXCEPTION_ADDED = def(8_521_00_0);

public static final TransportVersion UNCONTENDED_REGISTER_ANALYSIS_ADDED = def(8_522_00_0);
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ public void handle(final HttpExchange exchange) throws IOException {
uploadsList.append("<MaxUploads>10000</MaxUploads>");
uploadsList.append("<IsTruncated>false</IsTruncated>");

for (MultipartUpload value : uploads.values()) {
value.appendXml(uploadsList);
for (final var multipartUpload : uploads.values()) {
if (multipartUpload.getPath().startsWith(prefix)) {
multipartUpload.appendXml(uploadsList);
}
}

uploadsList.append("</ListMultipartUploadsResult>");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class DefaultOperatorOnlyRegistry implements OperatorOnlyRegistry {
"cluster:admin/repository/analyze/blob",
"cluster:admin/repository/analyze/blob/read",
"cluster:admin/repository/analyze/register",
"cluster:admin/repository/analyze/register/uncontended",
// Node shutdown APIs are operator only
"cluster:admin/shutdown/create",
"cluster:admin/shutdown/get",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,11 @@ public void testFailsIfRegisterIncorrect() {
private final AtomicBoolean registerWasCorrupted = new AtomicBoolean();

@Override
public BytesReference onCompareAndExchange(BytesRegister register, BytesReference expected, BytesReference updated) {
public BytesReference onContendedCompareAndExchange(BytesRegister register, BytesReference expected, BytesReference updated) {
if (registerWasCorrupted.compareAndSet(false, true)) {
register.updateAndGet(bytes -> RegisterAnalyzeAction.bytesFromLong(RegisterAnalyzeAction.longFromBytes(bytes) + 1));
register.updateAndGet(
bytes -> ContendedRegisterAnalyzeAction.bytesFromLong(ContendedRegisterAnalyzeAction.longFromBytes(bytes) + 1)
);
}
return register.compareAndExchange(expected, updated);
}
Expand All @@ -317,19 +319,19 @@ public void testFailsIfRegisterHoldsSpuriousValue() {
final long expectedMax = Math.max(request.getConcurrency(), internalCluster().getNodeNames().length);
blobStore.setDisruption(new Disruption() {
@Override
public BytesReference onCompareAndExchange(BytesRegister register, BytesReference expected, BytesReference updated) {
public BytesReference onContendedCompareAndExchange(BytesRegister register, BytesReference expected, BytesReference updated) {
if (randomBoolean() && sawSpuriousValue.compareAndSet(false, true)) {
final var currentValue = RegisterAnalyzeAction.longFromBytes(register.get());
final var currentValue = ContendedRegisterAnalyzeAction.longFromBytes(register.get());
if (currentValue == expectedMax) {
return RegisterAnalyzeAction.bytesFromLong(
return ContendedRegisterAnalyzeAction.bytesFromLong(
randomFrom(
randomLongBetween(0L, expectedMax - 1),
randomLongBetween(expectedMax + 1, Long.MAX_VALUE),
randomLongBetween(Long.MIN_VALUE, -1)
)
);
} else {
return RegisterAnalyzeAction.bytesFromLong(
return ContendedRegisterAnalyzeAction.bytesFromLong(
randomFrom(expectedMax, randomLongBetween(expectedMax, Long.MAX_VALUE), randomLongBetween(Long.MIN_VALUE, -1))
);
}
Expand All @@ -347,6 +349,17 @@ public BytesReference onCompareAndExchange(BytesRegister register, BytesReferenc
}
}

public void testFailsIfAllRegisterOperationsInconclusive() {
final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo");
blobStore.setDisruption(new Disruption() {
@Override
public boolean compareAndExchangeReturnsWitness() {
return false;
}
});
expectThrows(RepositoryVerificationException.class, () -> analyseRepository(request));
}

private RepositoryAnalyzeAction.Response analyseRepository(RepositoryAnalyzeAction.Request request) {
return client().execute(RepositoryAnalyzeAction.INSTANCE, request).actionGet(30L, TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -464,7 +477,11 @@ default boolean createBlobOnAbort() {
return false;
}

default BytesReference onCompareAndExchange(BytesRegister register, BytesReference expected, BytesReference updated) {
default boolean compareAndExchangeReturnsWitness() {
return true;
}

default BytesReference onContendedCompareAndExchange(BytesRegister register, BytesReference expected, BytesReference updated) {
return register.compareAndExchange(expected, updated);
}
}
Expand Down Expand Up @@ -637,8 +654,16 @@ public void compareAndExchangeRegister(
ActionListener<OptionalBytesReference> listener
) {
assertPurpose(purpose);
final var register = registers.computeIfAbsent(key, ignored -> new BytesRegister());
listener.onResponse(OptionalBytesReference.of(disruption.onCompareAndExchange(register, expected, updated)));
if (disruption.compareAndExchangeReturnsWitness()) {
final var register = registers.computeIfAbsent(key, ignored -> new BytesRegister());
if (key.startsWith(RepositoryAnalyzeAction.CONTENDED_REGISTER_NAME_PREFIX)) {
listener.onResponse(OptionalBytesReference.of(disruption.onContendedCompareAndExchange(register, expected, updated)));
} else {
listener.onResponse(OptionalBytesReference.of(register.compareAndExchange(expected, updated)));
}
} else {
listener.onResponse(OptionalBytesReference.MISSING);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,10 +442,13 @@ public Map<String, BlobMetadata> listBlobsByPrefix(OperationPurpose purpose, Str
@Override
public void getRegister(OperationPurpose purpose, String key, ActionListener<OptionalBytesReference> listener) {
assertPurpose(purpose);
if (firstRegisterRead.compareAndSet(true, false) && randomBoolean() && randomBoolean()) {
if (key.startsWith(RepositoryAnalyzeAction.CONTENDED_REGISTER_NAME_PREFIX)
&& firstRegisterRead.compareAndSet(true, false)
&& randomBoolean()
&& randomBoolean()) {
// only fail the first read, we must not fail the final check
listener.onResponse(OptionalBytesReference.EMPTY);
} else if (randomBoolean()) {
} else if (key.startsWith(RepositoryAnalyzeAction.UNCONTENDED_REGISTER_NAME_PREFIX) || randomBoolean()) {
listener.onResponse(OptionalBytesReference.of(registers.computeIfAbsent(key, ignored -> new BytesRegister()).get()));
} else {
final var bogus = randomFrom(BytesArray.EMPTY, new BytesArray(new byte[] { randomByte() }));
Expand All @@ -463,7 +466,10 @@ public void compareAndExchangeRegister(
) {
assertPurpose(purpose);
firstRegisterRead.set(false);
if (updated.length() > 1 && randomBoolean() && randomBoolean()) {
if (key.startsWith(RepositoryAnalyzeAction.CONTENDED_REGISTER_NAME_PREFIX)
&& updated.length() > 1
&& randomBoolean()
&& randomBoolean()) {
// updated.length() > 1 so we don't fail the final check because we know there can be no concurrent operations at that point
listener.onResponse(OptionalBytesReference.MISSING);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,23 @@
import java.util.concurrent.ExecutorService;

/**
* An action which atomically increments a register using {@link BlobContainer#compareAndExchangeRegister}.
* An action which atomically increments a register using {@link BlobContainer#compareAndExchangeRegister}. There will be multiple parties
* accessing the register concurrently in order to test behaviour under contention.
*/
public class RegisterAnalyzeAction extends ActionType<ActionResponse.Empty> {
public class ContendedRegisterAnalyzeAction extends ActionType<ActionResponse.Empty> {

private static final Logger logger = LogManager.getLogger(RegisterAnalyzeAction.class);
private static final Logger logger = LogManager.getLogger(ContendedRegisterAnalyzeAction.class);

public static final RegisterAnalyzeAction INSTANCE = new RegisterAnalyzeAction();
public static final ContendedRegisterAnalyzeAction INSTANCE = new ContendedRegisterAnalyzeAction();
public static final String NAME = "cluster:admin/repository/analyze/register";

private RegisterAnalyzeAction() {
private ContendedRegisterAnalyzeAction() {
super(NAME, in -> ActionResponse.Empty.INSTANCE);
}

public static class TransportAction extends HandledTransportAction<Request, ActionResponse.Empty> {

private static final Logger logger = RegisterAnalyzeAction.logger;
private static final Logger logger = ContendedRegisterAnalyzeAction.logger;

private final RepositoriesService repositoriesService;
private final ExecutorService executor;
Expand Down Expand Up @@ -259,7 +260,7 @@ public String toString() {
public String getDescription() {
return Strings.format(
"""
RegisterAnalyzeAction.Request{\
ContendedRegisterAnalyzeAction.Request{\
repositoryName='%s', containerPath='%s', registerName='%s', requestCount='%d', initialRead='%d'}""",
repositoryName,
containerPath,
Expand Down
Loading

0 comments on commit 17566ba

Please sign in to comment.