diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java index e31d7bcb..30b28ef3 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java @@ -644,7 +644,7 @@ private Document commandChangeStreamPipeline(Document query, Oplog oplog, String int batchSize = (int) cursorDocument.getOrDefault("batchSize", 0); String namespace = getFullCollectionNamespace(collectionName); - Cursor cursor = oplog.createCursor(changeStreamDocument, namespace, aggregation); + Cursor cursor = oplog.createChangeStreamCursor(changeStreamDocument, namespace, aggregation); return Utils.firstBatchCursorResponse(namespace, cursor.takeDocuments(batchSize), cursor); } diff --git a/core/src/main/java/de/bwaldvogel/mongo/oplog/ChangeStreamCursor.java b/core/src/main/java/de/bwaldvogel/mongo/oplog/ChangeStreamCursor.java new file mode 100644 index 00000000..d47808a4 --- /dev/null +++ b/core/src/main/java/de/bwaldvogel/mongo/oplog/ChangeStreamCursor.java @@ -0,0 +1,143 @@ +package de.bwaldvogel.mongo.oplog; + +import java.util.List; +import java.util.stream.Collectors; + +import de.bwaldvogel.mongo.MongoBackend; +import de.bwaldvogel.mongo.backend.TailableCursor; +import de.bwaldvogel.mongo.backend.aggregation.Aggregation; +import de.bwaldvogel.mongo.bson.BsonTimestamp; +import de.bwaldvogel.mongo.bson.Document; +import de.bwaldvogel.mongo.exception.MongoServerException; + +public class ChangeStreamCursor implements TailableCursor { + + private static final String FULL_DOCUMENT = "fullDocument"; + private static final String OPERATION_TYPE = "operationType"; + private static final String CLUSTER_TIME = "clusterTime"; + private static final String DOCUMENT_KEY = "documentKey"; + + private final MongoBackend mongoBackend; + private final Document changeStreamDocument; + private final Aggregation aggregation; + private final OplogCursor oplogCursor; + + ChangeStreamCursor( + MongoBackend mongoBackend, + Document changeStreamDocument, + Aggregation aggregation, + OplogCursor oplogCursor + ) { + this.mongoBackend = mongoBackend; + this.changeStreamDocument = changeStreamDocument; + this.aggregation = aggregation; + this.oplogCursor = oplogCursor; + } + + @Override + public long getId() { + return oplogCursor.getId(); + } + + @Override + public boolean isEmpty() { + return oplogCursor.isEmpty(); + } + + @Override + public List<Document> takeDocuments(int numberToReturn) { + return aggregation.runStagesAsStream( + oplogCursor.takeDocuments(numberToReturn).stream() + .map(this::toChangeStreamResponseDocument) + ).collect(Collectors.toList()); + } + + @Override + public OplogPosition getPosition() { + return oplogCursor.getPosition(); + } + + private Document toChangeStreamResponseDocument(Document oplogDocument) { + OperationType operationType = OperationType.fromCode(oplogDocument.get("op").toString()); + Document documentKey = new Document(); + Document document = getUpdateDocument(oplogDocument); + BsonTimestamp timestamp = OplogUtils.getOplogTimestamp(oplogDocument); + OplogPosition oplogPosition = new OplogPosition(timestamp); + switch (operationType) { + case UPDATE: + case DELETE: + documentKey = document; + break; + case INSERT: + documentKey.append(OplogDocumentFields.ID, document.get(OplogDocumentFields.ID)); + break; + case COMMAND: + return toChangeStreamCommandResponseDocument(oplogDocument, oplogPosition, timestamp); + default: + throw new IllegalArgumentException("Unexpected operation type: " + operationType); + } + + return new Document() + .append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString())) + .append(OPERATION_TYPE, operationType.getDescription()) + .append(FULL_DOCUMENT, getFullDocument(changeStreamDocument, oplogDocument, operationType)) + .append(DOCUMENT_KEY, documentKey) + .append(CLUSTER_TIME, timestamp); + } + + private Document toChangeStreamCommandResponseDocument(Document oplogDocument, OplogPosition oplogPosition, BsonTimestamp timestamp) { + Document document = getUpdateDocument(oplogDocument); + String operationType = document.keySet().stream().findFirst().orElseThrow( + () -> new MongoServerException("Unspecified command operation type") + ); + + return new Document() + .append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString())) + .append(OPERATION_TYPE, operationType) + .append(CLUSTER_TIME, timestamp); + } + + private Document getFullDocument(Document changeStreamDocument, Document document, OperationType operationType) { + switch (operationType) { + case INSERT: + return getUpdateDocument(document); + case DELETE: + return null; + case UPDATE: + return lookUpUpdateDocument(changeStreamDocument, document); + } + throw new IllegalArgumentException("Invalid operation type"); + } + + private Document getUpdateDocument(Document document) { + return (Document) document.get(OplogDocumentFields.O); + } + + private Document lookUpUpdateDocument(Document changeStreamDocument, Document document) { + Document deltaUpdate = getDeltaUpdate(getUpdateDocument(document)); + if (changeStreamDocument.containsKey(FULL_DOCUMENT) && changeStreamDocument.get(FULL_DOCUMENT).equals("updateLookup")) { + String namespace = (String) document.get(OplogDocumentFields.NAMESPACE); + String databaseName = namespace.split("\\.")[0]; + String collectionName = namespace.split("\\.")[1]; + return mongoBackend.resolveDatabase(databaseName) + .resolveCollection(collectionName, true) + .queryAllAsStream() + .filter(d -> d.get(OplogDocumentFields.ID).equals(((Document) document.get(OplogDocumentFields.O2)).get(OplogDocumentFields.ID))) + .findFirst() + .orElse(deltaUpdate); + } + return deltaUpdate; + } + + private Document getDeltaUpdate(Document updateDocument) { + Document delta = new Document(); + if (updateDocument.containsKey("$set")) { + delta.appendAll((Document) updateDocument.get("$set")); + } + if (updateDocument.containsKey("$unset")) { + delta.appendAll((Document) updateDocument.get("$unset")); + } + return delta; + } + +} diff --git a/core/src/main/java/de/bwaldvogel/mongo/oplog/CollectionBackedOplog.java b/core/src/main/java/de/bwaldvogel/mongo/oplog/CollectionBackedOplog.java index 5fb0be01..6b4d722e 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/oplog/CollectionBackedOplog.java +++ b/core/src/main/java/de/bwaldvogel/mongo/oplog/CollectionBackedOplog.java @@ -1,30 +1,27 @@ package de.bwaldvogel.mongo.oplog; import java.util.List; +import java.util.Objects; import java.util.UUID; import java.util.function.Function; +import java.util.stream.Collectors; import java.util.stream.Stream; import de.bwaldvogel.mongo.MongoBackend; import de.bwaldvogel.mongo.MongoCollection; -import de.bwaldvogel.mongo.backend.Cursor; import de.bwaldvogel.mongo.backend.CursorRegistry; +import de.bwaldvogel.mongo.backend.TailableCursor; import de.bwaldvogel.mongo.backend.Utils; import de.bwaldvogel.mongo.backend.aggregation.Aggregation; import de.bwaldvogel.mongo.bson.BsonTimestamp; import de.bwaldvogel.mongo.bson.Document; -import de.bwaldvogel.mongo.exception.MongoServerException; public class CollectionBackedOplog implements Oplog { private static final long ELECTION_TERM = 1L; private static final String START_AT_OPERATION_TIME = "startAtOperationTime"; - private static final String FULL_DOCUMENT = "fullDocument"; private static final String START_AFTER = "startAfter"; private static final String RESUME_AFTER = "resumeAfter"; - private static final String OPERATION_TYPE = "operationType"; - private static final String CLUSTER_TIME = "clusterTime"; - private static final String DOCUMENT_KEY = "documentKey"; private final OplogClock oplogClock; private final MongoCollection<Document> collection; @@ -83,21 +80,19 @@ public void handleDropCollection(String namespace) { collection.addDocument(toOplogDropCollection(databaseName, collectionName)); } - private Stream<Document> streamOplog(Document changeStreamDocument, OplogPosition position, Aggregation aggregation, - String namespace) { - return aggregation.runStagesAsStream(collection.queryAllAsStream() + private Stream<Document> streamOplog(OplogPosition position, String namespace) { + return collection.queryAllAsStream() .filter(document -> filterNamespace(document, namespace)) .filter(document -> { - BsonTimestamp timestamp = getOplogTimestamp(document); + BsonTimestamp timestamp = OplogUtils.getOplogTimestamp(document); OplogPosition documentOplogPosition = new OplogPosition(timestamp); return documentOplogPosition.isAfter(position); }) .sorted((o1, o2) -> { - BsonTimestamp timestamp1 = getOplogTimestamp(o1); - BsonTimestamp timestamp2 = getOplogTimestamp(o2); + BsonTimestamp timestamp1 = OplogUtils.getOplogTimestamp(o1); + BsonTimestamp timestamp2 = OplogUtils.getOplogTimestamp(o2); return timestamp1.compareTo(timestamp2); - }) - .map(document -> toChangeStreamResponseDocument(document, changeStreamDocument))); + }); } private static boolean filterNamespace(Document document, String namespace) { @@ -110,7 +105,16 @@ private static boolean filterNamespace(Document document, String namespace) { } @Override - public Cursor createCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) { + public OplogCursor createCursor(String namespace, OplogPosition initialOplogPosition) { + return new OplogCursor( + cursorRegistry.generateCursorId(), + position -> streamOplog(position, namespace), + initialOplogPosition + ); + } + + @Override + public TailableCursor createChangeStreamCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) { Document startAfter = (Document) changeStreamDocument.get(START_AFTER); Document resumeAfter = (Document) changeStreamDocument.get(RESUME_AFTER); BsonTimestamp startAtOperationTime = (BsonTimestamp) changeStreamDocument.get(START_AT_OPERATION_TIME); @@ -123,7 +127,7 @@ public Cursor createCursor(Document changeStreamDocument, String namespace, Aggr String collectionName = Utils.getCollectionNameFromFullName(namespace); boolean resumeAfterTerminalEvent = collection.queryAllAsStream() .filter(document -> { - BsonTimestamp timestamp = getOplogTimestamp(document); + BsonTimestamp timestamp = OplogUtils.getOplogTimestamp(document); OplogPosition documentOplogPosition = new OplogPosition(timestamp); return initialOplogPosition.isAfter(documentOplogPosition.inclusive()); }) @@ -141,9 +145,9 @@ public Cursor createCursor(Document changeStreamDocument, String namespace, Aggr initialOplogPosition = new OplogPosition(oplogClock.now()); } - Function<OplogPosition, Stream<Document>> streamSupplier = - position -> streamOplog(changeStreamDocument, position, aggregation, namespace); - OplogCursor cursor = new OplogCursor(cursorRegistry.generateCursorId(), streamSupplier, initialOplogPosition); + OplogCursor oplogCursor = createCursor(namespace, initialOplogPosition); + ChangeStreamCursor cursor + = new ChangeStreamCursor(backend, changeStreamDocument, aggregation, oplogCursor); cursorRegistry.add(cursor); return cursor; } @@ -185,91 +189,4 @@ private boolean isOplogCollection(String namespace) { return collection.getFullName().equals(namespace); } - private Document getFullDocument(Document changeStreamDocument, Document document, OperationType operationType) { - switch (operationType) { - case INSERT: - return getUpdateDocument(document); - case DELETE: - return null; - case UPDATE: - return lookUpUpdateDocument(changeStreamDocument, document); - } - throw new IllegalArgumentException("Invalid operation type"); - } - - private Document lookUpUpdateDocument(Document changeStreamDocument, Document document) { - Document deltaUpdate = getDeltaUpdate(getUpdateDocument(document)); - if (changeStreamDocument.containsKey(FULL_DOCUMENT) && changeStreamDocument.get(FULL_DOCUMENT).equals("updateLookup")) { - String namespace = (String) document.get(OplogDocumentFields.NAMESPACE); - String databaseName = namespace.split("\\.")[0]; - String collectionName = namespace.split("\\.")[1]; - return backend.resolveDatabase(databaseName) - .resolveCollection(collectionName, true) - .queryAllAsStream() - .filter(d -> d.get(OplogDocumentFields.ID).equals(((Document) document.get(OplogDocumentFields.O2)).get(OplogDocumentFields.ID))) - .findFirst() - .orElse(deltaUpdate); - } - return deltaUpdate; - } - - private Document getDeltaUpdate(Document updateDocument) { - Document delta = new Document(); - if (updateDocument.containsKey("$set")) { - delta.appendAll((Document) updateDocument.get("$set")); - } - if (updateDocument.containsKey("$unset")) { - delta.appendAll((Document) updateDocument.get("$unset")); - } - return delta; - } - - private Document toChangeStreamResponseDocument(Document oplogDocument, Document changeStreamDocument) { - OperationType operationType = OperationType.fromCode(oplogDocument.get(OplogDocumentFields.OPERATION_TYPE).toString()); - Document documentKey = new Document(); - Document document = getUpdateDocument(oplogDocument); - BsonTimestamp timestamp = getOplogTimestamp(oplogDocument); - OplogPosition oplogPosition = new OplogPosition(timestamp); - switch (operationType) { - case UPDATE: - case DELETE: - documentKey = document; - break; - case INSERT: - documentKey.append(OplogDocumentFields.ID, document.get(OplogDocumentFields.ID)); - break; - case COMMAND: - return toChangeStreamCommandResponseDocument(oplogDocument, oplogPosition, timestamp); - default: - throw new IllegalArgumentException("Unexpected operation type: " + operationType); - } - - return new Document() - .append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString())) - .append(OPERATION_TYPE, operationType.getDescription()) - .append(FULL_DOCUMENT, getFullDocument(changeStreamDocument, oplogDocument, operationType)) - .append(DOCUMENT_KEY, documentKey) - .append(CLUSTER_TIME, timestamp); - } - - private Document toChangeStreamCommandResponseDocument(Document oplogDocument, OplogPosition oplogPosition, BsonTimestamp timestamp) { - Document document = getUpdateDocument(oplogDocument); - String operationType = document.keySet().stream().findFirst().orElseThrow( - () -> new MongoServerException("Unspecified command operation type") - ); - - return new Document() - .append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString())) - .append(OPERATION_TYPE, operationType) - .append(CLUSTER_TIME, timestamp); - } - - private static BsonTimestamp getOplogTimestamp(Document document) { - return (BsonTimestamp) document.get(OplogDocumentFields.TIMESTAMP); - } - - private static Document getUpdateDocument(Document document) { - return (Document) document.get(OplogDocumentFields.O); - } - } diff --git a/core/src/main/java/de/bwaldvogel/mongo/oplog/NoopOplog.java b/core/src/main/java/de/bwaldvogel/mongo/oplog/NoopOplog.java index c138c0b6..2bb6b412 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/oplog/NoopOplog.java +++ b/core/src/main/java/de/bwaldvogel/mongo/oplog/NoopOplog.java @@ -2,8 +2,8 @@ import java.util.List; -import de.bwaldvogel.mongo.backend.Cursor; import de.bwaldvogel.mongo.backend.EmptyCursor; +import de.bwaldvogel.mongo.backend.TailableCursor; import de.bwaldvogel.mongo.backend.aggregation.Aggregation; import de.bwaldvogel.mongo.bson.Document; @@ -35,7 +35,12 @@ public void handleDropCollection(String namespace) { } @Override - public Cursor createCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) { + public TailableCursor createCursor(String namespace, OplogPosition initialOplogPosition) { + return EmptyCursor.get(); + } + + @Override + public TailableCursor createChangeStreamCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) { return EmptyCursor.get(); } } diff --git a/core/src/main/java/de/bwaldvogel/mongo/oplog/Oplog.java b/core/src/main/java/de/bwaldvogel/mongo/oplog/Oplog.java index e6572623..2fd39c1e 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/oplog/Oplog.java +++ b/core/src/main/java/de/bwaldvogel/mongo/oplog/Oplog.java @@ -2,7 +2,7 @@ import java.util.List; -import de.bwaldvogel.mongo.backend.Cursor; +import de.bwaldvogel.mongo.backend.TailableCursor; import de.bwaldvogel.mongo.backend.aggregation.Aggregation; import de.bwaldvogel.mongo.bson.Document; @@ -16,5 +16,7 @@ public interface Oplog { void handleDropCollection(String namespace); - Cursor createCursor(Document changeStreamDocument, String namespace, Aggregation aggregation); + TailableCursor createCursor(String namespace, OplogPosition initialOplogPosition); + + TailableCursor createChangeStreamCursor(Document changeStreamDocument, String namespace, Aggregation aggregation); } diff --git a/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogCursor.java b/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogCursor.java index fbc58989..ee2d3f28 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogCursor.java +++ b/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogCursor.java @@ -9,6 +9,7 @@ import de.bwaldvogel.mongo.backend.CollectionUtils; import de.bwaldvogel.mongo.backend.AbstractCursor; import de.bwaldvogel.mongo.backend.TailableCursor; +import de.bwaldvogel.mongo.bson.BsonTimestamp; import de.bwaldvogel.mongo.bson.Document; public class OplogCursor extends AbstractCursor implements TailableCursor { @@ -16,7 +17,7 @@ public class OplogCursor extends AbstractCursor implements TailableCursor { private final Function<OplogPosition, Stream<Document>> oplogStream; private OplogPosition position; - public OplogCursor(long cursorId, Function<OplogPosition, Stream<Document>> oplogStream, OplogPosition position) { + OplogCursor(long cursorId, Function<OplogPosition, Stream<Document>> oplogStream, OplogPosition position) { super(cursorId); this.oplogStream = oplogStream; this.position = position; @@ -63,9 +64,9 @@ private void updatePosition(List<Document> documents) { } } - private static OplogPosition getOplogPosition(Document document) { - Document id = (Document) document.get(OplogDocumentFields.ID); - return OplogPosition.fromDocument(id); + private static OplogPosition getOplogPosition(Document oplogDocument) { + final BsonTimestamp oplogTimestamp = OplogUtils.getOplogTimestamp(oplogDocument); + return OplogPosition.fromTimestamp(oplogTimestamp); } } diff --git a/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogPosition.java b/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogPosition.java index 642e2f80..69fe147e 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogPosition.java +++ b/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogPosition.java @@ -29,6 +29,10 @@ private static OplogPosition fromHexString(String hexString) { return new OplogPosition(Long.parseLong(hexString, 16)); } + public static OplogPosition fromTimestamp(BsonTimestamp timestamp) { + return new OplogPosition(timestamp); + } + public static OplogPosition fromDocument(Document document) { return fromHexString((String) document.get(OplogDocumentFields.ID_DATA_KEY)); } diff --git a/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogUtils.java b/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogUtils.java new file mode 100644 index 00000000..26417e47 --- /dev/null +++ b/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogUtils.java @@ -0,0 +1,11 @@ +package de.bwaldvogel.mongo.oplog; + +import de.bwaldvogel.mongo.bson.BsonTimestamp; +import de.bwaldvogel.mongo.bson.Document; + +class OplogUtils { + + static BsonTimestamp getOplogTimestamp(Document document) { + return (BsonTimestamp) document.get(OplogDocumentFields.TIMESTAMP); + } +}