Skip to content

Commit

Permalink
Polishing.
Browse files Browse the repository at this point in the history
Reformat code. Suppress warnings in tests.

Original pull request: #4541
See #4495
  • Loading branch information
mp911de committed Nov 2, 2023
1 parent 8b438b4 commit a429ff8
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1906,8 +1906,8 @@ public <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String database, @N
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation)
.orElse(publisher);
publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher);
if(options.getFullDocumentBeforeChangeLookup().isPresent()) {

if (options.getFullDocumentBeforeChangeLookup().isPresent()) {
publisher = publisher.fullDocumentBeforeChange(options.getFullDocumentBeforeChangeLookup().get());
}
return publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument));
Expand Down Expand Up @@ -2495,7 +2495,7 @@ private WriteConcern potentiallyForceAcknowledgedWrite(@Nullable WriteConcern wc

if (ObjectUtils.nullSafeEquals(WriteResultChecking.EXCEPTION, writeResultChecking)) {
if (wc == null || wc.getWObject() == null
|| (wc.getWObject() instanceof Number && ((Number) wc.getWObject()).intValue() < 1)) {
|| (wc.getWObject()instanceof Number && ((Number) wc.getWObject()).intValue() < 1)) {
return WriteConcern.ACKNOWLEDGED;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
BsonTimestamp startAt = null;
boolean resumeAfter = true;

if (options instanceof ChangeStreamRequest.ChangeStreamRequestOptions) {
if (options instanceof ChangeStreamRequest.ChangeStreamRequestOptions requestOptions) {

ChangeStreamOptions changeStreamOptions = ((ChangeStreamRequestOptions) options).getChangeStreamOptions();
ChangeStreamOptions changeStreamOptions = requestOptions.getChangeStreamOptions();
filter = prepareFilter(template, changeStreamOptions);

if (changeStreamOptions.getFilter().isPresent()) {
Expand All @@ -115,9 +115,7 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
.orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
: FullDocument.UPDATE_LOOKUP);

if(changeStreamOptions.getFullDocumentBeforeChangeLookup().isPresent()) {
fullDocumentBeforeChange = changeStreamOptions.getFullDocumentBeforeChangeLookup().get();
}
fullDocumentBeforeChange = changeStreamOptions.getFullDocumentBeforeChangeLookup().orElse(null);

startAt = changeStreamOptions.getResumeBsonTimestamp().orElse(null);
}
Expand Down Expand Up @@ -158,7 +156,7 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
}

iterable = iterable.fullDocument(fullDocument);
if(fullDocumentBeforeChange != null) {
if (fullDocumentBeforeChange != null) {
iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange);
}

Expand All @@ -181,7 +179,8 @@ List<Document> prepareFilter(MongoTemplate template, ChangeStreamOptions options
template.getConverter().getMappingContext(), queryMapper)
: Aggregation.DEFAULT_CONTEXT;

return agg.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument", denylist));
return agg
.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument", denylist));
}

if (filter instanceof List) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.mongodb.util.BsonUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
Expand Down Expand Up @@ -93,14 +92,14 @@
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.mongodb.core.timeseries.Granularity;
import org.springframework.data.mongodb.util.BsonUtils;
import org.springframework.data.projection.SpelAwareProxyProjectionFactory;
import org.springframework.lang.Nullable;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.CollectionUtils;

import com.mongodb.MongoClientSettings;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.client.model.CountOptions;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.DeleteOptions;
Expand Down Expand Up @@ -1541,11 +1540,11 @@ void changeStreamOptionFullDocumentBeforeChangeShouldBeApplied() {
when(changeStreamPublisher.fullDocument(any())).thenReturn(changeStreamPublisher);
when(changeStreamPublisher.fullDocumentBeforeChange(any())).thenReturn(changeStreamPublisher);

template
.changeStream("database", "collection", ChangeStreamOptions.builder().fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.REQUIRED).build(), Object.class)
.subscribe();
ChangeStreamOptions options = ChangeStreamOptions.builder()
.fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.REQUIRED).build();
template.changeStream("database", "collection", options, Object.class).subscribe();

verify(changeStreamPublisher).fullDocumentBeforeChange(eq(FullDocumentBeforeChange.REQUIRED));
verify(changeStreamPublisher).fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,21 @@
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;

/**
* Unit tests for {@link ChangeStreamTask}.
*
* @author Christoph Strobl
* @author Myroslav Kosinskyi
*/
@ExtendWith(MockitoExtension.class)
@SuppressWarnings({ "unchecked", "rawtypes" })
class ChangeStreamTaskUnitTests {

ChangeStreamTask task;
@Mock MongoTemplate template;
@Mock MongoDatabase mongoDatabase;
@Mock MongoCollection<Document> mongoCollection;
@Mock ChangeStreamIterable<Document> changeStreamIterable;
private MongoConverter converter;

MongoConverter converter;

@BeforeEach
void setUp() {
Expand All @@ -64,9 +67,7 @@ void setUp() {
when(template.getDb()).thenReturn(mongoDatabase);

when(mongoDatabase.getCollection(any())).thenReturn(mongoCollection);

when(mongoCollection.watch(eq(Document.class))).thenReturn(changeStreamIterable);

when(changeStreamIterable.fullDocument(any())).thenReturn(changeStreamIterable);
}

Expand All @@ -84,7 +85,7 @@ void shouldNotBreakLovelaceBehavior() {

initTask(request, Document.class);

verify(changeStreamIterable).resumeAfter(eq(resumeToken));
verify(changeStreamIterable).resumeAfter(resumeToken);
}

@Test // DATAMONGO-2258
Expand All @@ -102,7 +103,7 @@ void shouldApplyResumeAfterToChangeStream() {

initTask(request, Document.class);

verify(changeStreamIterable).resumeAfter(eq(resumeToken));
verify(changeStreamIterable).resumeAfter(resumeToken);
}

@Test // DATAMONGO-2258
Expand All @@ -120,7 +121,7 @@ void shouldApplyStartAfterToChangeStream() {

initTask(request, Document.class);

verify(changeStreamIterable).startAfter(eq(resumeToken));
verify(changeStreamIterable).startAfter(resumeToken);
}

@Test // GH-4495
Expand All @@ -136,7 +137,7 @@ void shouldApplyFullDocumentBeforeChangeToChangeStream() {

initTask(request, Document.class);

verify(changeStreamIterable).fullDocumentBeforeChange(eq(FullDocumentBeforeChange.REQUIRED));
verify(changeStreamIterable).fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
}

private MongoCursor<ChangeStreamDocument<Document>> initTask(ChangeStreamRequest request, Class<?> targetType) {
Expand Down

0 comments on commit a429ff8

Please sign in to comment.