-
-
Notifications
You must be signed in to change notification settings - Fork 148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix mongo db client to use GridFS API on 16MB exceed file #1077
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,7 @@ | |
"go.mongodb.org/mongo-driver/bson" | ||
"go.mongodb.org/mongo-driver/bson/primitive" | ||
"go.mongodb.org/mongo-driver/mongo" | ||
"go.mongodb.org/mongo-driver/mongo/gridfs" | ||
"go.mongodb.org/mongo-driver/mongo/options" | ||
"go.mongodb.org/mongo-driver/mongo/readpref" | ||
|
||
|
@@ -43,7 +44,8 @@ | |
|
||
const ( | ||
// StatusKey is the key of the status field. | ||
StatusKey = "status" | ||
StatusKey = "status" | ||
BSONMaxSnapshotSize = 16 * 1024 * 1024 // 16MB | ||
) | ||
|
||
// Client is a client that connects to Mongo DB and reads or saves Yorkie data. | ||
|
@@ -1073,16 +1075,52 @@ | |
return err | ||
} | ||
|
||
if _, err := c.collection(ColSnapshots).InsertOne(ctx, bson.M{ | ||
"project_id": docRefKey.ProjectID, | ||
"doc_id": docRefKey.DocID, | ||
"server_seq": doc.Checkpoint().ServerSeq, | ||
"lamport": doc.Lamport(), | ||
"version_vector": doc.VersionVector(), | ||
"snapshot": snapshot, | ||
"created_at": gotime.Now(), | ||
}); err != nil { | ||
return fmt.Errorf("insert snapshot: %w", err) | ||
if len(snapshot) > BSONMaxSnapshotSize { | ||
db := c.client.Database(c.config.YorkieDatabase) | ||
|
||
// create GridFS bucket | ||
bucket, err := gridfs.NewBucket(db) | ||
if err != nil { | ||
return fmt.Errorf("failed to create GridFS bucket: %w", err) | ||
} | ||
|
||
uploadStream, err := bucket.OpenUploadStream(fmt.Sprintf("%s_snapshot", docRefKey.DocID)) | ||
if err != nil { | ||
return fmt.Errorf("failed to open GridFS upload stream: %w", err) | ||
} | ||
defer uploadStream.Close() | ||
|
||
_, err = uploadStream.Write(snapshot) | ||
if err != nil { | ||
return fmt.Errorf("failed to write to GridFS: %w", err) | ||
} | ||
|
||
fileID := uploadStream.FileID | ||
|
||
if _, err := c.collection(ColSnapshots).InsertOne(ctx, bson.M{ | ||
"project_id": docRefKey.ProjectID, | ||
"doc_id": docRefKey.DocID, | ||
"server_seq": doc.Checkpoint().ServerSeq, | ||
"lamport": doc.Lamport(), | ||
"version_vector": doc.VersionVector(), | ||
"snapshot_file_id": fileID, // GridFS file ID | ||
"created_at": gotime.Now(), | ||
}); err != nil { | ||
return fmt.Errorf("insert snapshot info: %w", err) | ||
} | ||
|
||
Comment on lines
+1078
to
+1111
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Handle error from uploadStream.Close(): While the GridFS implementation is solid, the deferred uploadStream.Close() call does not check its returned error. Apply this fix:
-defer uploadStream.Close()
+defer func() {
+ if err := uploadStream.Close(); err != nil {
+ log.Printf("Failed to close GridFS upload stream: %v", err)
+ }
+}()
🧰 Tools 🪛 golangci-lint (1.61.0) 1091-1091: Error return value of `uploadStream.Close` is not checked (errcheck) |
||
} else { | ||
if _, err := c.collection(ColSnapshots).InsertOne(ctx, bson.M{ | ||
"project_id": docRefKey.ProjectID, | ||
"doc_id": docRefKey.DocID, | ||
"server_seq": doc.Checkpoint().ServerSeq, | ||
"lamport": doc.Lamport(), | ||
"version_vector": doc.VersionVector(), | ||
"snapshot": snapshot, | ||
"created_at": gotime.Now(), | ||
}); err != nil { | ||
return fmt.Errorf("insert snapshot: %w", err) | ||
} | ||
} | ||
|
||
return nil | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1619,3 +1619,33 @@ | |
} | ||
assert.EqualValues(t, expectedKeys, keys) | ||
} | ||
|
||
func RunCreateLargeSnapshotTest(t *testing.T, db database.Database, projectID types.ID) { | ||
t.Run("store and validate large snapshot test", func(t *testing.T) { | ||
ctx := context.Background() | ||
docKey := key.Key(fmt.Sprintf("tests$%s", t.Name())) | ||
|
||
clientInfo, _ := db.ActivateClient(ctx, projectID, t.Name()) | ||
bytesID, _ := clientInfo.ID.Bytes() | ||
actorID, _ := time.ActorIDFromBytes(bytesID) | ||
docInfo, _ := db.FindDocInfoByKeyAndOwner(ctx, clientInfo.RefKey(), docKey, true) | ||
|
||
doc := document.New(docKey) | ||
doc.SetActor(actorID) | ||
|
||
largeData := make([]byte, 16*1024*1024+1) | ||
for i := range largeData { | ||
largeData[i] = byte('A' + (i % 26)) | ||
} | ||
|
||
assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error { | ||
root.SetBytes("largeField", largeData) | ||
return nil | ||
})) | ||
|
||
docRefKey := docInfo.RefKey() | ||
|
||
err := db.CreateSnapshotInfo(ctx, docRefKey, doc.InternalDocument()) | ||
assert.NoError(t, err) | ||
}) | ||
Comment on lines
+1624
to
+1650
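There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion: Enhance test coverage and validation. The test only verifies that the snapshot was created without error. Consider: (1) validating that the stored snapshot can be loaded back and matches the original data, (2) testing an even larger document (e.g., 32MB) to verify appropriate error handling, and (3) adding cleanup code for the test data.
Add validation and error cases:
func RunCreateLargeSnapshotTest(t *testing.T, db database.Database, projectID types.ID) {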
- t.Run("store and validate large snapshot test", func(t *testing.T) {
+ t.Run("store large snapshot test", func(t *testing.T) {
// ... existing setup code ...
err := db.CreateSnapshotInfo(ctx, docRefKey, doc.InternalDocument())
assert.NoError(t, err)
+
+ // Validate stored snapshot
+ snapshot, err := db.FindClosestSnapshotInfo(ctx, docRefKey, 0, false)
+ assert.NoError(t, err)
+
+ var storedDoc document.InternalDocument
+ err = snapshot.LoadSnapshot(&storedDoc)
+ assert.NoError(t, err)
+ assert.Equal(t, largeData, storedDoc.Root().GetBytes("largeField"))
})
+
+ t.Run("store extremely large snapshot test", func(t *testing.T) {
+ // Test with even larger document (e.g., 32MB)
+ // Verify appropriate error handling
+ })
+
+ t.Cleanup(func() {
+ // Add cleanup code
+ })
}
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle error returned by uploadStream.Close()
The deferred call to uploadStream.Close() does not check for errors. Since Close() can return an error, it's important to handle it to ensure resources are properly released and any potential errors are captured.
Apply this diff to handle the error:
📝 Committable suggestion
🧰 Tools
🪛 GitHub Check: build
[failure] 1097-1097: Error return value of `uploadStream.Close` is not checked (errcheck)