diff --git a/backup.go b/backup.go
index aa0333916..e4458a1bd 100644
--- a/backup.go
+++ b/backup.go
@@ -134,6 +134,16 @@ type KVLoader struct {
 	throttle    *y.Throttle
 	entries     []*Entry
 	entriesSize int64
+
+	// Accumulated size of the key and value data in entries without
+	// any accounting for extra encoding bytes. Used to decide when
+	// to flush the entries to disk.
+	entriesKeyValueSize int64
+	// If non-zero, keyValueSizeFlushThreshold specifies the number of bytes to
+	// observe in keys and values before performing a flush.
+	//
+	// Note that other metrics are also used to decide when to flush to disk.
+	keyValueSizeFlushThreshold uint64
 }
 
 // NewKVLoader returns a new instance of KVLoader.
@@ -142,6 +152,8 @@ func (db *DB) NewKVLoader(maxPendingWrites int) *KVLoader {
 		db:       db,
 		throttle: y.NewThrottle(maxPendingWrites),
 		entries:  make([]*Entry, 0, db.opt.maxBatchCount),
+		// TODO(reddaly): Use an option in db for this.
+		keyValueSizeFlushThreshold: 230 * 1024 * 1024,
 	}
 }
 
@@ -162,15 +174,20 @@ func (l *KVLoader) Set(kv *pb.KV) error {
 		meta:      meta,
 	}
 	estimatedSize := int64(e.estimateSize(l.db.opt.ValueThreshold))
+	estimatedKeyValueSize := int64(len(e.Key) + len(e.Value))
+	willFlush := int64(len(l.entries))+1 >= l.db.opt.maxBatchCount ||
+		l.entriesSize+estimatedSize >= l.db.opt.maxBatchSize ||
+		uint64(l.entriesKeyValueSize+estimatedKeyValueSize) >= l.keyValueSizeFlushThreshold
+
 	// Flush entries if inserting the next entry would overflow the transactional limits.
-	if int64(len(l.entries))+1 >= l.db.opt.maxBatchCount ||
-		l.entriesSize+estimatedSize >= l.db.opt.maxBatchSize {
+	if willFlush {
 		if err := l.send(); err != nil {
 			return err
 		}
 	}
 	l.entries = append(l.entries, e)
 	l.entriesSize += estimatedSize
+	l.entriesKeyValueSize += estimatedKeyValueSize
 	return nil
 }
 
@@ -186,6 +203,7 @@ func (l *KVLoader) send() error {
 
 	l.entries = make([]*Entry, 0, l.db.opt.maxBatchCount)
 	l.entriesSize = 0
+	l.entriesKeyValueSize = 0
 	return nil
 }
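
For reference, the flush decision above combines three limits: the transactional entry-count limit, the estimated encoded batch size, and the new raw key/value byte threshold. The standalone Go sketch below restates that predicate outside of badger so it can be read and run in isolation; the loaderState type, the shouldFlush helper, and the sample numbers are illustrative assumptions, not part of the patch.

package main

import "fmt"

// loaderState mirrors the counters KVLoader keeps after this patch.
// The names and this standalone type are illustrative, not badger's own.
type loaderState struct {
	numEntries          int64 // len(l.entries)
	entriesSize         int64 // estimated encoded size of buffered entries
	entriesKeyValueSize int64 // raw key+value bytes, no encoding overhead

	maxBatchCount              int64  // transactional limit on entry count
	maxBatchSize               int64  // transactional limit on encoded size
	keyValueSizeFlushThreshold uint64 // new raw key/value byte threshold
}

// shouldFlush restates the willFlush predicate from the diff: flush before
// appending the next entry if any of the three limits would be reached.
func shouldFlush(s loaderState, estimatedSize, estimatedKeyValueSize int64) bool {
	return s.numEntries+1 >= s.maxBatchCount ||
		s.entriesSize+estimatedSize >= s.maxBatchSize ||
		uint64(s.entriesKeyValueSize+estimatedKeyValueSize) >= s.keyValueSizeFlushThreshold
}

func main() {
	s := loaderState{
		numEntries:                 10,
		entriesSize:                1 << 20,
		entriesKeyValueSize:        229 * 1024 * 1024,
		maxBatchCount:              100000,
		maxBatchSize:               10 << 20,
		keyValueSizeFlushThreshold: 230 * 1024 * 1024,
	}
	// A ~2 MiB entry stays well within the count and encoded-size limits,
	// but pushes the raw key/value total past 230 MiB, so the loader flushes.
	fmt.Println(shouldFlush(s, 2<<20, 2<<20)) // true
}

Whether 230 MiB is the right default is left open by the patch itself: the TODO notes that keyValueSizeFlushThreshold is hard-coded in NewKVLoader rather than taken from a db option.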