Skip to content

Commit

Permalink
GODRIVER-2821 De-propagate unacknowledged sentinel error (mongodb#1661)
Browse files Browse the repository at this point in the history
  • Loading branch information
prestonvasquez authored Jul 22, 2024
1 parent c71ef48 commit ee88719
Show file tree
Hide file tree
Showing 11 changed files with 236 additions and 83 deletions.
133 changes: 116 additions & 17 deletions internal/integration/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package integration

import (
"context"
"errors"
"strings"
"testing"

Expand Down Expand Up @@ -1472,6 +1471,122 @@ func TestCollection(t *testing.T) {
assert.NotNil(mt, we.WriteConcernError, "expected write concern error, got %v", err)
})
})

unackClientOpts := options.Client().
SetWriteConcern(writeconcern.Unacknowledged())
unackMtOpts := mtest.NewOptions().
ClientOptions(unackClientOpts).
MinServerVersion("3.6")
mt.RunOpts("unacknowledged writes", unackMtOpts, func(mt *mtest.T) {
mt.Run("bulk write", func(mt *mtest.T) {
models := []mongo.WriteModel{
mongo.NewInsertOneModel().SetDocument(bson.D{{"x", 1}}),
}

res, err := mt.Coll.BulkWrite(context.Background(), models)

assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})

mt.Run("insert one", func(mt *mtest.T) {
res, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})

assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})

mt.Run("insert many", func(t *mtest.T) {
docs := []interface{}{
bson.D{{"x", 1}},
bson.D{{"y", 1}},
}

res, err := mt.Coll.InsertMany(context.Background(), docs)

assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})

mt.Run("delete", func(mt *mtest.T) {
res, err := mt.Coll.DeleteOne(context.Background(), bson.D{{"x", 1}})

assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})

mt.Run("update", func(t *mtest.T) {
res, err := mt.Coll.UpdateOne(context.Background(), bson.D{{"x", 1}}, bson.D{{"$set", bson.D{{"x", "2"}}}})

assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})

mt.Run("find and modify", func(mt *mtest.T) {
res := mt.Coll.FindOneAndDelete(context.Background(), bson.D{{"x", 1}})

assert.ErrorIs(mt, res.Err(), mongo.ErrNoDocuments)
assert.False(mt, res.Acknowledged)

})

mt.Run("dropping a collection", func(mt *mtest.T) {
err := mt.Coll.Drop(context.Background())
assert.NoError(mt, err)
})

mt.Run("creating a collection", func(mt *mtest.T) {
err := mt.DB.CreateCollection(context.Background(), "test coll")
assert.NoError(mt, err)
})

mt.Run("creating an index", func(mt *mtest.T) {
indexModel := mongo.IndexModel{
Keys: bson.M{"username": 1},
Options: options.Index().SetUnique(true),
}

_, err := mt.Coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{indexModel})
assert.NoError(mt, err)
})

mt.Run("creating an index view", func(mt *mtest.T) {
projectStage := bson.D{
{"$project", bson.D{
{"_id", 0},
{"fullName", bson.D{
{"$concat", []string{"$firstName", " ", "$lastName"}},
}},
}},
}

pipeline := mongo.Pipeline{projectStage}

err := mt.DB.CreateView(context.Background(), "testview", "coll", pipeline, nil)
assert.NoError(mt, err)
})

mt.Run("dropping a database", func(mt *mtest.T) {
db := mt.Client.Database("bd7b09e4-7d12-4bcb-9fc6-9852ad93715a")

err := db.Drop(context.Background())
assert.NoError(mt, err)
})

mt.Run("dropping an index", func(t *mtest.T) {
indexModel := mongo.IndexModel{
Keys: bson.M{"username": 1},
Options: options.Index().SetUnique(true).SetName("username_1"),
}

_, err := mt.Coll.Indexes().CreateOne(context.TODO(), indexModel)
assert.NoError(mt, err, "failed to create index")

_, err = mt.Coll.Indexes().DropOne(context.Background(), "username_1")
assert.NoError(mt, err)
})
})

