feat(store/v2): parallel writes in storage sqlite backends #18320
Conversation
Walkthrough: The codebase has been updated to switch from the `modernc.org/sqlite` package to the `github.com/mattn/go-sqlite3` package for SQLite database operations. This change is reflected in the import statements and driver names. Additionally, two new test functions have been introduced to verify the concurrent write and pruning capabilities of the database.
@cool-develope your pull request is missing a changelog!
Review Status
Actionable comments generated: 2
Configuration used: CodeRabbit UI
Files ignored due to filter (2)
- store/go.mod
- store/go.sum
Files selected for processing (3)
- store/storage/sqlite/db.go (1 hunks)
- store/storage/sqlite/db_test.go (2 hunks)
- store/storage/sqlite/iterator.go (1 hunks)
Files skipped from review due to trivial changes (1)
- store/storage/sqlite/iterator.go
Additional comments: 2
store/storage/sqlite/db_test.go (1)
- 2-10: The import statements are well organized and only necessary packages are imported. Good use of the `sync` package for managing goroutines.

store/storage/sqlite/db.go (1)
- 8-21: The switch from `modernc.org/sqlite` to `github.com/mattn/go-sqlite3` and the change in the connection string to enable shared cache mode and write-ahead logging (WAL) are good for improving concurrency and performance. However, ensure that the new driver is compatible with the rest of your codebase and that WAL mode is suitable for your use case. WAL mode can increase disk I/O and may not be suitable if your application has many read transactions compared to write transactions.
New(dataDir string): Ensure that the `dataDir` path exists and is writable before trying to open the database file. This can prevent runtime errors.

Close(): Ensure that all database connections are closed before setting `db.storage` to `nil`. This can prevent potential data races.

GetLatestVersion(), SetLatestVersion(version uint64), Has(storeKey string, version uint64, key []byte), Get(storeKey string, targetVersion uint64, key []byte), ApplyChangeset(version uint64, cs *store.Changeset), Prune(version uint64), Iterator(storeKey string, version uint64, start, end []byte), ReverseIterator(storeKey string, version uint64, start, end []byte): These methods are well written and handle errors properly. However, consider adding context to the error messages to make them more informative.

PrintRowsDebug(): This method is useful for debugging, but it should not be included in production code. Consider using a logging library that can be configured to print debug messages based on the environment or log level.
store/storage/sqlite/db_test.go (outdated)

```go
func TestParallelWrites(t *testing.T) {
	db, err := New(t.TempDir())
	require.NoError(t, err)
	defer db.Close()

	wg := sync.WaitGroup{}

	// start 10 goroutines that write to the database
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			t.Log("start time", i, time.Now())
			defer wg.Done()
			cs := new(store.Changeset)
			for j := 0; j < 100; j++ {
				key := fmt.Sprintf("key-%d-%03d", i, j)
				val := fmt.Sprintf("val-%d-%03d", i, j)

				cs.AddKVPair(store.KVPair{StoreKey: storeKey1, Key: []byte(key), Value: []byte(val)})
			}

			require.NoError(t, db.ApplyChangeset(uint64(i+1), cs))
			t.Log("end time", i, time.Now())
		}(i)
	}

	wg.Wait()
}
```
The `TestParallelWrites` function is well implemented. It correctly uses a `WaitGroup` to ensure all goroutines finish before the test function returns. However, it's important to note that the test does not verify the correctness of the concurrent writes. It would be beneficial to add a verification step after the `wg.Wait()` call to ensure that all expected key-value pairs are present in the database.
@cool-develope indeed, for a super rigorous test, you can just run over the keys/values written and ensure we've got 0-N
Looks like humans are talking. I'll be quiet for now. If you need me again, just tag @coderabbitai in a new comment.
store/storage/sqlite/db_test.go (outdated)

