From 999b4fd2776ecf0241e7a7109ae616a899066c2b Mon Sep 17 00:00:00 2001 From: Cezar Sa Espinola Date: Thu, 22 Feb 2018 09:16:39 -0300 Subject: [PATCH] vendor: update globalsign/mgo version This version includes important fixes in particular a fix for globalsign/mgo#103 --- Gopkg.lock | 2 +- vendor/github.com/globalsign/mgo/bson/bson.go | 16 +- .../globalsign/mgo/bson/compatibility.go | 16 + .../globalsign/mgo/changestreams.go | 357 ++++++++++++++++++ vendor/github.com/globalsign/mgo/cluster.go | 41 +- vendor/github.com/globalsign/mgo/session.go | 64 +++- vendor/github.com/globalsign/mgo/socket.go | 1 + 7 files changed, 475 insertions(+), 22 deletions(-) create mode 100644 vendor/github.com/globalsign/mgo/bson/compatibility.go create mode 100644 vendor/github.com/globalsign/mgo/changestreams.go diff --git a/Gopkg.lock b/Gopkg.lock index ea182acb0e..e01aa42155 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -343,7 +343,7 @@ "internal/sasl", "internal/scram" ] - revision = "896bbb89d21253a28cd5a7f8b81fe091410cb94d" + revision = "baa28fcb8e7d5dfab92026c0920cb6c9ae72faa2" [[projects]] name = "github.com/go-ini/ini" diff --git a/vendor/github.com/globalsign/mgo/bson/bson.go b/vendor/github.com/globalsign/mgo/bson/bson.go index d960f7a37d..31beab1912 100644 --- a/vendor/github.com/globalsign/mgo/bson/bson.go +++ b/vendor/github.com/globalsign/mgo/bson/bson.go @@ -698,9 +698,21 @@ func getStructInfo(st reflect.Type) (*structInfo, error) { info := fieldInfo{Num: i} tag := field.Tag.Get("bson") - if tag == "" && strings.Index(string(field.Tag), ":") < 0 { - tag = string(field.Tag) + + // Fall-back to JSON struct tag, if feature flag is set. + if tag == "" && useJSONTagFallback { + tag = field.Tag.Get("json") } + + // If there's no bson/json tag available. + if tag == "" { + // If there's no tag, and also no tag: value splits (i.e. no colon) + // then assume the entire tag is the value + if strings.Index(string(field.Tag), ":") < 0 { + tag = string(field.Tag) + } + } + if tag == "-" { continue } diff --git a/vendor/github.com/globalsign/mgo/bson/compatibility.go b/vendor/github.com/globalsign/mgo/bson/compatibility.go new file mode 100644 index 0000000000..6afecf53ce --- /dev/null +++ b/vendor/github.com/globalsign/mgo/bson/compatibility.go @@ -0,0 +1,16 @@ +package bson + +// Current state of the JSON tag fallback option. +var useJSONTagFallback = false + +// SetJSONTagFallback enables or disables the JSON-tag fallback for structure tagging. When this is enabled, structures +// without BSON tags on a field will fall-back to using the JSON tag (if present). +func SetJSONTagFallback(state bool) { + useJSONTagFallback = state +} + +// JSONTagFallbackState returns the current status of the JSON tag fallback compatability option. See SetJSONTagFallback +// for more information. +func JSONTagFallbackState() bool { + return useJSONTagFallback +} diff --git a/vendor/github.com/globalsign/mgo/changestreams.go b/vendor/github.com/globalsign/mgo/changestreams.go new file mode 100644 index 0000000000..5c2279c66e --- /dev/null +++ b/vendor/github.com/globalsign/mgo/changestreams.go @@ -0,0 +1,357 @@ +package mgo + +import ( + "errors" + "fmt" + "reflect" + "sync" + "time" + + "github.com/globalsign/mgo/bson" +) + +type FullDocument string + +const ( + Default = "default" + UpdateLookup = "updateLookup" +) + +type ChangeStream struct { + iter *Iter + isClosed bool + options ChangeStreamOptions + pipeline interface{} + resumeToken *bson.Raw + collection *Collection + readPreference *ReadPreference + err error + m sync.Mutex + sessionCopied bool +} + +type ChangeStreamOptions struct { + + // FullDocument controls the amount of data that the server will return when + // returning a changes document. + FullDocument FullDocument + + // ResumeAfter specifies the logical starting point for the new change stream. + ResumeAfter *bson.Raw + + // MaxAwaitTimeMS specifies the maximum amount of time for the server to wait + // on new documents to satisfy a change stream query. + MaxAwaitTimeMS time.Duration + + // BatchSize specifies the number of documents to return per batch. + BatchSize int + + // Collation specifies the way the server should collate returned data. + //TODO Collation *Collation +} + +var errMissingResumeToken = errors.New("resume token missing from result") + +// Watch constructs a new ChangeStream capable of receiving continuing data +// from the database. +func (coll *Collection) Watch(pipeline interface{}, + options ChangeStreamOptions) (*ChangeStream, error) { + + if pipeline == nil { + pipeline = []bson.M{} + } + + csPipe := constructChangeStreamPipeline(pipeline, options) + pipe := coll.Pipe(&csPipe) + if options.MaxAwaitTimeMS > 0 { + pipe.SetMaxTime(options.MaxAwaitTimeMS) + } + if options.BatchSize > 0 { + pipe.Batch(options.BatchSize) + } + pIter := pipe.Iter() + + // check that there was no issue creating the iterator. + // this will fail immediately with an error from the server if running against + // a standalone. + if err := pIter.Err(); err != nil { + return nil, err + } + + pIter.isChangeStream = true + return &ChangeStream{ + iter: pIter, + collection: coll, + resumeToken: nil, + options: options, + pipeline: pipeline, + }, nil +} + +// Next retrieves the next document from the change stream, blocking if necessary. +// Next returns true if a document was successfully unmarshalled into result, +// and false if an error occured. When Next returns false, the Err method should +// be called to check what error occurred during iteration. If there were no events +// available (ErrNotFound), the Err method returns nil so the user can retry the invocaton. +// +// For example: +// +// pipeline := []bson.M{} +// +// changeStream := collection.Watch(pipeline, ChangeStreamOptions{}) +// for changeStream.Next(&changeDoc) { +// fmt.Printf("Change: %v\n", changeDoc) +// } +// +// if err := changeStream.Close(); err != nil { +// return err +// } +// +// If the pipeline used removes the _id field from the result, Next will error +// because the _id field is needed to resume iteration when an error occurs. +// +func (changeStream *ChangeStream) Next(result interface{}) bool { + // the err field is being constantly overwritten and we don't want the user to + // attempt to read it at this point so we lock. + changeStream.m.Lock() + + defer changeStream.m.Unlock() + + // if we are in a state of error, then don't continue. + if changeStream.err != nil { + return false + } + + if changeStream.isClosed { + changeStream.err = fmt.Errorf("illegal use of a closed ChangeStream") + return false + } + + var err error + + // attempt to fetch the change stream result. + err = changeStream.fetchResultSet(result) + if err == nil { + return true + } + + // if we get no results we return false with no errors so the user can call Next + // again, resuming is not needed as the iterator is simply timed out as no events happened. + // The user will call Timeout in order to understand if this was the case. + if err == ErrNotFound { + return false + } + + // check if the error is resumable + if !isResumableError(err) { + // error is not resumable, give up and return it to the user. + changeStream.err = err + return false + } + + // try to resume. + err = changeStream.resume() + if err != nil { + // we've not been able to successfully resume and should only try once, + // so we give up. + changeStream.err = err + return false + } + + // we've successfully resumed the changestream. + // try to fetch the next result. + err = changeStream.fetchResultSet(result) + if err != nil { + changeStream.err = err + return false + } + + return true +} + +// Err returns nil if no errors happened during iteration, or the actual +// error otherwise. +func (changeStream *ChangeStream) Err() error { + changeStream.m.Lock() + defer changeStream.m.Unlock() + return changeStream.err +} + +// Close kills the server cursor used by the iterator, if any, and returns +// nil if no errors happened during iteration, or the actual error otherwise. +func (changeStream *ChangeStream) Close() error { + changeStream.m.Lock() + defer changeStream.m.Unlock() + changeStream.isClosed = true + err := changeStream.iter.Close() + if err != nil { + changeStream.err = err + } + if changeStream.sessionCopied { + changeStream.iter.session.Close() + changeStream.sessionCopied = false + } + return err +} + +// ResumeToken returns a copy of the current resume token held by the change stream. +// This token should be treated as an opaque token that can be provided to instantiate +// a new change stream. +func (changeStream *ChangeStream) ResumeToken() *bson.Raw { + changeStream.m.Lock() + defer changeStream.m.Unlock() + if changeStream.resumeToken == nil { + return nil + } + var tokenCopy = *changeStream.resumeToken + return &tokenCopy +} + +// Timeout returns true if the last call of Next returned false because of an iterator timeout. +func (changeStream *ChangeStream) Timeout() bool { + return changeStream.iter.Timeout() +} + +func constructChangeStreamPipeline(pipeline interface{}, + options ChangeStreamOptions) interface{} { + pipelinev := reflect.ValueOf(pipeline) + + // ensure that the pipeline passed in is a slice. + if pipelinev.Kind() != reflect.Slice { + panic("pipeline argument must be a slice") + } + + // construct the options to be used by the change notification + // pipeline stage. + changeStreamStageOptions := bson.M{} + + if options.FullDocument != "" { + changeStreamStageOptions["fullDocument"] = options.FullDocument + } + if options.ResumeAfter != nil { + changeStreamStageOptions["resumeAfter"] = options.ResumeAfter + } + + changeStreamStage := bson.M{"$changeStream": changeStreamStageOptions} + + pipeOfInterfaces := make([]interface{}, pipelinev.Len()+1) + + // insert the change notification pipeline stage at the beginning of the + // aggregation. + pipeOfInterfaces[0] = changeStreamStage + + // convert the passed in slice to a slice of interfaces. + for i := 0; i < pipelinev.Len(); i++ { + pipeOfInterfaces[1+i] = pipelinev.Index(i).Addr().Interface() + } + var pipelineAsInterface interface{} = pipeOfInterfaces + return pipelineAsInterface +} + +func (changeStream *ChangeStream) resume() error { + // copy the information for the new socket. + + // Thanks to Copy() future uses will acquire a new socket against the newly selected DB. + newSession := changeStream.iter.session.Copy() + + // fetch the cursor from the iterator and use it to run a killCursors + // on the connection. + cursorId := changeStream.iter.op.cursorId + err := runKillCursorsOnSession(newSession, cursorId) + if err != nil { + return err + } + + // change out the old connection to the database with the new connection. + if changeStream.sessionCopied { + changeStream.collection.Database.Session.Close() + } + changeStream.collection.Database.Session = newSession + changeStream.sessionCopied = true + + opts := changeStream.options + if changeStream.resumeToken != nil { + opts.ResumeAfter = changeStream.resumeToken + } + // make a new pipeline containing the resume token. + changeStreamPipeline := constructChangeStreamPipeline(changeStream.pipeline, opts) + + // generate the new iterator with the new connection. + newPipe := changeStream.collection.Pipe(changeStreamPipeline) + changeStream.iter = newPipe.Iter() + if err := changeStream.iter.Err(); err != nil { + return err + } + changeStream.iter.isChangeStream = true + return nil +} + +// fetchResumeToken unmarshals the _id field from the document, setting an error +// on the changeStream if it is unable to. +func (changeStream *ChangeStream) fetchResumeToken(rawResult *bson.Raw) error { + changeStreamResult := struct { + ResumeToken *bson.Raw `bson:"_id,omitempty"` + }{} + + err := rawResult.Unmarshal(&changeStreamResult) + if err != nil { + return err + } + + if changeStreamResult.ResumeToken == nil { + return errMissingResumeToken + } + + changeStream.resumeToken = changeStreamResult.ResumeToken + return nil +} + +func (changeStream *ChangeStream) fetchResultSet(result interface{}) error { + rawResult := bson.Raw{} + + // fetch the next set of documents from the cursor. + gotNext := changeStream.iter.Next(&rawResult) + err := changeStream.iter.Err() + if err != nil { + return err + } + + if !gotNext && err == nil { + // If the iter.Err() method returns nil despite us not getting a next batch, + // it is becuase iter.Err() silences this case. + return ErrNotFound + } + + // grab the resumeToken from the results + if err := changeStream.fetchResumeToken(&rawResult); err != nil { + return err + } + + // put the raw results into the data structure the user provided. + if err := rawResult.Unmarshal(result); err != nil { + return err + } + return nil +} + +func isResumableError(err error) bool { + _, isQueryError := err.(*QueryError) + // if it is not a database error OR it is a database error, + // but the error is a notMaster error + //and is not a missingResumeToken error (caused by the user provided pipeline) + return (!isQueryError || isNotMasterError(err)) && (err != errMissingResumeToken) +} + +func runKillCursorsOnSession(session *Session, cursorId int64) error { + socket, err := session.acquireSocket(true) + if err != nil { + return err + } + err = socket.Query(&killCursorsOp{[]int64{cursorId}}) + if err != nil { + return err + } + socket.Release() + + return nil +} diff --git a/vendor/github.com/globalsign/mgo/cluster.go b/vendor/github.com/globalsign/mgo/cluster.go index 7fc639c24b..ac461d5b94 100644 --- a/vendor/github.com/globalsign/mgo/cluster.go +++ b/vendor/github.com/globalsign/mgo/cluster.go @@ -148,16 +148,38 @@ func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResul session := newSession(Monotonic, cluster, 10*time.Second) session.setSocket(socket) - // provide some meta infos on the client, - // see https://github.com/mongodb/specifications/blob/master/source/mongodb-handshake/handshake.rst#connection-handshake - // for details - metaInfo := bson.M{"driver": bson.M{"name": "mgo", "version": "globalsign"}, - "os": bson.M{"type": runtime.GOOS, "architecture": runtime.GOARCH}} + var cmd = bson.D{{Name: "isMaster", Value: 1}} + + // Send client metadata to the server to identify this socket if this is + // the first isMaster call only. + // + // isMaster commands issued after the initial connection handshake MUST NOT contain handshake arguments + // https://github.com/mongodb/specifications/blob/master/source/mongodb-handshake/handshake.rst#connection-handshake + // + socket.sendMeta.Do(func() { + var meta = bson.M{ + "driver": bson.M{ + "name": "mgo", + "version": "globalsign", + }, + "os": bson.M{ + "type": runtime.GOOS, + "architecture": runtime.GOARCH, + }, + } - if cluster.appName != "" { - metaInfo["application"] = bson.M{"name": cluster.appName} - } - err := session.Run(bson.D{{Name: "isMaster", Value: 1}, {Name: "client", Value: metaInfo}}, result) + // Include the application name if set + if cluster.appName != "" { + meta["application"] = bson.M{"name": cluster.appName} + } + + cmd = append(cmd, bson.DocElem{ + Name: "client", + Value: meta, + }) + }) + + err := session.Run(cmd, result) session.Close() return err } @@ -660,6 +682,7 @@ func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Sleep(100 * time.Millisecond) continue } else { + // We've managed to successfully reconnect to the master, we are no longer abnormaly ended server.Lock() server.abended = false server.Unlock() diff --git a/vendor/github.com/globalsign/mgo/session.go b/vendor/github.com/globalsign/mgo/session.go index b62707c844..561f79487a 100644 --- a/vendor/github.com/globalsign/mgo/session.go +++ b/vendor/github.com/globalsign/mgo/session.go @@ -169,7 +169,9 @@ type Iter struct { timeout time.Duration limit int32 timedout bool - findCmd bool + isFindCmd bool + isChangeStream bool + maxTimeMS int64 } var ( @@ -1117,6 +1119,11 @@ func isAuthError(err error) bool { return ok && e.Code == 13 } +func isNotMasterError(err error) bool { + e, ok := err.(*QueryError) + return ok && strings.Contains(e.Message, "not master") +} + func (db *Database) runUserCmd(cmdName string, user *User) error { cmd := make(bson.D, 0, 16) cmd = append(cmd, bson.DocElem{Name: cmdName, Value: user.Username}) @@ -2423,6 +2430,7 @@ type Pipe struct { pipeline interface{} allowDisk bool batchSize int + maxTimeMS int64 } type pipeCmd struct { @@ -2431,6 +2439,7 @@ type pipeCmd struct { Cursor *pipeCmdCursor `bson:",omitempty"` Explain bool `bson:",omitempty"` AllowDisk bool `bson:"allowDiskUse,omitempty"` + MaxTimeMS int64 `bson:"maxTimeMS,omitempty"` } type pipeCmdCursor struct { @@ -2485,6 +2494,9 @@ func (p *Pipe) Iter() *Iter { AllowDisk: p.allowDisk, Cursor: &pipeCmdCursor{p.batchSize}, } + if p.maxTimeMS > 0 { + cmd.MaxTimeMS = p.maxTimeMS + } err := c.Database.Run(cmd, &result) if e, ok := err.(*QueryError); ok && e.Message == `unrecognized field "cursor` { cmd.Cursor = nil @@ -2495,7 +2507,11 @@ func (p *Pipe) Iter() *Iter { if firstBatch == nil { firstBatch = result.Cursor.FirstBatch } - return c.NewIter(p.session, firstBatch, result.Cursor.Id, err) + it := c.NewIter(p.session, firstBatch, result.Cursor.Id, err) + if p.maxTimeMS > 0 { + it.maxTimeMS = p.maxTimeMS + } + return it } // NewIter returns a newly created iterator with the provided parameters. Using @@ -2557,7 +2573,7 @@ func (c *Collection) NewIter(session *Session, firstBatch []bson.Raw, cursorId i } if socket.ServerInfo().MaxWireVersion >= 4 && c.FullName != "admin.$cmd" { - iter.findCmd = true + iter.isFindCmd = true } iter.gotReply.L = &iter.m @@ -2659,6 +2675,13 @@ func (p *Pipe) Batch(n int) *Pipe { return p } +// SetMaxTime sets the maximum amount of time to allow the query to run. +// +func (p *Pipe) SetMaxTime(d time.Duration) *Pipe { + p.maxTimeMS = int64(d / time.Millisecond) + return p +} + // LastError the error status of the preceding write operation on the current connection. // // Relevant documentation: @@ -3801,7 +3824,7 @@ func (q *Query) Iter() *Iter { op.replyFunc = iter.op.replyFunc if prepareFindOp(socket, &op, limit) { - iter.findCmd = true + iter.isFindCmd = true } iter.server = socket.Server() @@ -4015,7 +4038,8 @@ func (iter *Iter) Timeout() bool { // Next returns true if a document was successfully unmarshalled onto result, // and false at the end of the result set or if an error happened. // When Next returns false, the Err method should be called to verify if -// there was an error during iteration. +// there was an error during iteration, and the Timeout method to verify if the +// false return value was caused by a timeout (no available results). // // For example: // @@ -4031,7 +4055,16 @@ func (iter *Iter) Next(result interface{}) bool { iter.m.Lock() iter.timedout = false timeout := time.Time{} + // for a ChangeStream iterator we have to call getMore before the loop otherwise + // we'll always return false + if iter.isChangeStream { + iter.getMore() + } + // check should we expect more data. for iter.err == nil && iter.docData.Len() == 0 && (iter.docsToReceive > 0 || iter.op.cursorId != 0) { + // we should expect more data. + + // If we have yet to receive data, increment the timer until we timeout. if iter.docsToReceive == 0 { if iter.timeout >= 0 { if timeout.IsZero() { @@ -4043,6 +4076,13 @@ func (iter *Iter) Next(result interface{}) bool { return false } } + // for a ChangeStream one loop i enought to declare the timeout + if iter.isChangeStream { + iter.timedout = true + iter.m.Unlock() + return false + } + // run a getmore to fetch more data. iter.getMore() if iter.err != nil { break @@ -4050,7 +4090,7 @@ func (iter *Iter) Next(result interface{}) bool { } iter.gotReply.Wait() } - + // We have data from the getMore. // Exhaust available data before reporting any errors. if docData, ok := iter.docData.Pop().([]byte); ok { close := false @@ -4066,6 +4106,7 @@ func (iter *Iter) Next(result interface{}) bool { } } if iter.op.cursorId != 0 && iter.err == nil { + // we still have a live cursor and currently expect data. iter.docsBeforeMore-- if iter.docsBeforeMore == -1 { iter.getMore() @@ -4255,7 +4296,7 @@ func (iter *Iter) getMore() { } } var op interface{} - if iter.findCmd { + if iter.isFindCmd || iter.isChangeStream { op = iter.getMoreCmd() } else { op = &iter.op @@ -4278,6 +4319,9 @@ func (iter *Iter) getMoreCmd() *queryOp { Collection: iter.op.collection[nameDot+1:], BatchSize: iter.op.limit, } + if iter.maxTimeMS > 0 { + getMore.MaxTimeMS = iter.maxTimeMS + } var op queryOp op.collection = iter.op.collection[:nameDot] + ".$cmd" @@ -4882,7 +4926,7 @@ func (iter *Iter) replyFunc() replyFunc { } else { iter.err = ErrNotFound } - } else if iter.findCmd { + } else if iter.isFindCmd { debugf("Iter %p received reply document %d/%d (cursor=%d)", iter, docNum+1, int(op.replyDocs), op.cursorId) var findReply struct { Ok bool @@ -4894,7 +4938,7 @@ func (iter *Iter) replyFunc() replyFunc { iter.err = err } else if !findReply.Ok && findReply.Errmsg != "" { iter.err = &QueryError{Code: findReply.Code, Message: findReply.Errmsg} - } else if len(findReply.Cursor.FirstBatch) == 0 && len(findReply.Cursor.NextBatch) == 0 { + } else if !iter.isChangeStream && len(findReply.Cursor.FirstBatch) == 0 && len(findReply.Cursor.NextBatch) == 0 { iter.err = ErrNotFound } else { batch := findReply.Cursor.FirstBatch @@ -5262,7 +5306,7 @@ func getRFC2253NameString(RDNElements *pkix.RDNSequence) string { var replacer = strings.NewReplacer(",", "\\,", "=", "\\=", "+", "\\+", "<", "\\<", ">", "\\>", ";", "\\;") //The elements in the sequence needs to be reversed when converting them for i := len(*RDNElements) - 1; i >= 0; i-- { - var nameAndValueList = make([]string,len((*RDNElements)[i])) + var nameAndValueList = make([]string, len((*RDNElements)[i])) for j, attribute := range (*RDNElements)[i] { var shortAttributeName = rdnOIDToShortName(attribute.Type) if len(shortAttributeName) <= 0 { diff --git a/vendor/github.com/globalsign/mgo/socket.go b/vendor/github.com/globalsign/mgo/socket.go index f6158189c7..a9124b0436 100644 --- a/vendor/github.com/globalsign/mgo/socket.go +++ b/vendor/github.com/globalsign/mgo/socket.go @@ -54,6 +54,7 @@ type mongoSocket struct { dead error serverInfo *mongoServerInfo closeAfterIdle bool + sendMeta sync.Once } type queryOpFlags uint32