mt.RunOpts("bulk write", noClientOpts, func(mt *mtest.T) {
wcCollOpts := options.Collection().SetWriteConcern(impossibleWc)
wcTestOpts := mtest.NewOptions().CollectionOptions(wcCollOpts).Topologies(mtest.ReplicaSet).CreateClient(false)
Expand Down Expand Up @@ -1703,22 +1818,6 @@ func TestCollection(t *testing.T) {
assert.Equal(mt, res.UpsertedIDs[1].(string), id1, "expected UpsertedIDs[1] to be %v, got %v", id1, res.UpsertedIDs[1])
assert.Equal(mt, res.UpsertedIDs[3].(string), id3, "expected UpsertedIDs[3] to be %v, got %v", id3, res.UpsertedIDs[3])
})
unackClientOpts := options.Client().
SetWriteConcern(writeconcern.Unacknowledged())
unackMtOpts := mtest.NewOptions().
ClientOptions(unackClientOpts).
MinServerVersion("3.6")
mt.RunOpts("unacknowledged write", unackMtOpts, func(mt *mtest.T) {
models := []mongo.WriteModel{
mongo.NewInsertOneModel().SetDocument(bson.D{{"x", 1}}),
}
_, err := mt.Coll.BulkWrite(context.Background(), models)
if !errors.Is(err, mongo.ErrUnacknowledgedWrite) {
// Use a direct comparison rather than assert.Equal because assert.Equal will compare the error strings,
// so the assertion would succeed even if the error had not been wrapped.
mt.Fatalf("expected BulkWrite error %v, got %v", mongo.ErrUnacknowledgedWrite, err)
}
})
mt.RunOpts("insert and delete with batches", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) {
// grouped together because delete requires the documents to be inserted
maxBatchCount := int(mtest.MockDescription.MaxBatchCount)
Expand Down
14 changes: 0 additions & 14 deletions internal/integration/index_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package integration

import (
"context"
"errors"
"testing"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -248,19 +247,6 @@ func TestIndexView(t *testing.T) {
})
}
})
unackClientOpts := options.Client().
SetWriteConcern(writeconcern.Unacknowledged())
unackMtOpts := mtest.NewOptions().
ClientOptions(unackClientOpts).
MinServerVersion("3.6")
mt.RunOpts("unacknowledged write", unackMtOpts, func(mt *mtest.T) {
_, err := mt.Coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{Keys: bson.D{{"x", 1}}})
if !errors.Is(err, mongo.ErrUnacknowledgedWrite) {
// Use a direct comparison rather than assert.Equal because assert.Equal will compare the error strings,
// so the assertion would succeed even if the error had not been wrapped.
mt.Fatalf("expected CreateOne error %v, got %v", mongo.ErrUnacknowledgedWrite, err)
}
})
// Needs to run on these versions for failpoints
mt.RunOpts("replace error", mtest.NewOptions().Topologies(mtest.ReplicaSet).MinServerVersion("4.0"), func(mt *mtest.T) {
mt.SetFailPoint(mtest.FailPoint{
Expand Down
16 changes: 6 additions & 10 deletions internal/integration/mtest/mongotest.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,17 +514,13 @@ func (t *T) ClearCollections() {
DropEncryptedCollection(t, coll.created, coll.CreateOpts.EncryptedFields)
}

err := coll.created.Drop(context.Background())
if errors.Is(err, mongo.ErrUnacknowledgedWrite) || errors.Is(err, driver.ErrUnacknowledgedWrite) {
// It's possible that a collection could have an unacknowledged write concern, which
// could prevent it from being dropped for sharded clusters. We can resolve this by
// re-instantiating the collection with a majority write concern before dropping.
collname := coll.created.Name()
wcm := writeconcern.Majority()
wccoll := t.DB.Collection(collname, options.Collection().SetWriteConcern(wcm))
_ = wccoll.Drop(context.Background())
// It's possible that a collection could have an unacknowledged write
// concern, which could prevent it from being dropped for sharded
// clusters. We can resolve this by re-instantiating the collection with
// a majority write concern before dropping.
clonedColl := coll.created.Clone(options.Collection().SetWriteConcern(writeconcern.Majority()))

}
_ = clonedColl.Drop(context.Background())
}
}
t.createdColls = t.createdColls[:0]
Expand Down
9 changes: 6 additions & 3 deletions internal/integration/sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,16 @@ func TestSessions(t *testing.T) {
assert.Nil(mt, err, "StartSession error: %v", err)
defer sess.EndSession(context.Background())

var res *mongo.InsertOneResult

err = mongo.WithSession(context.Background(), sess, func(sc context.Context) error {
_, err := mt.Coll.InsertOne(sc, bson.D{{"x", 1}})
res, err = mt.Coll.InsertOne(sc, bson.D{{"x", 1}})

return err
})

assert.Equal(mt, err, mongo.ErrUnacknowledgedWrite,
"expected ErrUnacknowledgedWrite on unacknowledged write in session, got %v", err)
assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})

// Regression test for GODRIVER-2533. Note that this test assumes the race
Expand Down
10 changes: 7 additions & 3 deletions mongo/bulk_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,14 @@ func (bw *bulkWrite) execute(ctx context.Context) error {
}

bw.result.MatchedCount -= bw.result.UpsertedCount
if lastErr != nil {
_, lastErr = processWriteError(lastErr)
return lastErr

rr, err := processWriteError(lastErr)
if err != nil {
return err
}

bw.result.Acknowledged = rr.isAcknowledged()

if len(bwErr.WriteErrors) > 0 || bwErr.WriteConcernError != nil {
return bwErr
}
Expand Down
30 changes: 21 additions & 9 deletions mongo/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,10 +432,14 @@ func (coll *Collection) InsertOne(ctx context.Context, document interface{},
res, err := coll.insert(ctx, []interface{}{document}, imOpts)

rr, err := processWriteError(err)
if rr&rrOne == 0 {
if rr&rrOne == 0 && rr.isAcknowledged() {
return nil, err
}
return &InsertOneResult{InsertedID: res[0]}, err

return &InsertOneResult{
InsertedID: res[0],
Acknowledged: rr.isAcknowledged(),
}, err
}

// InsertMany executes an insert command to insert multiple documents into the collection. If write errors occur
Expand Down Expand Up @@ -471,7 +475,10 @@ func (coll *Collection) InsertMany(ctx context.Context, documents interface{},
return nil, err
}

imResult := &InsertManyResult{InsertedIDs: result}
imResult := &InsertManyResult{
InsertedIDs: result,
Acknowledged: rr.isAcknowledged(),
}
var writeException WriteException
if !errors.As(err, &writeException) {
return imResult, err
Expand Down Expand Up @@ -603,7 +610,10 @@ func (coll *Collection) delete(ctx context.Context, filter interface{}, deleteOn
if rr&expectedRr == 0 {
return nil, err
}
return &DeleteResult{DeletedCount: op.Result().N}, err
return &DeleteResult{
DeletedCount: op.Result().N,
Acknowledged: rr.isAcknowledged(),
}, err
}

// DeleteOne executes a delete command to delete at most one document from the collection.
Expand Down Expand Up @@ -754,6 +764,7 @@ func (coll *Collection) updateOrReplace(ctx context.Context, filter bsoncore.Doc
MatchedCount: opRes.N,
ModifiedCount: opRes.NModified,
UpsertedCount: int64(len(opRes.Upserted)),
Acknowledged: rr.isAcknowledged(),
}
if len(opRes.Upserted) > 0 {
res.UpsertedID = opRes.Upserted[0].ID
Expand Down Expand Up @@ -1724,16 +1735,17 @@ func (coll *Collection) findAndModify(ctx context.Context, op *operation.FindAnd
Retry(retry).
Crypt(coll.client.cryptFLE)

_, err = processWriteError(op.Execute(ctx))
rr, err := processWriteError(op.Execute(ctx))
if err != nil {
return &SingleResult{err: err}
}

return &SingleResult{
ctx: ctx,
rdr: bson.Raw(op.Result().Value),
bsonOpts: coll.bsonOpts,
reg: coll.registry,
ctx: ctx,
rdr: bson.Raw(op.Result().Value),
bsonOpts: coll.bsonOpts,
reg: coll.registry,
Acknowledged: rr.isAcknowledged(),
}
}

Expand Down
13 changes: 7 additions & 6 deletions mongo/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,14 @@ func (db *Database) RunCommand(ctx context.Context, runCommand interface{}, opts

err = op.Execute(ctx)
// RunCommand can be used to run a write, thus execute may return a write error
_, convErr := processWriteError(err)
rr, convErr := processWriteError(err)
return &SingleResult{
ctx: ctx,
err: convErr,
rdr: bson.Raw(op.Result()),
bsonOpts: db.bsonOpts,
reg: db.registry,
ctx: ctx,
err: convErr,
rdr: bson.Raw(op.Result()),
bsonOpts: db.bsonOpts,
reg: db.registry,
Acknowledged: rr.isAcknowledged(),
}
}

Expand Down
Loading

0 comments on commit ee88719

Please sign in to comment.