Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changeStream support #97

Merged
merged 44 commits into from
Feb 15, 2018
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
de872a6
Merge branch 'master' into development
domodwyer Aug 8, 2017
1519fd3
Merge branch 'master' into development
domodwyer Aug 15, 2017
454da02
add DropAllIndexes() method (#25)
feliixx Aug 30, 2017
93aaa6e
readme: credit @feliixx for #25 (#26)
domodwyer Aug 30, 2017
165af68
send metadata during handshake (#28)
feliixx Sep 6, 2017
a76b1a0
Update README to add appName (#32)
domodwyer Sep 6, 2017
25200e4
add method CreateView() (#33)
feliixx Sep 11, 2017
1f4c10f
readme: credit @feliixx in the README (#36)
domodwyer Sep 11, 2017
934a190
Don't panic on indexed int64 fields (#23)
domodwyer Sep 11, 2017
b37e3c1
Add collation option to collection.Create() (#37)
feliixx Sep 13, 2017
10876f5
support the $changeStream aggregation in 3.6+
Aug 2, 2017
b82ca4c
changestreams: fix import path
domodwyer Sep 15, 2017
aead58f
Test against MongoDB 3.4.x (#35)
feliixx Sep 15, 2017
5b7a419
MGO-142 implement changestream status functions for err, close and
Sep 14, 2017
950ed5a
Introduce constants for BSON element types (#41)
bozaro Sep 20, 2017
d21a525
bson.Unmarshal returns time in UTC (#42)
Sep 28, 2017
9d743b4
readme: add missing features / credit
domodwyer Sep 28, 2017
c86ed84
Merge pull request #45 from globalsign/feature/update-readme
Sep 28, 2017
97bd0cd
fix golint, go vet and gofmt warnings (#44)
feliixx Oct 6, 2017
fd79249
readme: credit @feliixx (#46)
domodwyer Oct 9, 2017
dba7b4c
Fix GetBSON() method usage (#40)
bozaro Oct 11, 2017
12fb1c2
readme: credit @bozaro (#47)
domodwyer Oct 11, 2017
199dc25
Improve cursorData struct unmarshaling speed (#49)
bozaro Oct 19, 2017
345ab0b
readme: credit @bozaro and @idy (#53)
domodwyer Oct 19, 2017
0454966
do not lock while writing to a socket (#52) (#54)
domodwyer Oct 19, 2017
663dfe5
Add proper DN construction (#55)
csucu Nov 2, 2017
7cd0b89
reduce memory allocation in bulk op (#56)
feliixx Nov 3, 2017
ea8e8e6
readme: credit @feliixx (#58)
domodwyer Nov 6, 2017
90c056c
MongoDB 3.6: implement the new wire protocol (#61)
feliixx Dec 12, 2017
1ac9b5d
Merge branch 'master' into development
domodwyer Dec 12, 2017
a104bfb
Recover from persistent "i/o timeout" or "Closed explicitly" pool err…
bachue Dec 27, 2017
f9be6c5
development: revert #61 (#73)
domodwyer Jan 9, 2018
138ba2f
readme: credit @bachue (#74)
domodwyer Jan 9, 2018
9acbd68
auth: add an example for x509 authentication (#75)
domodwyer Jan 9, 2018
eeedc17
session: add example concurrent usage (#78)
domodwyer Jan 15, 2018
88cedcd
Brings in a patch on having flusher not suppress errors. (#81)
jameinel Jan 25, 2018
90ad51b
Fallback to JSON tags when BSON tag isn't present (#91)
steve-gray Jan 31, 2018
9b03c58
Merge branch 'development' into feature/changestream
peterdeka Feb 6, 2018
4eb6ac9
Added maxAwaitTimeMS support to ChangeStream so Next() times out. Add…
peterdeka Feb 6, 2018
467c79f
Enabled journaling in test harness db config that made the ChangeStre…
peterdeka Feb 6, 2018
c8cbfa8
Better refactored harness daemons env file --nojournal removal. Added…
peterdeka Feb 6, 2018
5534387
Changed stream iterator timeout handling from driver to DB
peterdeka Feb 8, 2018
f341232
Merge branch 'development' into feature/changestream
peterdeka Feb 12, 2018
cebee9a
Renamed Pipe MaxTimeMS to SetMaxTime and changed parameter to a time.…
peterdeka Feb 14, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,23 @@ language: go

go_import_path: github.com/globalsign/mgo

go:
- 1.8.x
- 1.9.x

env:
global:
- BUCKET=https://s3.eu-west-2.amazonaws.com/globalsign-mgo
- FASTDL=https://fastdl.mongodb.org/linux
matrix:
- GO=1.7 MONGODB=x86_64-2.6.11
- GO=1.8.x MONGODB=x86_64-2.6.11
- GO=1.7 MONGODB=x86_64-3.0.9
- GO=1.8.x MONGODB=x86_64-3.0.9
- GO=1.7 MONGODB=x86_64-3.2.3-nojournal
- GO=1.8.x MONGODB=x86_64-3.2.3-nojournal
- GO=1.7 MONGODB=x86_64-3.2.12
- GO=1.8.x MONGODB=x86_64-3.2.12
- GO=1.7 MONGODB=x86_64-3.2.16
- GO=1.8.x MONGODB=x86_64-3.2.16
- GO=1.7 MONGODB=x86_64-3.4.8
- GO=1.8.x MONGODB=x86_64-3.4.8
- MONGODB=x86_64-ubuntu1404-3.0.15
- MONGODB=x86_64-ubuntu1404-3.2.17
- MONGODB=x86_64-ubuntu1404-3.4.10
- MONGODB=x86_64-ubuntu1404-3.6.0

install:
- eval "$(gimme $GO)"

- wget $BUCKET/mongodb-linux-$MONGODB.tgz
- wget $FASTDL/mongodb-linux-$MONGODB.tgz
- tar xzvf mongodb-linux-$MONGODB.tgz
- export PATH=$PWD/mongodb-linux-$MONGODB/bin:$PATH

Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili
* Fixes attempting to authenticate before every query ([details](https://github.com/go-mgo/mgo/issues/254))
* Removes bulk update / delete batch size limitations ([details](https://github.com/go-mgo/mgo/issues/288))
* Adds native support for `time.Duration` marshalling ([details](https://github.com/go-mgo/mgo/pull/373))
* Reduce memory footprint / garbage collection pressure by reusing buffers ([details](https://github.com/go-mgo/mgo/pull/229))
* Reduce memory footprint / garbage collection pressure by reusing buffers ([details](https://github.com/go-mgo/mgo/pull/229), [more](https://github.com/globalsign/mgo/pull/56))
* Support majority read concerns ([details](https://github.com/globalsign/mgo/pull/2))
* Improved connection handling ([details](https://github.com/globalsign/mgo/pull/5))
* Hides SASL warnings ([details](https://github.com/globalsign/mgo/pull/7))
Expand All @@ -32,10 +32,13 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili
* GetBSON correctly handles structs with both fields and pointers ([details](https://github.com/globalsign/mgo/pull/40))
* Improved bson.Raw unmarshalling performance ([details](https://github.com/globalsign/mgo/pull/49))
* Minimise socket connection timeouts due to excessive locking ([details](https://github.com/globalsign/mgo/pull/52))
* Natively support X509 client authentication ([details](https://github.com/globalsign/mgo/pull/55))
* Gracefully recover from a temporarily unreachable server ([details](https://github.com/globalsign/mgo/pull/69))

---

### Thanks to
* @bachue
* @bozaro
* @BenLubar
* @carter2000
Expand Down
70 changes: 69 additions & 1 deletion auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ package mgo_test

import (
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"io/ioutil"
Expand All @@ -38,7 +39,7 @@ import (
"sync"
"time"

mgo "github.com/globalsign/mgo"
"github.com/globalsign/mgo"
. "gopkg.in/check.v1"
)

Expand Down Expand Up @@ -963,6 +964,73 @@ func (s *S) TestAuthX509Cred(c *C) {
c.Assert(len(names) > 0, Equals, true)
}

func (s *S) TestAuthX509CredRDNConstruction(c *C) {
session, err := mgo.Dial("localhost:40001")
c.Assert(err, IsNil)
defer session.Close()
binfo, err := session.BuildInfo()
c.Assert(err, IsNil)
if binfo.OpenSSLVersion == "" {
c.Skip("server does not support SSL")
}

clientCertPEM, err := ioutil.ReadFile("harness/certs/client.pem")
c.Assert(err, IsNil)

clientCert, err := tls.X509KeyPair(clientCertPEM, clientCertPEM)
c.Assert(err, IsNil)

clientCert.Leaf, err = x509.ParseCertificate(clientCert.Certificate[0])
c.Assert(err, IsNil)

tlsConfig := &tls.Config{
InsecureSkipVerify: true,
Certificates: []tls.Certificate{clientCert},
}

var host = "localhost:40003"
c.Logf("Connecting to %s...", host)
session, err = mgo.DialWithInfo(&mgo.DialInfo{
Addrs: []string{host},
DialServer: func(addr *mgo.ServerAddr) (net.Conn, error) {
return tls.Dial("tcp", addr.String(), tlsConfig)
},
})
c.Assert(err, IsNil)
defer session.Close()

cred := &mgo.Credential{
Username: "root",
Mechanism: "MONGODB-X509",
Source: "$external",
Certificate: tlsConfig.Certificates[0].Leaf,
}
err = session.Login(cred)
c.Assert(err, NotNil)

err = session.Login(&mgo.Credential{Username: "root", Password: "rapadura"})
c.Assert(err, IsNil)

// This needs to be kept in sync with client.pem
x509Subject := "CN=localhost,OU=Client,O=MGO,L=MGO,ST=MGO,C=GO"

externalDB := session.DB("$external")
var x509User = mgo.User{
Username: x509Subject,
OtherDBRoles: map[string][]mgo.Role{"admin": {mgo.RoleRoot}},
}
err = externalDB.UpsertUser(&x509User)
c.Assert(err, IsNil)

session.LogoutAll()

cred.Username = ""
c.Logf("Authenticating...")
err = session.Login(cred)
c.Assert(err, IsNil)
c.Logf("Authenticated!")
}

var (
plainFlag = flag.String("plain", "", "Host to test PLAIN authentication against (depends on custom environment)")
plainUser = "einstein"
Expand Down
16 changes: 14 additions & 2 deletions bson/bson.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,9 +698,21 @@ func getStructInfo(st reflect.Type) (*structInfo, error) {
info := fieldInfo{Num: i}

tag := field.Tag.Get("bson")
if tag == "" && strings.Index(string(field.Tag), ":") < 0 {
tag = string(field.Tag)

// Fall-back to JSON struct tag, if feature flag is set.
if tag == "" && useJSONTagFallback {
tag = field.Tag.Get("json")
}

// If there's no bson/json tag available.
if tag == "" {
// If there's no tag, and also no tag: value splits (i.e. no colon)
// then assume the entire tag is the value
if strings.Index(string(field.Tag), ":") < 0 {
tag = string(field.Tag)
}
}

if tag == "-" {
continue
}
Expand Down
54 changes: 54 additions & 0 deletions bson/compatability_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package bson_test

import (
"github.com/globalsign/mgo/bson"
. "gopkg.in/check.v1"
)

type mixedTagging struct {
First string
Second string `bson:"second_field"`
Third string `json:"third_field"`
Fourth string `bson:"fourth_field" json:"alternate"`
}

// TestTaggingFallback checks that tagging fallback can be used/works as expected.
func (s *S) TestTaggingFallback(c *C) {
initial := &mixedTagging{
First: "One",
Second: "Two",
Third: "Three",
Fourth: "Four",
}

// Take only testing.T, leave only footprints.
initialState := bson.JSONTagFallbackState()
defer bson.SetJSONTagFallback(initialState)

// Marshal with the new mode applied.
bson.SetJSONTagFallback(true)
bsonState, errBSON := bson.Marshal(initial)
c.Assert(errBSON, IsNil)

// Unmarshal into a generic map so that we can pick up the actual field names
// selected.
target := make(map[string]string)
errUnmarshal := bson.Unmarshal(bsonState, target)
c.Assert(errUnmarshal, IsNil)

// No tag, so standard naming
_, firstExists := target["first"]
c.Assert(firstExists, Equals, true)

// Just a BSON tag
_, secondExists := target["second_field"]
c.Assert(secondExists, Equals, true)

// Just a JSON tag
_, thirdExists := target["third_field"]
c.Assert(thirdExists, Equals, true)

// Should marshal 4th as fourth_field (since we have both tags)
_, fourthExists := target["fourth_field"]
c.Assert(fourthExists, Equals, true)
}
16 changes: 16 additions & 0 deletions bson/compatibility.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package bson

// Current state of the JSON tag fallback option.
var useJSONTagFallback = false

// SetJSONTagFallback enables or disables the JSON-tag fallback for structure tagging. When this is enabled, structures
// without BSON tags on a field will fall-back to using the JSON tag (if present).
func SetJSONTagFallback(state bool) {
useJSONTagFallback = state
}

// JSONTagFallbackState returns the current status of the JSON tag fallback compatability option. See SetJSONTagFallback
// for more information.
func JSONTagFallbackState() bool {
return useJSONTagFallback
}
17 changes: 16 additions & 1 deletion bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mgo
import (
"bytes"
"sort"
"sync"

"github.com/globalsign/mgo/bson"
)
Expand Down Expand Up @@ -118,6 +119,15 @@ func (e *BulkError) Cases() []BulkErrorCase {
return e.ecases
}

var actionPool = sync.Pool{
New: func() interface{} {
return &bulkAction{
docs: make([]interface{}, 0),
idxs: make([]int, 0),
}
},
}

// Bulk returns a value to prepare the execution of a bulk operation.
func (c *Collection) Bulk() *Bulk {
return &Bulk{c: c, ordered: true}
Expand Down Expand Up @@ -145,7 +155,9 @@ func (b *Bulk) action(op bulkOp, opcount int) *bulkAction {
}
}
if action == nil {
b.actions = append(b.actions, bulkAction{op: op})
a := actionPool.Get().(*bulkAction)
a.op = op
b.actions = append(b.actions, *a)
action = &b.actions[len(b.actions)-1]
}
for i := 0; i < opcount; i++ {
Expand Down Expand Up @@ -288,6 +300,9 @@ func (b *Bulk) Run() (*BulkResult, error) {
default:
panic("unknown bulk operation")
}
action.idxs = action.idxs[0:0]
action.docs = action.docs[0:0]
actionPool.Put(action)
if !ok {
failed = true
if b.ordered {
Expand Down
Loading