-
Notifications
You must be signed in to change notification settings - Fork 85
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
LevelDB support #258
LevelDB support #258
Changes from 27 commits
513b4eb
1332064
c0f982b
e6babba
9449db8
6b2408a
5d1914f
f3e3a85
1af7584
c112f96
7479867
8736204
f6ecdc5
35af660
fc224a9
d022396
0644028
f90f042
a08eae5
4a4ffb1
4555049
1eac9e3
c8e2cb0
5768fb6
8a61539
bc787e8
090f8de
58fc54e
f00ea96
2f8e75d
3852480
c13ad32
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
/* | ||
* Copyright 2015 Twitter inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.twitter.storehaus.leveldb | ||
|
||
import java.io.File | ||
import java.util.concurrent.Executors | ||
|
||
import com.twitter.storehaus.Store | ||
import com.twitter.util.{FuturePool, Time, Future} | ||
import org.iq80.leveldb._ | ||
import org.fusesource.leveldbjni.JniDBFactory._ | ||
|
||
/** | ||
* Store for interacting with a LevelDB database. | ||
* Example usage: | ||
* {{{ | ||
* import java.io.File | ||
* import org.iq80.leveldb.Options | ||
* import com.twitter.storehaus.leveldb.LevelDBStore | ||
* | ||
* val dir = new File("/some/path/myleveldb-directory") | ||
* dir.mkdirs() | ||
* val options = { | ||
* val opt = new Options | ||
* opt.createIfMissing(true) | ||
* opt.blockSize(8192) | ||
* opt | ||
* } | ||
* val store = new LevelDBStore(dir, new Options, 4) | ||
* }}} | ||
* @constructor Create a new LevelDB store. | ||
* @param dir Directory where the database is/will be stored. | ||
* @param options Different options for the database, see: https://github.com/google/leveldb/blob/master/util/options.cc | ||
* @param numThreads Number of threads in the pool of threads interacting with the db. | ||
* @author Ben Fradet | ||
* @since 10/03/15 | ||
*/ | ||
class LevelDBStore(val dir: File, | ||
val options: Options, | ||
val numThreads: Int = Runtime.getRuntime.availableProcessors) | ||
extends Store[Array[Byte], Array[Byte]] { | ||
|
||
private lazy val db = factory.open(dir, options) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this thread-safe? Can you link to docs showing this? If not, we should use: Then we'd do something like: dbMutex.acquireAndRun(futurePool {
// method logic here.
}) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will look into it and post my findings. |
||
private val futurePool = FuturePool(Executors.newFixedThreadPool(numThreads)) | ||
|
||
/** | ||
* Get a single key from the store. | ||
* Prefer multiGet if you are getting more than one key at a time. | ||
*/ | ||
override def get(k: Array[Byte]): Future[Option[Array[Byte]]] = | ||
futurePool { Option(db.get(k)) } | ||
|
||
/** | ||
* Replace a value. | ||
* Delete is the same as put((k,None)). | ||
*/ | ||
override def put(kv: (Array[Byte], Option[Array[Byte]])): Future[Unit] = { | ||
require(kv._1 != null) | ||
kv match { | ||
case (k, Some(v)) => futurePool { | ||
db.put(k, v) | ||
} | ||
case (k, None) => futurePool { | ||
db.delete(k) | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Replace a set of keys at one time. | ||
*/ | ||
override def multiPut[K1 <: Array[Byte]](kvs: Map[K1, Option[Array[Byte]]]) | ||
: Map[K1, Future[Unit]] = { | ||
val future = futurePool { | ||
val batch = db.createWriteBatch() | ||
kvs.foreach { | ||
case (k, Some(v)) => batch.put(k, v) | ||
case (k, None) => batch.delete(k) | ||
} | ||
db.write(batch) | ||
batch.close() | ||
} | ||
kvs.mapValues(_ => future) | ||
} | ||
|
||
/** | ||
* Close this store and release any resources. | ||
* It is undefined what happens on get/multiGet after close | ||
*/ | ||
override def close(time: Time): Future[Unit] = { | ||
futurePool { db.close() } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should be: |
||
super.close(time) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
package com.twitter.storehaus.leveldb | ||
|
||
import java.io.File | ||
import java.util | ||
|
||
import com.twitter.storehaus.Store | ||
import com.twitter.storehaus.testing.generator.NonEmpty | ||
import com.twitter.util.{Future, Await} | ||
import org.iq80.leveldb.Options | ||
import org.scalacheck.{Gen, Properties} | ||
import org.scalacheck.Prop.forAll | ||
|
||
import scala.util.Random | ||
|
||
/** | ||
* @author Ben Fradet | ||
*/ | ||
object LevelDBStoreProperties extends Properties("LevelDBStore") { | ||
|
||
def putAndGetTest(store: Store[Array[Byte], Array[Byte]], | ||
pairs: Gen[List[(Array[Byte], Option[Array[Byte]])]]) = | ||
forAll(pairs) { examples: List[(Array[Byte], Option[Array[Byte]])] => | ||
examples.forall { | ||
case (k, v) => { | ||
Await.result(store.put(k, v)) | ||
val found = Await.result(store.get(k)) | ||
found match { | ||
case Some(a) => util.Arrays.equals(a, v.get) | ||
case None => found == v | ||
} | ||
} | ||
} | ||
} | ||
|
||
def multiPutAndGetTest(store: Store[Array[Byte], Array[Byte]], | ||
pairs: Gen[List[(Array[Byte], Option[Array[Byte]])]]) = | ||
forAll(pairs) { examples: List[(Array[Byte], Option[Array[Byte]])] => | ||
val examplesMap = examples.toMap | ||
Await.result(Future.collect(store.multiPut(examplesMap).values.toList)) | ||
val result = store.multiGet(examplesMap.keySet) | ||
.map { case (key, v) => (key, Await.result(v)) } | ||
|
||
val stringifiedResults = stringifyMap(result) | ||
val stringifiedExamples = stringifyMap(examplesMap) | ||
|
||
stringifiedResults == stringifiedExamples | ||
} | ||
|
||
private def stringifyMap(map: Map[Array[Byte], Option[Array[Byte]]]) | ||
:Map[String, Option[String]] = { | ||
map.map { | ||
case (k, Some(v)) => (new String(k, "UTF-8"), | ||
Some(new String(v, "UTF-8"))) | ||
case (k, None) => (new String(k, "UTF-8"), None) | ||
} | ||
} | ||
|
||
property("LevelDB[Array[Byte], Array[Byte]] single") = { | ||
val dir = new File(System.getProperty("java.io.tmpdir"), | ||
"leveldb-test-" + new Random().nextInt(Int.MaxValue)) | ||
dir.mkdirs() | ||
val store = new LevelDBStore(dir, new Options(), 2) | ||
putAndGetTest(store, NonEmpty.Pairing.byteArrays()) | ||
} | ||
|
||
property("LevelDB[Array[Byte], Array[Byte] multi") = { | ||
val dir = new File(System.getProperty("java.io.tmpdir"), | ||
"leveldb-test-multi-" + new Random().nextInt(Int.MaxValue)) | ||
dir.mkdirs() | ||
val store = new LevelDBStore(dir, new Options(), 1) | ||
multiPutAndGetTest(store, NonEmpty.Pairing.byteArrays()) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could make this Store[K, V] and and accept an implicit Codec[K] and Codec[V].
Not a blocker though given we have a combinator that can be used instead. ConvertedStore I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I tend to prefer wrapping as close to the underlying store as possible and using combinators (I know @ianoc has some information that can cause problems, but otherwise we have a ton of duplication, which I also think is a problem). What do you think of this comment, @rubanm ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense to me.
@BenFradet Could you change to something like:
There are a few existing stores that use Codec to convert between T and Array[Byte] that can serve as examples.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can be a performance concern, but unless its been identified as one I'd just leave it to combinators
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, sorry I misread the comment. Yes combinators it is then. @BenFradet please disregard my earlier comment.