Skip to content

Commit

Permalink
Propagate the original error for write errors labeled `NoWritesPerfor…
Browse files Browse the repository at this point in the history
…med` (#1013)

JAVA-4701
  • Loading branch information
stIncMale authored Oct 14, 2022
1 parent a8fe4ca commit fbc5e4e
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ static Throwable chooseRetryableWriteException(
return mostRecentAttemptException.getCause();
}
return mostRecentAttemptException;
} else if (mostRecentAttemptException instanceof ResourceSupplierInternalException) {
} else if (mostRecentAttemptException instanceof ResourceSupplierInternalException
|| (mostRecentAttemptException instanceof MongoException
&& ((MongoException) mostRecentAttemptException).hasErrorLabel(NO_WRITES_PERFORMED_ERROR_LABEL))) {
return previouslyChosenException;
} else {
return mostRecentAttemptException;
Expand Down Expand Up @@ -571,6 +573,7 @@ private static boolean isRetryWritesEnabled(@Nullable final BsonDocument command
}

static final String RETRYABLE_WRITE_ERROR_LABEL = "RetryableWriteError";
private static final String NO_WRITES_PERFORMED_ERROR_LABEL = "NoWritesPerformed";

private static boolean decideRetryableAndAddRetryableWriteErrorLabel(final Throwable t, @Nullable final Integer maxWireVersion) {
if (!(t instanceof MongoException)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -846,14 +846,14 @@ class MixedBulkWriteOperationSpecification extends OperationFunctionalSpecificat
given:
getCollectionHelper().insertDocuments(getTestInserts())
def operation = new MixedBulkWriteOperation(getNamespace(),
[new InsertRequest(new BsonDocument('_id', new BsonInt32(7))),
new InsertRequest(new BsonDocument('_id', new BsonInt32(1))) // duplicate key
], false, ACKNOWLEDGED, true)
[new DeleteRequest(new BsonDocument('_id', new BsonInt32(2))), // existing key
new InsertRequest(new BsonDocument('_id', new BsonInt32(1))) // existing (duplicate) key
], true, ACKNOWLEDGED, true)

def failPoint = BsonDocument.parse('''{
"configureFailPoint": "failCommand",
"mode": {"times": 2 },
"data": { "failCommands": ["insert"],
"data": { "failCommands": ["delete"],
"writeConcernError": {"code": 91, "errmsg": "Replication is being shut down"}}}''')
configureFailPoint(failPoint)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ public void poolClearedExceptionMustBeRetryable() throws InterruptedException, E
mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true);
}

/**
* Prose test #3.
*/
@Test
public void originalErrorMustBePropagatedIfNoWritesPerformed() throws InterruptedException {
com.mongodb.client.RetryableWritesProseTest.originalErrorMustBePropagatedIfNoWritesPerformed(
mongoClientSettings -> new SyncMongoClient(MongoClients.create(mongoClientSettings)));
}

