From 7582034b6012e685ce4aa80ed7072c133910bfd9 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 4 May 2020 22:32:06 +0530 Subject: [PATCH 1/3] Add Write method to batch --- batch.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/batch.go b/batch.go index 586ad224d..7c370b247 100644 --- a/batch.go +++ b/batch.go @@ -19,6 +19,7 @@ package badger import ( "sync" + "github.com/dgraph-io/badger/v2/pb" "github.com/dgraph-io/badger/v2/y" "github.com/pkg/errors" ) @@ -89,6 +90,20 @@ func (wb *WriteBatch) callback(err error) { wb.err = err } +func (wb *WriteBatch) Write(kvList *pb.KVList) error { + for _, kv := range kvList.Kv { + e := Entry{Key: kv.Key, Value: kv.Value} + if len(kv.UserMeta) > 0 { + e.UserMeta = kv.UserMeta[0] + } + y.AssertTrue(kv.Version != 0) + if err := wb.SetEntryAt(&e, kv.Version); err != nil { + return err + } + } + return nil +} + // SetEntryAt is the equivalent of Txn.SetEntry but it also allows setting version for the entry. // SetEntryAt can be used only in managed mode. func (wb *WriteBatch) SetEntryAt(e *Entry, ts uint64) error { From cf08d7ce5b1b1b13c2469f04dee702b82f271f5c Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 5 May 2020 12:29:59 +0530 Subject: [PATCH 2/3] Reduce lock contention in wb.Write --- batch.go | 19 +++++++++++++------ value.go | 10 +++++----- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/batch.go b/batch.go index 7c370b247..5dd7d7a50 100644 --- a/batch.go +++ b/batch.go @@ -91,13 +91,16 @@ func (wb *WriteBatch) callback(err error) { } func (wb *WriteBatch) Write(kvList *pb.KVList) error { + wb.Lock() + defer wb.Unlock() for _, kv := range kvList.Kv { e := Entry{Key: kv.Key, Value: kv.Value} if len(kv.UserMeta) > 0 { e.UserMeta = kv.UserMeta[0] } y.AssertTrue(kv.Version != 0) - if err := wb.SetEntryAt(&e, kv.Version); err != nil { + e.version = kv.Version + if err := wb.handleEntry(&e); err != nil { return err } } @@ -114,11 +117,8 @@ func (wb *WriteBatch) SetEntryAt(e *Entry, ts uint64) error { return wb.SetEntry(e) } -// SetEntry is the equivalent of Txn.SetEntry. -func (wb *WriteBatch) SetEntry(e *Entry) error { - wb.Lock() - defer wb.Unlock() - +// Should be called with lock acquired. +func (wb *WriteBatch) handleEntry(e *Entry) error { if err := wb.txn.SetEntry(e); err != ErrTxnTooBig { return err } @@ -135,6 +135,13 @@ func (wb *WriteBatch) SetEntry(e *Entry) error { return nil } +// SetEntry is the equivalent of Txn.SetEntry. +func (wb *WriteBatch) SetEntry(e *Entry) error { + wb.Lock() + defer wb.Unlock() + return wb.handleEntry(e) +} + // Set is equivalent of Txn.Set(). func (wb *WriteBatch) Set(k, v []byte) error { e := &Entry{Key: k, Value: v} diff --git a/value.go b/value.go index f95f7a326..0883edffc 100644 --- a/value.go +++ b/value.go @@ -1435,11 +1435,11 @@ func (vlog *valueLog) write(reqs []*request) error { // log (this happens when a transaction contains entries with large value sizes) and // badger might run into out of memory errors. We flush the buffer here if it's size // grows beyond the max value log size. - if int64(buf.Len()) > vlog.db.opt.ValueLogFileSize { - if err := flushWrites(); err != nil { - return err - } - } + // if int64(buf.Len()) > vlog.db.opt.ValueLogFileSize { + // if err := flushWrites(); err != nil { + // return err + // } + // } } vlog.numEntriesWritten += uint32(written) // We write to disk here so that all entries that are part of the same transaction are From 1903f39cf02a027b15e95adc87260c9301452e35 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 6 May 2020 12:35:43 +0530 Subject: [PATCH 3/3] Remove dead code --- value.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/value.go b/value.go index 0883edffc..f95f7a326 100644 --- a/value.go +++ b/value.go @@ -1435,11 +1435,11 @@ func (vlog *valueLog) write(reqs []*request) error { // log (this happens when a transaction contains entries with large value sizes) and // badger might run into out of memory errors. We flush the buffer here if it's size // grows beyond the max value log size. - // if int64(buf.Len()) > vlog.db.opt.ValueLogFileSize { - // if err := flushWrites(); err != nil { - // return err - // } - // } + if int64(buf.Len()) > vlog.db.opt.ValueLogFileSize { + if err := flushWrites(); err != nil { + return err + } + } } vlog.numEntriesWritten += uint32(written) // We write to disk here so that all entries that are part of the same transaction are