From 8a049e5e6fbc786e833729379462b8cfa5201c03 Mon Sep 17 00:00:00 2001 From: Pietro De Caro Date: Thu, 15 Feb 2018 11:35:26 +0100 Subject: [PATCH] changeStream support (#97) Add $changeStream support --- changestreams.go | 357 ++++++++++++++++++++++++++++++++ changestreams_test.go | 464 ++++++++++++++++++++++++++++++++++++++++++ harness/daemons/.env | 11 +- session.go | 64 +++++- 4 files changed, 885 insertions(+), 11 deletions(-) create mode 100644 changestreams.go create mode 100644 changestreams_test.go diff --git a/changestreams.go b/changestreams.go new file mode 100644 index 000000000..5c2279c66 --- /dev/null +++ b/changestreams.go @@ -0,0 +1,357 @@ +package mgo + +import ( + "errors" + "fmt" + "reflect" + "sync" + "time" + + "github.com/globalsign/mgo/bson" +) + +type FullDocument string + +const ( + Default = "default" + UpdateLookup = "updateLookup" +) + +type ChangeStream struct { + iter *Iter + isClosed bool + options ChangeStreamOptions + pipeline interface{} + resumeToken *bson.Raw + collection *Collection + readPreference *ReadPreference + err error + m sync.Mutex + sessionCopied bool +} + +type ChangeStreamOptions struct { + + // FullDocument controls the amount of data that the server will return when + // returning a changes document. + FullDocument FullDocument + + // ResumeAfter specifies the logical starting point for the new change stream. + ResumeAfter *bson.Raw + + // MaxAwaitTimeMS specifies the maximum amount of time for the server to wait + // on new documents to satisfy a change stream query. + MaxAwaitTimeMS time.Duration + + // BatchSize specifies the number of documents to return per batch. + BatchSize int + + // Collation specifies the way the server should collate returned data. + //TODO Collation *Collation +} + +var errMissingResumeToken = errors.New("resume token missing from result") + +// Watch constructs a new ChangeStream capable of receiving continuing data +// from the database. +func (coll *Collection) Watch(pipeline interface{}, + options ChangeStreamOptions) (*ChangeStream, error) { + + if pipeline == nil { + pipeline = []bson.M{} + } + + csPipe := constructChangeStreamPipeline(pipeline, options) + pipe := coll.Pipe(&csPipe) + if options.MaxAwaitTimeMS > 0 { + pipe.SetMaxTime(options.MaxAwaitTimeMS) + } + if options.BatchSize > 0 { + pipe.Batch(options.BatchSize) + } + pIter := pipe.Iter() + + // check that there was no issue creating the iterator. + // this will fail immediately with an error from the server if running against + // a standalone. + if err := pIter.Err(); err != nil { + return nil, err + } + + pIter.isChangeStream = true + return &ChangeStream{ + iter: pIter, + collection: coll, + resumeToken: nil, + options: options, + pipeline: pipeline, + }, nil +} + +// Next retrieves the next document from the change stream, blocking if necessary. +// Next returns true if a document was successfully unmarshalled into result, +// and false if an error occured. When Next returns false, the Err method should +// be called to check what error occurred during iteration. If there were no events +// available (ErrNotFound), the Err method returns nil so the user can retry the invocaton. +// +// For example: +// +// pipeline := []bson.M{} +// +// changeStream := collection.Watch(pipeline, ChangeStreamOptions{}) +// for changeStream.Next(&changeDoc) { +// fmt.Printf("Change: %v\n", changeDoc) +// } +// +// if err := changeStream.Close(); err != nil { +// return err +// } +// +// If the pipeline used removes the _id field from the result, Next will error +// because the _id field is needed to resume iteration when an error occurs. +// +func (changeStream *ChangeStream) Next(result interface{}) bool { + // the err field is being constantly overwritten and we don't want the user to + // attempt to read it at this point so we lock. + changeStream.m.Lock() + + defer changeStream.m.Unlock() + + // if we are in a state of error, then don't continue. + if changeStream.err != nil { + return false + } + + if changeStream.isClosed { + changeStream.err = fmt.Errorf("illegal use of a closed ChangeStream") + return false + } + + var err error + + // attempt to fetch the change stream result. + err = changeStream.fetchResultSet(result) + if err == nil { + return true + } + + // if we get no results we return false with no errors so the user can call Next + // again, resuming is not needed as the iterator is simply timed out as no events happened. + // The user will call Timeout in order to understand if this was the case. + if err == ErrNotFound { + return false + } + + // check if the error is resumable + if !isResumableError(err) { + // error is not resumable, give up and return it to the user. + changeStream.err = err + return false + } + + // try to resume. + err = changeStream.resume() + if err != nil { + // we've not been able to successfully resume and should only try once, + // so we give up. + changeStream.err = err + return false + } + + // we've successfully resumed the changestream. + // try to fetch the next result. + err = changeStream.fetchResultSet(result) + if err != nil { + changeStream.err = err + return false + } + + return true +} + +// Err returns nil if no errors happened during iteration, or the actual +// error otherwise. +func (changeStream *ChangeStream) Err() error { + changeStream.m.Lock() + defer changeStream.m.Unlock() + return changeStream.err +} + +// Close kills the server cursor used by the iterator, if any, and returns +// nil if no errors happened during iteration, or the actual error otherwise. +func (changeStream *ChangeStream) Close() error { + changeStream.m.Lock() + defer changeStream.m.Unlock() + changeStream.isClosed = true + err := changeStream.iter.Close() + if err != nil { + changeStream.err = err + } + if changeStream.sessionCopied { + changeStream.iter.session.Close() + changeStream.sessionCopied = false + } + return err +} + +// ResumeToken returns a copy of the current resume token held by the change stream. +// This token should be treated as an opaque token that can be provided to instantiate +// a new change stream. +func (changeStream *ChangeStream) ResumeToken() *bson.Raw { + changeStream.m.Lock() + defer changeStream.m.Unlock() + if changeStream.resumeToken == nil { + return nil + } + var tokenCopy = *changeStream.resumeToken + return &tokenCopy +} + +// Timeout returns true if the last call of Next returned false because of an iterator timeout. +func (changeStream *ChangeStream) Timeout() bool { + return changeStream.iter.Timeout() +} + +func constructChangeStreamPipeline(pipeline interface{}, + options ChangeStreamOptions) interface{} { + pipelinev := reflect.ValueOf(pipeline) + + // ensure that the pipeline passed in is a slice. + if pipelinev.Kind() != reflect.Slice { + panic("pipeline argument must be a slice") + } + + // construct the options to be used by the change notification + // pipeline stage. + changeStreamStageOptions := bson.M{} + + if options.FullDocument != "" { + changeStreamStageOptions["fullDocument"] = options.FullDocument + } + if options.ResumeAfter != nil { + changeStreamStageOptions["resumeAfter"] = options.ResumeAfter + } + + changeStreamStage := bson.M{"$changeStream": changeStreamStageOptions} + + pipeOfInterfaces := make([]interface{}, pipelinev.Len()+1) + + // insert the change notification pipeline stage at the beginning of the + // aggregation. + pipeOfInterfaces[0] = changeStreamStage + + // convert the passed in slice to a slice of interfaces. + for i := 0; i < pipelinev.Len(); i++ { + pipeOfInterfaces[1+i] = pipelinev.Index(i).Addr().Interface() + } + var pipelineAsInterface interface{} = pipeOfInterfaces + return pipelineAsInterface +} + +func (changeStream *ChangeStream) resume() error { + // copy the information for the new socket. + + // Thanks to Copy() future uses will acquire a new socket against the newly selected DB. + newSession := changeStream.iter.session.Copy() + + // fetch the cursor from the iterator and use it to run a killCursors + // on the connection. + cursorId := changeStream.iter.op.cursorId + err := runKillCursorsOnSession(newSession, cursorId) + if err != nil { + return err + } + + // change out the old connection to the database with the new connection. + if changeStream.sessionCopied { + changeStream.collection.Database.Session.Close() + } + changeStream.collection.Database.Session = newSession + changeStream.sessionCopied = true + + opts := changeStream.options + if changeStream.resumeToken != nil { + opts.ResumeAfter = changeStream.resumeToken + } + // make a new pipeline containing the resume token. + changeStreamPipeline := constructChangeStreamPipeline(changeStream.pipeline, opts) + + // generate the new iterator with the new connection. + newPipe := changeStream.collection.Pipe(changeStreamPipeline) + changeStream.iter = newPipe.Iter() + if err := changeStream.iter.Err(); err != nil { + return err + } + changeStream.iter.isChangeStream = true + return nil +} + +// fetchResumeToken unmarshals the _id field from the document, setting an error +// on the changeStream if it is unable to. +func (changeStream *ChangeStream) fetchResumeToken(rawResult *bson.Raw) error { + changeStreamResult := struct { + ResumeToken *bson.Raw `bson:"_id,omitempty"` + }{} + + err := rawResult.Unmarshal(&changeStreamResult) + if err != nil { + return err + } + + if changeStreamResult.ResumeToken == nil { + return errMissingResumeToken + } + + changeStream.resumeToken = changeStreamResult.ResumeToken + return nil +} + +func (changeStream *ChangeStream) fetchResultSet(result interface{}) error { + rawResult := bson.Raw{} + + // fetch the next set of documents from the cursor. + gotNext := changeStream.iter.Next(&rawResult) + err := changeStream.iter.Err() + if err != nil { + return err + } + + if !gotNext && err == nil { + // If the iter.Err() method returns nil despite us not getting a next batch, + // it is becuase iter.Err() silences this case. + return ErrNotFound + } + + // grab the resumeToken from the results + if err := changeStream.fetchResumeToken(&rawResult); err != nil { + return err + } + + // put the raw results into the data structure the user provided. + if err := rawResult.Unmarshal(result); err != nil { + return err + } + return nil +} + +func isResumableError(err error) bool { + _, isQueryError := err.(*QueryError) + // if it is not a database error OR it is a database error, + // but the error is a notMaster error + //and is not a missingResumeToken error (caused by the user provided pipeline) + return (!isQueryError || isNotMasterError(err)) && (err != errMissingResumeToken) +} + +func runKillCursorsOnSession(session *Session, cursorId int64) error { + socket, err := session.acquireSocket(true) + if err != nil { + return err + } + err = socket.Query(&killCursorsOp{[]int64{cursorId}}) + if err != nil { + return err + } + socket.Release() + + return nil +} diff --git a/changestreams_test.go b/changestreams_test.go new file mode 100644 index 000000000..792f5d6ef --- /dev/null +++ b/changestreams_test.go @@ -0,0 +1,464 @@ +package mgo_test + +import ( + mgo "github.com/globalsign/mgo" + "github.com/globalsign/mgo/bson" + . "gopkg.in/check.v1" +) + +type updateDesc struct { + UpdatedFields map[string]interface{} `bson:"updatedFields"` + RemovedFields []string `bson:"removedFields"` +} + +type evNamespace struct { + DB string `bson:"db"` + Coll string `bson:"coll"` +} + +type changeEvent struct { + ID interface{} `bson:"_id"` + OperationType string `bson:"operationType"` + FullDocument *bson.Raw `bson:"fullDocument,omitempty"` + Ns evNamespace `bson:"ns"` + DocumentKey M `bson:"documentKey"` + UpdateDescription *updateDesc `bson:"updateDescription,omitempty"` +} + +func (s *S) TestStreamsWatch(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + coll := session.DB("mydb").C("mycoll") + //add a mock document + coll.Insert(M{"a": 0}) + + pipeline := []bson.M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{}) + c.Assert(err, IsNil) + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsInsert(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + err = coll.Insert(M{"a": 0}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //insert a new document + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 1}) + c.Assert(err, IsNil) + //get the _id for later check + type A struct { + ID bson.ObjectId `bson:"_id"` + A int `bson:"a"` + } + + //get the event + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, true) + + //check event is correct + oid := ev.DocumentKey["_id"].(bson.ObjectId) + c.Assert(oid, Equals, id) + c.Assert(ev.OperationType, Equals, "insert") + c.Assert(ev.FullDocument, NotNil) + a := A{} + err = ev.FullDocument.Unmarshal(&a) + c.Assert(err, IsNil) + c.Assert(a.A, Equals, 1) + c.Assert(ev.Ns.DB, Equals, "mydb") + c.Assert(ev.Ns.Coll, Equals, "mycoll") + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsNextNoEventTimeout(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 0}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //check we timeout correctly on no events + //we should get a false result and no error + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, false) + c.Assert(changeStream.Err(), IsNil) + c.Assert(changeStream.Timeout(), Equals, true) + + //test the same with default timeout (MaxTimeMS=1000) + //create the stream + changeStream, err = coll.Watch(pipeline, mgo.ChangeStreamOptions{}) + c.Assert(err, IsNil) + hasEvent = changeStream.Next(&ev) + c.Assert(hasEvent, Equals, false) + c.Assert(changeStream.Err(), IsNil) + c.Assert(changeStream.Timeout(), Equals, true) + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsNextTimeout(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 0}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //insert a new document to trigger an event + id = bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 1}) + c.Assert(err, IsNil) + + //ensure we get the event + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, true) + + //check we timeout correctly on no subsequent events + //we should get a false result and no error + ev = changeEvent{} + hasEvent = changeStream.Next(&ev) + c.Assert(hasEvent, Equals, false) + c.Assert(changeStream.Err(), IsNil) + c.Assert(changeStream.Timeout(), Equals, true) + + //insert a new document to trigger an event + id = bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 1}) + c.Assert(err, IsNil) + + //ensure we get the event + ev = changeEvent{} + hasEvent = changeStream.Next(&ev) + c.Assert(hasEvent, Equals, true) + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsDelete(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 0}) + c.Assert(err, IsNil) + + //create the changeStream + pipeline := []M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //delete the document + err = coll.Remove(M{"_id": id}) + c.Assert(err, IsNil) + + //get the event + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, true) + + //check event is correct + oid := ev.DocumentKey["_id"].(bson.ObjectId) + c.Assert(oid, Equals, id) + c.Assert(ev.OperationType, Equals, "delete") + c.Assert(ev.FullDocument, IsNil) + c.Assert(ev.Ns.DB, Equals, "mydb") + c.Assert(ev.Ns.Coll, Equals, "mycoll") + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsUpdate(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 0, "toremove": 2}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //update document + err = coll.UpdateId(id, M{"$set": M{"a": 1}, "$unset": M{"toremove": ""}}) + c.Assert(err, IsNil) + + //get the event + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, true) + + //check event is correct + oid := ev.DocumentKey["_id"].(bson.ObjectId) + c.Assert(oid, Equals, id) + c.Assert(ev.OperationType, Equals, "update") + c.Assert(ev.FullDocument, IsNil) + c.Assert(len(ev.UpdateDescription.UpdatedFields), Equals, 1) + c.Assert(len(ev.UpdateDescription.RemovedFields), Equals, 1) + c.Assert(ev.UpdateDescription.UpdatedFields["a"], Equals, 1) + c.Assert(ev.UpdateDescription.RemovedFields[0], Equals, "toremove") + c.Assert(ev.Ns.DB, Equals, "mydb") + c.Assert(ev.Ns.Coll, Equals, "mycoll") + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsUpdateFullDocument(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 0, "toremove": "bla"}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500, FullDocument: mgo.UpdateLookup}) + c.Assert(err, IsNil) + + //update document + err = coll.UpdateId(id, M{"$set": M{"a": 1}, "$unset": M{"toremove": ""}}) + c.Assert(err, IsNil) + + //get the event + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, true) + + type A struct { + A int `bson:"a"` + ToRemove *string `bson:"toremove"` + } + + //check event is correct + oid := ev.DocumentKey["_id"].(bson.ObjectId) + c.Assert(oid, Equals, id) + c.Assert(ev.OperationType, Equals, "update") + c.Assert(len(ev.UpdateDescription.UpdatedFields), Equals, 1) + c.Assert(len(ev.UpdateDescription.RemovedFields), Equals, 1) + c.Assert(ev.UpdateDescription.UpdatedFields["a"], Equals, 1) + c.Assert(ev.UpdateDescription.RemovedFields[0], Equals, "toremove") + + c.Assert(ev.FullDocument, NotNil) + a := A{} + err = ev.FullDocument.Unmarshal(&a) + c.Assert(err, IsNil) + c.Assert(a.A, Equals, 1) + c.Assert(a.ToRemove, IsNil) + c.Assert(ev.Ns.DB, Equals, "mydb") + c.Assert(ev.Ns.Coll, Equals, "mycoll") + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsUpdateWithPipeline(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add two docs + id1 := bson.NewObjectId() + err = coll.Insert(M{"_id": id1, "a": 1}) + c.Assert(err, IsNil) + id2 := bson.NewObjectId() + err = coll.Insert(M{"_id": id2, "a": 2}) + c.Assert(err, IsNil) + + pipeline1 := []M{M{"$match": M{"documentKey._id": id1}}} + changeStream1, err := coll.Watch(pipeline1, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + pipeline2 := []M{M{"$match": M{"documentKey._id": id2}}} + changeStream2, err := coll.Watch(pipeline2, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //update documents + _, err = coll.UpdateAll(M{"_id": M{"$in": []bson.ObjectId{id1, id2}}}, M{"$inc": M{"a": 1}}) + c.Assert(err, IsNil) + + got1 := false + got2 := false + + //check we got the update for id1 (and no other) + for i := 0; i < 2; i++ { + ev := changeEvent{} + hasEvent := changeStream1.Next(&ev) + //we will accept only one event, the one that corresponds to our id1 + c.Assert(got1 && hasEvent, Equals, false) + if hasEvent { + oid := ev.DocumentKey["_id"].(bson.ObjectId) + c.Assert(oid, Equals, id1) + got1 = true + } + } + c.Assert(got1, Equals, true) + + //check we got the update for id2 (and no other) + for i := 0; i < 2; i++ { + ev := changeEvent{} + hasEvent := changeStream2.Next(&ev) + //we will accept only one event, the one that corresponds to our id2 + c.Assert(got2 && hasEvent, Equals, false) + if hasEvent { + oid := ev.DocumentKey["_id"].(bson.ObjectId) + c.Assert(oid, Equals, id2) + got2 = true + } + } + c.Assert(got2, Equals, true) + + err = changeStream1.Close() + c.Assert(err, IsNil) + err = changeStream2.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsResumeTokenMissingError(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + err = coll.Insert(M{"a": 0}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{{"$project": M{"_id": 0}}} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //insert a new document + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 1}) + c.Assert(err, IsNil) + + //check we get the correct error + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, false) + c.Assert(changeStream.Err().Error(), Equals, "resume token missing from result") + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsClosedStreamError(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + err = coll.Insert(M{"a": 0}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{{"$project": M{"_id": 0}}} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //insert a new document + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 1}) + c.Assert(err, IsNil) + + err = changeStream.Close() + c.Assert(err, IsNil) + + //check we get the correct error + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, false) + c.Assert(changeStream.Err().Error(), Equals, "illegal use of a closed ChangeStream") +} diff --git a/harness/daemons/.env b/harness/daemons/.env index 7ba8cf599..70acb5b92 100644 --- a/harness/daemons/.env +++ b/harness/daemons/.env @@ -59,7 +59,16 @@ if versionAtLeast 3 2; then COMMONDOPTS="$(echo "$COMMONDOPTS" | sed '/--nohttpinterface/d')" COMMONCOPTS="$(echo "$COMMONCOPTS" | sed '/--nohttpinterface/d')" - # config server need to be started as replica set + + if versionAtLeast 3 6; then + #In version 3.6 --nojournal is deprecated for replica set members using WiredTiger + COMMONDOPTSNOIP="$(echo "$COMMONDOPTSNOIP" | sed '/--nojournal/d')" + COMMONDOPTS="$(echo "$COMMONDOPTS" | sed '/--nojournal/d')" + COMMONCOPTS="$(echo "$COMMONCOPTS" | sed '/--nojournal/d')" + fi + + # config server need to be started as replica set + CFG1OPTS="--replSet conf1" CFG2OPTS="--replSet conf2" CFG3OPTS="--replSet conf3" diff --git a/session.go b/session.go index b62707c84..561f79487 100644 --- a/session.go +++ b/session.go @@ -169,7 +169,9 @@ type Iter struct { timeout time.Duration limit int32 timedout bool - findCmd bool + isFindCmd bool + isChangeStream bool + maxTimeMS int64 } var ( @@ -1117,6 +1119,11 @@ func isAuthError(err error) bool { return ok && e.Code == 13 } +func isNotMasterError(err error) bool { + e, ok := err.(*QueryError) + return ok && strings.Contains(e.Message, "not master") +} + func (db *Database) runUserCmd(cmdName string, user *User) error { cmd := make(bson.D, 0, 16) cmd = append(cmd, bson.DocElem{Name: cmdName, Value: user.Username}) @@ -2423,6 +2430,7 @@ type Pipe struct { pipeline interface{} allowDisk bool batchSize int + maxTimeMS int64 } type pipeCmd struct { @@ -2431,6 +2439,7 @@ type pipeCmd struct { Cursor *pipeCmdCursor `bson:",omitempty"` Explain bool `bson:",omitempty"` AllowDisk bool `bson:"allowDiskUse,omitempty"` + MaxTimeMS int64 `bson:"maxTimeMS,omitempty"` } type pipeCmdCursor struct { @@ -2485,6 +2494,9 @@ func (p *Pipe) Iter() *Iter { AllowDisk: p.allowDisk, Cursor: &pipeCmdCursor{p.batchSize}, } + if p.maxTimeMS > 0 { + cmd.MaxTimeMS = p.maxTimeMS + } err := c.Database.Run(cmd, &result) if e, ok := err.(*QueryError); ok && e.Message == `unrecognized field "cursor` { cmd.Cursor = nil @@ -2495,7 +2507,11 @@ func (p *Pipe) Iter() *Iter { if firstBatch == nil { firstBatch = result.Cursor.FirstBatch } - return c.NewIter(p.session, firstBatch, result.Cursor.Id, err) + it := c.NewIter(p.session, firstBatch, result.Cursor.Id, err) + if p.maxTimeMS > 0 { + it.maxTimeMS = p.maxTimeMS + } + return it } // NewIter returns a newly created iterator with the provided parameters. Using @@ -2557,7 +2573,7 @@ func (c *Collection) NewIter(session *Session, firstBatch []bson.Raw, cursorId i } if socket.ServerInfo().MaxWireVersion >= 4 && c.FullName != "admin.$cmd" { - iter.findCmd = true + iter.isFindCmd = true } iter.gotReply.L = &iter.m @@ -2659,6 +2675,13 @@ func (p *Pipe) Batch(n int) *Pipe { return p } +// SetMaxTime sets the maximum amount of time to allow the query to run. +// +func (p *Pipe) SetMaxTime(d time.Duration) *Pipe { + p.maxTimeMS = int64(d / time.Millisecond) + return p +} + // LastError the error status of the preceding write operation on the current connection. // // Relevant documentation: @@ -3801,7 +3824,7 @@ func (q *Query) Iter() *Iter { op.replyFunc = iter.op.replyFunc if prepareFindOp(socket, &op, limit) { - iter.findCmd = true + iter.isFindCmd = true } iter.server = socket.Server() @@ -4015,7 +4038,8 @@ func (iter *Iter) Timeout() bool { // Next returns true if a document was successfully unmarshalled onto result, // and false at the end of the result set or if an error happened. // When Next returns false, the Err method should be called to verify if -// there was an error during iteration. +// there was an error during iteration, and the Timeout method to verify if the +// false return value was caused by a timeout (no available results). // // For example: // @@ -4031,7 +4055,16 @@ func (iter *Iter) Next(result interface{}) bool { iter.m.Lock() iter.timedout = false timeout := time.Time{} + // for a ChangeStream iterator we have to call getMore before the loop otherwise + // we'll always return false + if iter.isChangeStream { + iter.getMore() + } + // check should we expect more data. for iter.err == nil && iter.docData.Len() == 0 && (iter.docsToReceive > 0 || iter.op.cursorId != 0) { + // we should expect more data. + + // If we have yet to receive data, increment the timer until we timeout. if iter.docsToReceive == 0 { if iter.timeout >= 0 { if timeout.IsZero() { @@ -4043,6 +4076,13 @@ func (iter *Iter) Next(result interface{}) bool { return false } } + // for a ChangeStream one loop i enought to declare the timeout + if iter.isChangeStream { + iter.timedout = true + iter.m.Unlock() + return false + } + // run a getmore to fetch more data. iter.getMore() if iter.err != nil { break @@ -4050,7 +4090,7 @@ func (iter *Iter) Next(result interface{}) bool { } iter.gotReply.Wait() } - + // We have data from the getMore. // Exhaust available data before reporting any errors. if docData, ok := iter.docData.Pop().([]byte); ok { close := false @@ -4066,6 +4106,7 @@ func (iter *Iter) Next(result interface{}) bool { } } if iter.op.cursorId != 0 && iter.err == nil { + // we still have a live cursor and currently expect data. iter.docsBeforeMore-- if iter.docsBeforeMore == -1 { iter.getMore() @@ -4255,7 +4296,7 @@ func (iter *Iter) getMore() { } } var op interface{} - if iter.findCmd { + if iter.isFindCmd || iter.isChangeStream { op = iter.getMoreCmd() } else { op = &iter.op @@ -4278,6 +4319,9 @@ func (iter *Iter) getMoreCmd() *queryOp { Collection: iter.op.collection[nameDot+1:], BatchSize: iter.op.limit, } + if iter.maxTimeMS > 0 { + getMore.MaxTimeMS = iter.maxTimeMS + } var op queryOp op.collection = iter.op.collection[:nameDot] + ".$cmd" @@ -4882,7 +4926,7 @@ func (iter *Iter) replyFunc() replyFunc { } else { iter.err = ErrNotFound } - } else if iter.findCmd { + } else if iter.isFindCmd { debugf("Iter %p received reply document %d/%d (cursor=%d)", iter, docNum+1, int(op.replyDocs), op.cursorId) var findReply struct { Ok bool @@ -4894,7 +4938,7 @@ func (iter *Iter) replyFunc() replyFunc { iter.err = err } else if !findReply.Ok && findReply.Errmsg != "" { iter.err = &QueryError{Code: findReply.Code, Message: findReply.Errmsg} - } else if len(findReply.Cursor.FirstBatch) == 0 && len(findReply.Cursor.NextBatch) == 0 { + } else if !iter.isChangeStream && len(findReply.Cursor.FirstBatch) == 0 && len(findReply.Cursor.NextBatch) == 0 { iter.err = ErrNotFound } else { batch := findReply.Cursor.FirstBatch @@ -5262,7 +5306,7 @@ func getRFC2253NameString(RDNElements *pkix.RDNSequence) string { var replacer = strings.NewReplacer(",", "\\,", "=", "\\=", "+", "\\+", "<", "\\<", ">", "\\>", ";", "\\;") //The elements in the sequence needs to be reversed when converting them for i := len(*RDNElements) - 1; i >= 0; i-- { - var nameAndValueList = make([]string,len((*RDNElements)[i])) + var nameAndValueList = make([]string, len((*RDNElements)[i])) for j, attribute := range (*RDNElements)[i] { var shortAttributeName = rdnOIDToShortName(attribute.Type) if len(shortAttributeName) <= 0 {