private boolean canRunTests() {
Document storageEngine = (Document) getServerStatus().get("storageEngine");

Expand Down
23 changes: 7 additions & 16 deletions driver-sync/src/test/functional/com/mongodb/client/FailPoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@
public final class FailPoint implements AutoCloseable {
private final BsonDocument failPointDocument;
private final MongoClient client;
private final boolean close;

private FailPoint(final BsonDocument failPointDocument, final MongoClient client, final boolean close) {
private FailPoint(final BsonDocument failPointDocument, final MongoClient client) {
this.failPointDocument = failPointDocument.toBsonDocument();
this.client = client;
this.close = close;
}

/**
* @param configureFailPointDoc A document representing {@code configureFailPoint} command to be issued as is via
* {@link com.mongodb.client.MongoDatabase#runCommand(Bson)}.
* @param serverAddress One may use {@link Fixture#getPrimary()} to get the address of a primary server
* if that is what is needed.
*/
public static FailPoint enable(final BsonDocument configureFailPointDoc, final ServerAddress serverAddress) {
MongoClientSettings clientSettings = getMongoClientSettingsBuilder()
Expand All @@ -48,18 +48,11 @@ public static FailPoint enable(final BsonDocument configureFailPointDoc, final S
.hosts(Collections.singletonList(serverAddress)))
.build();
MongoClient client = MongoClients.create(clientSettings);
return enable(configureFailPointDoc, client, true);
return enable(configureFailPointDoc, client);
}

/**
* @see #enable(BsonDocument, ServerAddress)
*/
public static FailPoint enable(final BsonDocument configureFailPointDoc, final MongoClient client) {
return enable(configureFailPointDoc, client, false);
}

private static FailPoint enable(final BsonDocument configureFailPointDoc, final MongoClient client, final boolean close) {
FailPoint result = new FailPoint(configureFailPointDoc, client, close);
private static FailPoint enable(final BsonDocument configureFailPointDoc, final MongoClient client) {
FailPoint result = new FailPoint(configureFailPointDoc, client);
client.getDatabase("admin").runCommand(configureFailPointDoc);
return result;
}
Expand All @@ -71,9 +64,7 @@ public void close() {
.append("configureFailPoint", failPointDocument.getString("configureFailPoint"))
.append("mode", new BsonString("off")));
} finally {
if (close) {
client.close();
}
client.close();
}
}
}
11 changes: 7 additions & 4 deletions driver-sync/src/test/functional/com/mongodb/client/Fixture.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.ServerAddress;
import com.mongodb.client.internal.MongoClientImpl;
import com.mongodb.connection.ServerDescription;

import java.util.List;
Expand Down Expand Up @@ -109,12 +108,16 @@ public static MongoClientSettings.Builder getMongoClientSettings(final Connectio
return builder;
}

/**
* Beware of a potential race condition hiding here: the primary you discover may differ from the one used by the {@code client}
* when performing some operations, as the primary may change.
*/
public static ServerAddress getPrimary() throws InterruptedException {
getMongoClient();
List<ServerDescription> serverDescriptions = getPrimaries(((MongoClientImpl) mongoClient).getCluster().getDescription());
MongoClient client = getMongoClient();
List<ServerDescription> serverDescriptions = getPrimaries(client.getClusterDescription());
while (serverDescriptions.isEmpty()) {
Thread.sleep(100);
serverDescriptions = getPrimaries(((MongoClientImpl) mongoClient).getCluster().getDescription());
serverDescriptions = getPrimaries(client.getClusterDescription());
}
return serverDescriptions.get(0).getAddress();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import com.mongodb.MongoClientException;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoException;
import com.mongodb.ServerAddress;
import com.mongodb.assertions.Assertions;
import com.mongodb.event.CommandListener;
import com.mongodb.event.CommandSucceededEvent;
import com.mongodb.event.ConnectionCheckOutFailedEvent;
import com.mongodb.event.ConnectionCheckedOutEvent;
import com.mongodb.event.ConnectionPoolClearedEvent;
Expand All @@ -34,12 +38,16 @@
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.mongodb.ClusterFixture.getServerStatus;
import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet;
Expand All @@ -55,6 +63,7 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeFalse;
import static org.junit.Assume.assumeTrue;

Expand Down Expand Up @@ -138,7 +147,6 @@ public static <R> void poolClearedExceptionMustBeRetryable(
* As a result, the client has to wait for at least its heartbeat delay until it hears back from a server
* (while it waits for a response, calling `ServerMonitor.connect` has no effect).
* Thus, we want to use small heartbeat delay to reduce delays in the test. */
.minHeartbeatFrequency(50, TimeUnit.MILLISECONDS)
.heartbeatFrequency(50, TimeUnit.MILLISECONDS))
.retryReads(true)
.retryWrites(true)
Expand All @@ -158,7 +166,7 @@ public static <R> void poolClearedExceptionMustBeRetryable(
.append("blockTimeMS", new BsonInt32(1000)));
int timeoutSeconds = 5;
try (MongoClient client = clientCreator.apply(clientSettings);
FailPoint ignored = FailPoint.enable(configureFailPoint, client)) {
FailPoint ignored = FailPoint.enable(configureFailPoint, Fixture.getPrimary())) {
MongoCollection<Document> collection = client.getDatabase(getDefaultDatabaseName())
.getCollection("poolClearedExceptionMustBeRetryable");
collection.drop();
Expand All @@ -179,6 +187,77 @@ public static <R> void poolClearedExceptionMustBeRetryable(
}
}

/**
* Prose test #3.
*/
@Test
public void originalErrorMustBePropagatedIfNoWritesPerformed() throws InterruptedException {
originalErrorMustBePropagatedIfNoWritesPerformed(MongoClients::create);
}

@SuppressWarnings("try")
public static void originalErrorMustBePropagatedIfNoWritesPerformed(
final Function<MongoClientSettings, MongoClient> clientCreator) throws InterruptedException {
assumeTrue(serverVersionAtLeast(6, 0) && isDiscoverableReplicaSet());
ServerAddress primaryServerAddress = Fixture.getPrimary();
CompletableFuture<FailPoint> futureFailPointFromListener = new CompletableFuture<>();
CommandListener commandListener = new CommandListener() {
private final AtomicBoolean configureFailPoint = new AtomicBoolean(true);

@Override
public void commandSucceeded(final CommandSucceededEvent event) {
if (event.getCommandName().equals("insert")
&& event.getResponse().getDocument("writeConcernError", new BsonDocument())
.getInt32("code", new BsonInt32(-1)).intValue() == 91
&& configureFailPoint.compareAndSet(true, false)) {
Assertions.assertTrue(futureFailPointFromListener.complete(FailPoint.enable(
new BsonDocument()
.append("configureFailPoint", new BsonString("failCommand"))
.append("mode", new BsonDocument()
.append("times", new BsonInt32(1)))
.append("data", new BsonDocument()
.append("failCommands", new BsonArray(singletonList(new BsonString("insert"))))
.append("errorCode", new BsonInt32(10107))
.append("errorLabels", new BsonArray(Stream.of("RetryableWriteError", "NoWritesPerformed")
.map(BsonString::new).collect(Collectors.toList())))),
primaryServerAddress
)));
}
}
};
BsonDocument failPointDocument = new BsonDocument()
.append("configureFailPoint", new BsonString("failCommand"))
.append("mode", new BsonDocument()
.append("times", new BsonInt32(1)))
.append("data", new BsonDocument()
.append("writeConcernError", new BsonDocument()
.append("code", new BsonInt32(91))
.append("errorLabels", new BsonArray(Stream.of("RetryableWriteError")
.map(BsonString::new).collect(Collectors.toList()))))
.append("failCommands", new BsonArray(singletonList(new BsonString("insert")))));
try (MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder()
.retryWrites(true)
.addCommandListener(commandListener)
.applyToServerSettings(builder ->
// see `poolClearedExceptionMustBeRetryable` for the explanation
builder.heartbeatFrequency(50, TimeUnit.MILLISECONDS))
.build());
FailPoint ignored = FailPoint.enable(failPointDocument, primaryServerAddress)) {
MongoCollection<Document> collection = client.getDatabase(getDefaultDatabaseName())
.getCollection("originalErrorMustBePropagatedIfNoWritesPerformed");
collection.drop();
try {
collection.insertOne(new Document());
} catch (MongoException e) {
assertEquals(e.getCode(), 91);
return;
}
fail("must not reach");
} finally {
futureFailPointFromListener.thenAccept(FailPoint::close);
}
}

private boolean canRunTests() {
Document storageEngine = (Document) getServerStatus().get("storageEngine");

Expand Down

0 comments on commit fbc5e4e

Please sign in to comment.