```go
func TestParallelWriteAndPruning(t *testing.T) {
	db, err := New(t.TempDir())
	require.NoError(t, err)
	defer db.Close()

	latestVersion := 100

	wg := sync.WaitGroup{}
	wg.Add(2)
	// start a goroutine that writes to the database
	go func() {
		defer wg.Done()
		for i := 0; i < latestVersion; i++ {
			cs := new(store.Changeset)
			for j := 0; j < 100; j++ {
				key := fmt.Sprintf("key-%d-%03d", i, j)
				val := fmt.Sprintf("val-%d-%03d", i, j)

				cs.AddKVPair(store.KVPair{StoreKey: storeKey1, Key: []byte(key), Value: []byte(val)})
			}

			require.NoError(t, db.ApplyChangeset(uint64(i+1), cs))
		}
	}()
	// start a goroutine that prunes the database
	go func() {
		defer wg.Done()
		for i := 10; i < latestVersion; i += 5 {
			for {
				v, err := db.GetLatestVersion()
				require.NoError(t, err)
				if v > uint64(i) {
					t.Log("pruning version", v-1)
					require.NoError(t, db.Prune(v-1))
					break
				}
			}
		}
	}()

	wg.Wait()
}
```
The `TestParallelWriteAndPruning` function is also well implemented. It correctly uses a `WaitGroup` to ensure all goroutines finish before the test function returns. However, similar to `TestParallelWrites`, this test does not verify the correctness of the concurrent writes and pruning. It would be beneficial to add a verification step after the `wg.Wait()` call to ensure that all expected key-value pairs are present in the database and that the pruning was successful.
I'm OK with using the CGO variant if it's significantly faster.
Thank you for this change @cool-develope! I've added some feedback to ensure that you can coordinate concurrency. Also, if possible, please ensure that these tests run with `-race`.
```go
// start 10 goroutines that write to the database
for i := 0; i < 10; i++ {
	wg.Add(1)
	go func(i int) {
```
If this code is super fast, it could all run sequentially even though we have 10 goroutines. What we can do, however, to ensure that all the goroutines write at the exact same time is to create a waiter semaphore channel that makes all the goroutines wait until we are ready for them to trigger the concurrent writes.
```go
triggerStartCh := make(chan bool)
for i := 0; i < n; i++ {
	wg.Add(1)
	go func(i int) {
		<-triggerStartCh // Wait on the semaphore before starting the work.
		defer wg.Done()
		...
	}(i)
}
// We can now trigger the start, as all the goroutines are waiting to start.
close(triggerStartCh)
```
Thanks, good to know!
Review Status
Actionable comments generated: 1
Configuration used: CodeRabbit UI
Files ignored due to filter (2)
- store/go.mod
- store/go.sum
Files selected for processing (1)
- store/storage/sqlite/db_test.go (2 hunks)
Additional comments: 1
store/storage/sqlite/db_test.go (1)
- 93-141: The `TestParallelWrites` function looks good. It correctly uses a `sync.WaitGroup` and a channel to synchronize the start of goroutines. It also correctly checks the written data after all goroutines have finished.
```go
func TestParallelWriteAndPruning(t *testing.T) {
	db, err := New(t.TempDir())
	require.NoError(t, err)
	defer db.Close()

	latestVersion := 100
	kvCount := 100
	prunePeriod := 5

	wg := sync.WaitGroup{}
	triggerStartCh := make(chan bool)

	// start a goroutine that writes to the database
	wg.Add(1)
	go func() {
		<-triggerStartCh
		defer wg.Done()
		for i := 0; i < latestVersion; i++ {
			cs := new(store.Changeset)
			for j := 0; j < kvCount; j++ {
				key := fmt.Sprintf("key-%d-%03d", i, j)
				val := fmt.Sprintf("val-%d-%03d", i, j)

				cs.AddKVPair(store.KVPair{StoreKey: storeKey1, Key: []byte(key), Value: []byte(val)})
			}

			require.NoError(t, db.ApplyChangeset(uint64(i+1), cs))
		}
	}()
	// start a goroutine that prunes the database
	wg.Add(1)
	go func() {
		<-triggerStartCh
		defer wg.Done()
		for i := 10; i < latestVersion; i += prunePeriod {
			for {
				v, err := db.GetLatestVersion()
				require.NoError(t, err)
				if v > uint64(i) {
					t.Log("pruning version", v-1)
					require.NoError(t, db.Prune(v-1))
					break
				}
			}
		}
	}()

	// start the goroutines
	close(triggerStartCh)
	wg.Wait()

	// check if the data is pruned
	version := uint64(latestVersion - prunePeriod)
	val, err := db.Get(storeKey1, version, []byte(fmt.Sprintf("key-%d-%03d", version-1, 0)))
	require.NoError(t, err)
	require.Nil(t, val)

	version = uint64(latestVersion)
	val, err = db.Get(storeKey1, version, []byte(fmt.Sprintf("key-%d-%03d", version-1, 0)))
	require.NoError(t, err)
	require.Equal(t, []byte(fmt.Sprintf("val-%d-%03d", version-1, 0)), val)
}
```
The `TestParallelWriteAndPruning` function also correctly uses a `sync.WaitGroup` and a channel to synchronize the start of goroutines. It tests both writing and pruning operations concurrently. However, the pruning goroutine busy-waits: `db.GetLatestVersion()` is called in a loop without any delay, which could lead to high CPU usage if the latest version is not updated quickly enough by the writing goroutine. Consider adding a small delay or a backoff mechanism in the loop to avoid this issue.
```diff
+ import "time"
  ...
  for i := 10; i < latestVersion; i += prunePeriod {
  	for {
  		v, err := db.GetLatestVersion()
  		require.NoError(t, err)
  		if v > uint64(i) {
  			t.Log("pruning version", v-1)
  			require.NoError(t, db.Prune(v-1))
  			break
  		}
+ 		time.Sleep(10 * time.Millisecond) // add a small delay
  	}
  }
```
Committable suggestion

