-
Notifications
You must be signed in to change notification settings - Fork 409
/
store.rs
143 lines (124 loc) · 3.87 KB
/
store.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
use rocksdb;
use std::path::{Path, PathBuf};
use util::Bytes;
/// A single key/value pair as stored in (or read from) the database.
/// Both sides are opaque byte strings.
#[derive(Clone)]
pub struct Row {
    pub key: Bytes,
    pub value: Bytes,
}
impl Row {
pub fn into_pair(self) -> (Bytes, Bytes) {
(self.key, self.value)
}
}
/// Read-only access to a key/value store.
pub trait ReadStore: Sync {
    /// Returns the value stored under `key`, or `None` if absent.
    fn get(&self, key: &[u8]) -> Option<Bytes>;
    /// Returns all rows whose key starts with `prefix`.
    fn scan(&self, prefix: &[u8]) -> Vec<Row>;
}
/// Write access to a key/value store.
pub trait WriteStore: Sync {
    /// Persists the given rows (durability guarantees are implementation-defined).
    fn write(&self, rows: Vec<Row>);
    /// Makes previously written data durable (implementation-defined).
    fn flush(&self);
}
/// Configuration for a `DBStore`; cloned when the DB is re-opened.
#[derive(Clone)]
struct Options {
    // Filesystem path of the RocksDB directory.
    path: PathBuf,
    // When true, the DB is tuned for an initial bulk load:
    // auto-compactions disabled, WAL and fsync skipped on writes.
    bulk_import: bool,
}
/// RocksDB-backed implementation of `ReadStore` and `WriteStore`.
pub struct DBStore {
    db: rocksdb::DB,
    // Retained so the DB can be closed and re-opened with modified
    // options (used by `enable_compaction` and `compact`).
    opts: Options,
}
impl DBStore {
    /// Opens (or creates) the RocksDB at `opts.path`, tuned according to
    /// `opts.bulk_import`.
    ///
    /// # Panics
    /// Panics if the database cannot be opened.
    fn open_opts(opts: Options) -> Self {
        debug!("opening DB at {:?}", opts.path);
        let mut db_opts = rocksdb::Options::default();
        db_opts.create_if_missing(true);
        // Bulk import writes large sequential files, so far fewer open file
        // handles are needed than when serving random reads.
        db_opts.set_max_open_files(if opts.bulk_import { 16 } else { 256 });
        db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
        db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy);
        db_opts.set_target_file_size_base(256 << 20); // 256 MiB per SST file
        db_opts.set_write_buffer_size(256 << 20); // 256 MiB memtable
        db_opts.set_disable_auto_compactions(opts.bulk_import); // for initial bulk load
        db_opts.set_advise_random_on_open(!opts.bulk_import); // bulk load uses sequential I/O
        let mut block_opts = rocksdb::BlockBasedOptions::default();
        block_opts.set_block_size(512 << 10); // 512 KiB blocks
        DBStore {
            db: rocksdb::DB::open(&db_opts, &opts.path)
                .expect("failed to open RocksDB"),
            opts,
        }
    }

    /// Opens a new RocksDB at the specified location.
    ///
    /// The store starts in bulk-import mode; call `enable_compaction()`
    /// once the initial load is done.
    pub fn open(path: &Path) -> Self {
        DBStore::open_opts(Options {
            path: path.to_path_buf(),
            bulk_import: true,
        })
    }

    /// Re-opens the DB with auto-compactions enabled.
    /// Returns `self` unchanged if compactions are already enabled.
    pub fn enable_compaction(self) -> Self {
        let mut opts = self.opts.clone();
        if opts.bulk_import {
            opts.bulk_import = false;
            drop(self); // DB must be closed before being re-opened
            info!("enabling auto-compactions");
            DBStore::open_opts(opts)
        } else {
            self
        }
    }

    /// Re-opens the DB and runs a full manual compaction over the whole
    /// key range, returning the re-opened store.
    pub fn compact(self) -> Self {
        let opts = self.opts.clone();
        drop(self); // DB must be closed before being re-opened
        let store = DBStore::open_opts(opts);
        info!("starting full compaction");
        store.db.compact_range(None, None); // would take a while
        info!("finished full compaction");
        store
    }
}
impl ReadStore for DBStore {
    /// Looks up a single key; `None` when it is not present.
    fn get(&self, key: &[u8]) -> Option<Bytes> {
        let value = self.db.get(key).unwrap();
        value.map(|v| v.to_vec())
    }

    // TODO: use generators
    /// Collects every row whose key begins with `prefix`, in key order.
    fn scan(&self, prefix: &[u8]) -> Vec<Row> {
        let mode = rocksdb::IteratorMode::From(prefix, rocksdb::Direction::Forward);
        self.db
            .iterator(mode)
            .take_while(|(key, _)| key.starts_with(prefix))
            .map(|(key, value)| Row {
                key: key.to_vec(),
                value: value.to_vec(),
            })
            .collect()
    }
}
impl WriteStore for DBStore {
    /// Writes all rows in a single RocksDB write batch.
    /// During bulk import, the WAL and fsync are skipped for speed;
    /// otherwise each batch is synced to disk.
    fn write(&self, rows: Vec<Row>) {
        let mut batch = rocksdb::WriteBatch::default();
        for row in rows {
            batch.put(row.key.as_slice(), row.value.as_slice()).unwrap();
        }
        let mut opts = rocksdb::WriteOptions::new();
        opts.set_sync(!self.opts.bulk_import); // fsync only outside bulk import
        opts.disable_wal(self.opts.bulk_import); // no WAL during bulk import
        self.db.write_opt(batch, &opts).unwrap();
    }

    /// Issues an empty write with `sync = true` and the WAL enabled —
    /// presumably to force earlier unsynced writes to disk; confirm
    /// against RocksDB's WriteOptions::sync semantics.
    fn flush(&self) {
        let mut opts = rocksdb::WriteOptions::new();
        opts.set_sync(true);
        opts.disable_wal(false);
        let empty = rocksdb::WriteBatch::default();
        self.db.write_opt(empty, &opts).unwrap();
    }
}
impl Drop for DBStore {
    fn drop(&mut self) {
        // Only logs the event; the underlying rocksdb::DB handle is
        // closed when the `db` field itself is dropped afterwards.
        trace!("closing DB at {:?}", self.opts.path);
    }
}