Skip to content

Commit

Permalink
Add support for OP_MSG with checksum #71
Browse files Browse the repository at this point in the history
  • Loading branch information
bwaldvogel committed Jun 8, 2020
1 parent 5be8bfa commit abb27e0
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 26 deletions.
23 changes: 23 additions & 0 deletions core/src/main/java/de/bwaldvogel/mongo/wire/MessageFlag.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package de.bwaldvogel.mongo.wire;

public enum MessageFlag {
CHECKSUM_PRESENT(0),
MORE_TO_COME(1),
EXHAUST_ALLOWED(16),
;

private final int value;

MessageFlag(int bit) {
this.value = 1 << bit;
}

public boolean isSet(int flags) {
return (flags & value) == value;
}

public int removeFrom(int flags) {
return flags - value;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -40,6 +43,7 @@ public class MongoWireProtocolHandler extends LengthFieldBasedFrameDecoder {
private static final int LENGTH_FIELD_LENGTH = 4;
private static final int LENGTH_ADJUSTMENT = -LENGTH_FIELD_LENGTH;
private static final int INITIAL_BYTES_TO_STRIP = 0;
private static final int CHECKSUM_LENGTH = 4;

public MongoWireProtocolHandler() {
super(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP);
Expand Down Expand Up @@ -221,39 +225,67 @@ private ClientRequest handleKillCursors(Channel channel, MessageHeader header, B
}

private ClientRequest handleMessage(Channel channel, MessageHeader header, ByteBuf buffer) {
int flags = buffer.readIntLE();
if (flags != 0) {
throw new UnsupportedOperationException("flags=" + flags + " not yet supported");
int flagBits = buffer.readIntLE();

Set<MessageFlag> flags = EnumSet.noneOf(MessageFlag.class);
if (MessageFlag.CHECKSUM_PRESENT.isSet(flagBits)) {
flagBits = MessageFlag.CHECKSUM_PRESENT.removeFrom(flagBits);
flags.add(MessageFlag.CHECKSUM_PRESENT);
}

if (flagBits != 0) {
throw new UnsupportedOperationException("flags=" + flagBits + " not yet supported");
}

byte firstSectionKind = buffer.readByte();
Assert.equals(firstSectionKind, MongoMessage.SECTION_KIND_BODY);
Document document = BsonDecoder.decodeBson(buffer);
int expectedPayloadSize = header.getTotalLength() - LENGTH_FIELD_LENGTH;
if (flags.contains(MessageFlag.CHECKSUM_PRESENT)) {
expectedPayloadSize -= CHECKSUM_LENGTH;
}

while (readerDidNotReachEnd(header, buffer)) {
Document body = null;
Document documentSequence = new Document();
while (buffer.readerIndex() < expectedPayloadSize) {
byte sectionKind = buffer.readByte();
Assert.equals(sectionKind, MongoMessage.SECTION_KIND_DOCUMENT_SEQUENCE);

int sectionSize = buffer.readIntLE();
int expectedSize = header.getTotalLength() - buffer.readerIndex();
Assert.equals(sectionSize, expectedSize);
String documentIdentifier = BsonDecoder.decodeCString(buffer);
List<Document> documents = new ArrayList<>();
do {
Document subDocument = BsonDecoder.decodeBson(buffer);
documents.add(subDocument);
} while (readerDidNotReachEnd(header, buffer));

Assert.notEmpty(documents);
Object old = document.put(documentIdentifier, documents);
switch (sectionKind) {
case MongoMessage.SECTION_KIND_BODY:
Assert.isNull(body);
body = BsonDecoder.decodeBson(buffer);
break;
case MongoMessage.SECTION_KIND_DOCUMENT_SEQUENCE:
decodeKindDocumentSequence(buffer, documentSequence);
break;
default:
throw new IllegalArgumentException("Unexpected section kind: " + sectionKind);
}
}

if (flags.contains(MessageFlag.CHECKSUM_PRESENT)) {
int checksum = buffer.readIntLE();
log.trace("Ignoring checksum {}", checksum);
}

Assert.notNull(body);
for (Map.Entry<String, Object> entry : documentSequence.entrySet()) {
Object old = body.put(entry.getKey(), entry.getValue());
Assert.isNull(old);
}

return new MongoMessage(channel, header, document);
return new MongoMessage(channel, header, body);
}

private static boolean readerDidNotReachEnd(MessageHeader header, ByteBuf buffer) {
return buffer.readerIndex() < header.getTotalLength() - LENGTH_FIELD_LENGTH;
private void decodeKindDocumentSequence(ByteBuf buffer, Document documentSequence) {
int readerStartOffset = buffer.readerIndex();
int sectionSize = buffer.readIntLE();
String documentIdentifier = BsonDecoder.decodeCString(buffer);
List<Document> documents = new ArrayList<>();
do {
Document subDocument = BsonDecoder.decodeBson(buffer);
documents.add(subDocument);
} while (buffer.readerIndex() - readerStartOffset < sectionSize);

Assert.notEmpty(documents);
Object old = documentSequence.put(documentIdentifier, documents);
Assert.isNull(old);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3992,8 +3992,7 @@ public void testRenameField_embeddedDocument() {
assertThat(collection.find().first()).isEqualTo(json("_id: 1, foo: { b: 2 }, bar: { c: 3, d: 4 }, a: 1}"));

assertThatExceptionOfType(MongoWriteException.class)
.isThrownBy(() -> collection.updateOne(json("_id: 1"), json("$rename: {'foo.b.c': 'foo.b.d'}")
));
.isThrownBy(() -> collection.updateOne(json("_id: 1"), json("$rename: {'foo.b.c': 'foo.b.d'}")));
}

@Test
Expand Down

0 comments on commit abb27e0

Please sign in to comment.