[!IMPORTANT]
Carefully review the code before committing. Make sure it correctly replaces the highlighted code, has no missing lines, and has no indentation issues.
```go
import "time"

func TestParallelWriteAndPruning(t *testing.T) {
	db, err := New(t.TempDir())
	require.NoError(t, err)
	defer db.Close()

	latestVersion := 100
	kvCount := 100
	prunePeriod := 5

	wg := sync.WaitGroup{}
	triggerStartCh := make(chan bool)

	// start a goroutine that writes to the database
	wg.Add(1)
	go func() {
		<-triggerStartCh
		defer wg.Done()
		for i := 0; i < latestVersion; i++ {
			cs := new(store.Changeset)
			for j := 0; j < kvCount; j++ {
				key := fmt.Sprintf("key-%d-%03d", i, j)
				val := fmt.Sprintf("val-%d-%03d", i, j)
				cs.AddKVPair(store.KVPair{StoreKey: storeKey1, Key: []byte(key), Value: []byte(val)})
			}
			require.NoError(t, db.ApplyChangeset(uint64(i+1), cs))
		}
	}()
	// start a goroutine that prunes the database
	wg.Add(1)
	go func() {
		<-triggerStartCh
		defer wg.Done()
		for i := 10; i < latestVersion; i += prunePeriod {
			for {
				v, err := db.GetLatestVersion()
				require.NoError(t, err)
				if v > uint64(i) {
					t.Log("pruning version", v-1)
					require.NoError(t, db.Prune(v-1))
					break
				}
				time.Sleep(10 * time.Millisecond) // add a small delay
			}
		}
	}()

	// start the goroutines
	close(triggerStartCh)
	wg.Wait()

	// check if the data is pruned
	version := uint64(latestVersion - prunePeriod)
	val, err := db.Get(storeKey1, version, []byte(fmt.Sprintf("key-%d-%03d", version-1, 0)))
	require.NoError(t, err)
	require.Nil(t, val)

	version = uint64(latestVersion)
	val, err = db.Get(storeKey1, version, []byte(fmt.Sprintf("key-%d-%03d", version-1, 0)))
	require.NoError(t, err)
	require.Equal(t, []byte(fmt.Sprintf("val-%d-%03d", version-1, 0)), val)
}
```
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (1)
- store/pruning/manager_test.go (2 hunks)
Files not reviewed due to errors (1)
- store/pruning/manager_test.go (Error: Server error. Please try again later.)
LGTM, thank you @cool-develope!
@cool-develope let's get the conflicting file resolved and get this merged 🙏
Description
Closes: #XXXX
Author Checklist
All items are required. Please add a note to the item if the item is not applicable and
please add links to any relevant follow up issues.
I have...
- added `!` to the type prefix if API or client breaking change
- updated `CHANGELOG.md`
- run `make lint` and `make test`
Reviewers Checklist
All items are required. Please add a note if the item is not applicable and please add
your handle next to the items reviewed if you only reviewed selected items.
I have...
- confirmed `!` in the type prefix if API or client breaking change

Summary by CodeRabbit
- Added `TestParallelWrites` to verify concurrent write capability.
- Added `TestParallelWriteAndPruning` to test concurrent write and pruning functionality.