Skip to content

Commit

Permalink
TOOLS-1755: OP_MSG support in mongoreplay
Browse files Browse the repository at this point in the history
  • Loading branch information
Will Banfield committed Nov 10, 2017
1 parent 3f5fcf3 commit 918cb1d
Show file tree
Hide file tree
Showing 18 changed files with 1,720 additions and 565 deletions.
20 changes: 10 additions & 10 deletions common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ functions:
set -e
echo "starting repl set"
mkdir -p /data/db/
PATH=./bin:$PATH ./bin/mongo --nodb --eval 'TestData = new Object(); TestData.minPort="${mongod_port}"; var repl = new ReplSetTest({nodes:3});repl.startSet();repl.initiate();repl.awaitSecondaryNodes();while(true){sleep(1000);}'
PATH=./bin:$PATH ./bin/mongo --nodb --eval 'TestData = new Object(); TestData.minPort="${mongod_port}"; var repl = new ReplSetTest({nodes:3, name:"repltester"});repl.startSet();repl.initiate();repl.awaitSecondaryNodes();while(true){sleep(1000);}'
- command: shell.exec
params:
working_dir: src
Expand Down Expand Up @@ -848,7 +848,7 @@ tasks:
value: "${args} -test.types=db"
- func: "download mongod"
vars:
mongo_version: "3.4"
mongo_version: "3.5.12"
- func: "start mongod"
- func: "wait for mongod to be ready"
- func: "setup integration test"
Expand Down Expand Up @@ -925,7 +925,7 @@ tasks:
value: "${args} -test.types=${integration_test_args}"
- func: "download mongod"
vars:
mongo_version: "3.4"
mongo_version: "3.5.12"
- func: "start mongod"
- func: "wait for mongod to be ready"
- func: "run tool integration tests"
Expand All @@ -949,7 +949,7 @@ tasks:
value: "db.createUser({ user: '${auth_username}', pwd: '${auth_password}', roles: [{ role: '__system', db: 'admin' }] });"
- func: "download mongod"
vars:
mongo_version: "3.4"
mongo_version: "3.5.12"
- func: "start mongod"
- func: "wait for mongod to be ready"
- func: "run tool integration tests"
Expand Down Expand Up @@ -1502,7 +1502,7 @@ tasks:
tool: mongoreplay
- func: "download mongod"
vars:
mongo_version: "3.4"
mongo_version: "3.5.12"
- func: "start mongod"
- func: "wait for mongod to be ready"
- command: shell.exec
Expand Down Expand Up @@ -1532,7 +1532,7 @@ tasks:
pcapFname: getmore_single_channel.pcap
- func: "download mongod"
vars:
mongo_version: "3.4"
mongo_version: "3.5.12"
- func: "start mongod"
- func: "wait for mongod to be ready"
- func: "run go_test"
Expand All @@ -1557,7 +1557,7 @@ tasks:
pcapFname: getmore_single_channel.pcap
- func: "download mongod"
vars:
mongo_version: "3.4"
mongo_version: "3.5.12"
- func: "create sharded_cluster"
- func: "run go_test"
vars:
Expand All @@ -1581,7 +1581,7 @@ tasks:
pcapFname: getmore_single_channel.pcap
- func: "download mongod"
vars:
mongo_version: "3.4"
mongo_version: "3.5.12"
- func: "start mongod"
vars:
additional_args: --auth
Expand Down Expand Up @@ -1611,7 +1611,7 @@ tasks:
pcapFname: getmore_single_channel.pcap
- func: "download mongod"
vars:
mongo_version: "3.4"
mongo_version: "3.5.12"
- func: "create repl_set"
vars:
mongod_port: ${mongod_port}
Expand All @@ -1631,7 +1631,7 @@ tasks:
tool: mongoreplay
- func: "download mongod"
vars:
mongo_version: "3.4"
mongo_version: "3.5.12"
- func: "fetch ftdc"
- command: shell.exec
params:
Expand Down
81 changes: 11 additions & 70 deletions mongoreplay/command_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,34 +41,11 @@ func (gmCommand *CommandGetMore) getCursorIDs() ([]int64, error) {
if gmCommand.cachedCursor != nil {
return []int64{*gmCommand.cachedCursor}, nil
}

var err error
switch t := gmCommand.CommandArgs.(type) {
case *bson.D:
for _, bsonDoc := range *t {
if bsonDoc.Name == "getMore" {
getmoreID, ok := bsonDoc.Value.(int64)
if !ok {
return []int64{}, fmt.Errorf("cursorID is not int64")
}
gmCommand.cachedCursor = &getmoreID
break
}
}
case *bson.Raw:
doc := &struct {
GetMore int64 `bson:"getMore"`
}{}
err = t.Unmarshal(doc)
if err != nil {
return []int64{}, fmt.Errorf("failed to unmarshal bson.Raw into struct: %v", err)
}

gmCommand.cachedCursor = &doc.GetMore
default:
panic("not a *bson.D or *bson.Raw")
cursorID, err := getGetMoreCursorID(gmCommand.CommandArgs)
if err != nil {
return []int64{}, err
}

gmCommand.cachedCursor = &cursorID
return []int64{*gmCommand.cachedCursor}, err
}

Expand All @@ -78,38 +55,12 @@ func (gmCommand *CommandGetMore) getCursorIDs() ([]int64, error) {
// errors, as it only ever expects one. It may also error if unmarshalling the
// underlying bson fails.
func (gmCommand *CommandGetMore) setCursorIDs(newCursorIDs []int64) error {
var newCursorID int64

if len(newCursorIDs) > 1 {
return fmt.Errorf("rewriting getmore command cursorIDs requires 1 id, received: %d", len(newCursorIDs))
}
if len(newCursorIDs) < 1 {
newCursorID = 0
} else {
newCursorID = newCursorIDs[0]
}
var doc bson.D
switch t := gmCommand.CommandArgs.(type) {
case *bson.D:
doc = *t
case *bson.Raw:
err := t.Unmarshal(&doc)
if err != nil {
return fmt.Errorf("failed to unmarshal bson.Raw into struct: %v", err)
}
default:
panic("not a *bson.D or *bson.Raw")
}

// loop over the keys of the bson.D and the set the correct one
for i, bsonDoc := range doc {
if bsonDoc.Name == "getMore" {
doc[i].Value = newCursorID
break
}
newDoc, newCursorID, err := setCursorID(gmCommand.CommandArgs, newCursorIDs)
if err != nil {
return err
}
gmCommand.CommandArgs = &newDoc
gmCommand.cachedCursor = &newCursorID
gmCommand.CommandArgs = &doc
return nil
}

Expand Down Expand Up @@ -273,22 +224,12 @@ func (op *CommandOp) Execute(socket *mgo.MongoSocket) (Replyable, error) {
return nil, err
}
commandReplyOp.CommandReply = commandReplyAsRaw
doc := &struct {
Cursor struct {
FirstBatch []bson.Raw `bson:"firstBatch"`
NextBatch []bson.Raw `bson:"nextBatch"`
} `bson:"cursor"`
}{}
err = commandReplyAsRaw.Unmarshal(&doc)

cursorDocs, err := getCursorDocs(commandReplyAsRaw)
if err != nil {
return nil, err
}

if doc.Cursor.FirstBatch != nil {
commandReplyOp.Docs = doc.Cursor.FirstBatch
} else if doc.Cursor.NextBatch != nil {
commandReplyOp.Docs = doc.Cursor.NextBatch
}
commandReplyOp.Docs = cursorDocs

for _, d := range replyData {
dataDoc := &bson.Raw{}
Expand Down
29 changes: 11 additions & 18 deletions mongoreplay/command_reply.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ import (
type CommandReplyOp struct {
Header MsgHeader
mgo.CommandReplyOp
Docs []bson.Raw
Latency time.Duration
cursorCached bool
cursorID int64
Docs []bson.Raw
Latency time.Duration
cursorID *int64
}

// OpCode returns the OpCode for a CommandReplyOp.
Expand Down Expand Up @@ -75,23 +74,17 @@ func (op *CommandReplyOp) Abbreviated(chars int) string {
// caches in the CommandReplyOp struct so that multiple calls to this function
// do not incur the cost of unmarshalling the bson.
func (op *CommandReplyOp) getCursorID() (int64, error) {
if op.cursorCached {
return op.cursorID, nil
}
doc := &struct {
Cursor struct {
ID int64 `bson:"id"`
} `bson:"cursor"`
}{}
if op.cursorID != nil {
return *op.cursorID, nil
}
replyArgs := op.CommandReply.(*bson.Raw)
err := replyArgs.Unmarshal(doc)

id, err := getCursorID(replyArgs)
if err != nil {
// can happen if there's corrupt bson in the doc.
return 0, fmt.Errorf("failed to unmarshal bson.Raw into struct: %v", err)
return 0, err
}
op.cursorCached = true
op.cursorID = doc.Cursor.ID
return op.cursorID, nil
op.cursorID = &id
return *op.cursorID, nil
}

func (op *CommandReplyOp) getOpBodyString() (string, string, string, error) {
Expand Down
14 changes: 9 additions & 5 deletions mongoreplay/cursors.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,14 @@ func newPreprocessCursorManager(opChan <-chan *RecordedOp) (*preprocessCursorMan
// Loop over all the ops found in the file
for op := range opChan {

opCode := op.RawOp.Header.OpCode
// If they don't produce a cursor, skip them
if op.RawOp.Header.OpCode != OpCodeGetMore && op.RawOp.Header.OpCode != OpCodeKillCursors &&
op.RawOp.Header.OpCode != OpCodeReply && op.RawOp.Header.OpCode != OpCodeCommandReply && op.RawOp.Header.OpCode != OpCodeCommand {
if opCode != OpCodeGetMore && opCode != OpCodeKillCursors &&
opCode != OpCodeReply && opCode != OpCodeCommandReply &&
opCode != OpCodeCommand && opCode != OpCodeMessage {
continue
}
if op.RawOp.Header.OpCode == OpCodeCommand {
if opCode == OpCodeCommand {
commandName, err := getCommandName(&op.RawOp)
if err != nil {
userInfoLogger.Logvf(DebugLow, "preprocessing op no command name: %v", err)
Expand Down Expand Up @@ -225,7 +227,10 @@ func newPreprocessCursorManager(opChan <-chan *RecordedOp) (*preprocessCursorMan
continue
}
cursorsSeen.trackReplied(cursorID, op)

default:
// In this case, parsing the op revealed it to not be a replyable
// or able to be rewritten
continue
}
}

Expand All @@ -238,7 +243,6 @@ func newPreprocessCursorManager(opChan <-chan *RecordedOp) (*preprocessCursorMan
replyConn: counter.replyConn,
}
result.opToCursors[counter.opOriginKey] = cursorID

}
}
userInfoLogger.Logvf(Always, "Preprocess complete")
Expand Down
6 changes: 3 additions & 3 deletions mongoreplay/cursors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestPreprocessingFile(t *testing.T) {
generator := newRecordedOpGenerator()
var err error

err = generator.generateReply(requestID, testCursorID, 0)
err = generator.generateReply(requestID, testCursorID)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func TestSkipOutOfOrderCursor(t *testing.T) {
t.Error(err)
}

err = generator.generateReply(requestID, testCursorID, 0)
err = generator.generateReply(requestID, testCursorID)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -288,7 +288,7 @@ func TestSkipOnMarkFailed(t *testing.T) {
if err != nil {
t.Error(err)
}
err = generator.generateReply(requestID, testCursorID, 0)
err = generator.generateReply(requestID, testCursorID)
if err != nil {
t.Error(err)
}
Expand Down
19 changes: 11 additions & 8 deletions mongoreplay/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ func (context *ExecutionContext) AddFromWire(reply Replyable, recordedOp *Record
// on the reversed src/dest of the recordedOp which should the RecordedOp that
// this ReplyOp was unmarshaled out of.
func (context *ExecutionContext) AddFromFile(reply Replyable, recordedOp *RecordedOp) {
if cursorID, _ := reply.getCursorID(); cursorID == 0 {
return
}
key := cacheKey(recordedOp, true)
toolDebugLogger.Logvf(DebugHigh, "Adding recorded reply with key %v", key)
context.completeReply(key, reply, ReplyFromFile)
Expand Down Expand Up @@ -233,15 +236,17 @@ func (context *ExecutionContext) Execute(op *RecordedOp, socket *mgo.MongoSocket
toolDebugLogger.Logvf(Always, "Skipping incomplete op: %v", op.RawOp.Header.OpCode)
return nil, nil, nil
}
if recordedReply, ok := opToExec.(*ReplyOp); ok {
context.AddFromFile(recordedReply, op)
} else if recordedCommandReply, ok := opToExec.(*CommandReplyOp); ok {
context.AddFromFile(recordedCommandReply, op)
} else {
switch replyable := opToExec.(type) {
case *ReplyOp:
context.AddFromFile(replyable, op)
case *CommandReplyOp:
context.AddFromFile(replyable, op)
case *MsgOpReply:
context.AddFromFile(replyable, op)
default:
if !context.driverOpsFiltered && IsDriverOp(opToExec) {
return opToExec, nil, nil
}

if rewriteable, ok1 := opToExec.(cursorsRewriteable); ok1 {
ok2, err := context.rewriteCursors(rewriteable, op.SeenConnectionNum)
if err != nil {
Expand All @@ -267,9 +272,7 @@ func (context *ExecutionContext) Execute(op *RecordedOp, socket *mgo.MongoSocket
}
if reply != nil {
context.AddFromWire(reply, op)

}

}
context.handleCompletedReplies()
return opToExec, reply, nil
Expand Down
Loading

0 comments on commit 918cb1d

Please sign in to comment.