LevelDB support #258

Merged
merged 32 commits into from
May 22, 2015
Changes from 21 commits
Commits
513b4eb
updated build for the levelDb module
BenFradet Mar 10, 2015
1332064
first version of the levelDbStore
BenFradet Mar 10, 2015
c0f982b
moved from String store to Array[Byte] store
BenFradet Mar 11, 2015
e6babba
first version of get for levelDB
BenFradet Mar 11, 2015
9449db8
rmd unused imports in MemcacheMergeableStoreProperties
BenFradet Mar 12, 2015
6b2408a
implemented put in LevelDBStore
BenFradet Mar 12, 2015
5d1914f
started testing LevelDBStore in LevelDBStoreProperties
BenFradet Mar 12, 2015
f3e3a85
closing db connection in futurePool in LevelDBStore
BenFradet Mar 13, 2015
1af7584
added workaround for testing storehaus-leveldb because of how sbt dea…
BenFradet Mar 13, 2015
c112f96
added requires to put and get for the leveldb store
BenFradet Mar 13, 2015
7479867
added byte arrays generators to NonEmpty and commented
BenFradet Mar 13, 2015
8736204
overhaul of LevelDBStoreProperties
BenFradet Mar 13, 2015
f6ecdc5
implementation of multiput for LevelDB
BenFradet Mar 22, 2015
35af660
rmd close(duration: Duration) overload in the levelDB store
BenFradet Mar 22, 2015
fc224a9
rmd multiGet from LevelDBStore as there is no point implementing it
BenFradet Mar 22, 2015
d022396
test for multiput in LevelDBStore
BenFradet Mar 22, 2015
0644028
simplified get for leveldb
BenFradet Apr 23, 2015
f90f042
test of batch writing for leveldb
BenFradet Apr 23, 2015
a08eae5
a bit of refactoring for the multi put test for leveldb
BenFradet Apr 25, 2015
4a4ffb1
enforce UTF-8 encoding on generation of byte arrays
BenFradet Apr 27, 2015
4555049
empty testOptions for ldb as tests are forked
BenFradet Apr 27, 2015
1eac9e3
reorganized logically the leveldb build settings
BenFradet Apr 28, 2015
c8e2cb0
defaulted the numThreads argument for the LevelDBStore constructor to…
BenFradet Apr 28, 2015
5768fb6
converted lambdas to pattern matching lambdas
BenFradet Apr 28, 2015
8a61539
added documentation for the LevelDB constructor and an example usage
BenFradet Apr 28, 2015
bc787e8
force encoding of strings built from array of bytes in UTF-8 in level…
BenFradet Apr 28, 2015
090f8de
rmd dependency for algebra and added one for core for the leveldb sub…
BenFradet Apr 30, 2015
58fc54e
now closing leveldb store properly
BenFradet May 17, 2015
f00ea96
merged develop
BenFradet May 17, 2015
2f8e75d
modified readme, fixes #51
BenFradet May 17, 2015
3852480
updated readme to reflect currently published artifacts
BenFradet May 17, 2015
c13ad32
Merge branch 'develop' of https://github.com/twitter/storehaus into f…
BenFradet May 22, 2015
31 changes: 19 additions & 12 deletions project/Build.scala
@@ -124,7 +124,6 @@ object StorehausBuild extends Build {
val utilVersion = "6.22.0"
val scaldingVersion = "0.13.1"
lazy val storehaus = Project(

id = "storehaus",
base = file("."),
settings = sharedSettings ++ DocGen.publishSettings
@@ -141,6 +140,7 @@ object StorehausBuild extends Build {
storehausRedis,
storehausHBase,
storehausDynamoDB,
storehausLevelDB,
storehausKafka,
storehausKafka08,
storehausMongoDB,
@@ -200,7 +200,7 @@ object StorehausBuild extends Build {
parallelExecution in Test := false
).dependsOn(storehausAlgebra % "test->test;compile->compile")

- lazy val storehausHBase= module("hbase").settings(
+ lazy val storehausHBase = module("hbase").settings(
libraryDependencies ++= Seq(
"com.twitter" %% "algebird-core" % algebirdVersion,
"com.twitter" %% "bijection-core" % bijectionVersion,
@@ -214,7 +214,7 @@ object StorehausBuild extends Build {
parallelExecution in Test := false
).dependsOn(storehausAlgebra % "test->test;compile->compile")

- lazy val storehausDynamoDB= module("dynamodb").settings(
+ lazy val storehausDynamoDB = module("dynamodb").settings(
libraryDependencies ++= Seq(
"com.twitter" %% "algebird-core" % algebirdVersion,
"com.twitter" %% "bijection-core" % bijectionVersion,
@@ -226,19 +226,29 @@ object StorehausBuild extends Build {
parallelExecution in Test := false
).dependsOn(storehausAlgebra % "test->test;compile->compile")

lazy val storehausLevelDB = module("leveldb").settings(
testOptions in Test := Seq(),
libraryDependencies +=
"org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8",
parallelExecution in Test := false,
// workaround because of how sbt handles native libraries
// http://stackoverflow.com/questions/19425613/unsatisfiedlinkerror-with-native-library-under-sbt
fork in Test := true
).dependsOn(storehausAlgebra % "test->test;compile->compile")
Contributor

This can be just storehausCore?

Contributor Author

Yup, I think that's what I meant to do when I did this.


lazy val storehausKafka = module("kafka").settings(
libraryDependencies ++= Seq (
"com.twitter" %% "bijection-core" % bijectionVersion,
"com.twitter" %% "bijection-avro" % bijectionVersion,
- "com.twitter"%"kafka_2.9.2"%"0.7.0" % "provided" excludeAll(
- ExclusionRule("com.sun.jdmk","jmxtools"),
- ExclusionRule( "com.sun.jmx","jmxri"),
- ExclusionRule( "javax.jms","jms")
- )
+ "com.twitter" % "kafka_2.9.2" % "0.7.0" % "provided" excludeAll(
+ ExclusionRule("com.sun.jdmk", "jmxtools"),
+ ExclusionRule("com.sun.jmx", "jmxri"),
+ ExclusionRule("javax.jms", "jms")
+ )
),
// we don't want various tests clobbering each others keys
parallelExecution in Test := false
- ).dependsOn(storehausAlgebra % "test->test;compile->compile")
+ ).dependsOn(storehausCore % "test->test;compile->compile")

lazy val storehausKafka08 = module("kafka-08").settings(
libraryDependencies ++= Seq (
@@ -272,7 +282,6 @@ object StorehausBuild extends Build {
parallelExecution in Test := false
).dependsOn(storehausAlgebra % "test->test;compile->compile")


val storehausTesting = Project(
id = "storehaus-testing",
base = file("storehaus-testing"),
@@ -299,6 +308,4 @@ object StorehausBuild extends Build {
"com.twitter" %% "bijection-netty" % bijectionVersion
)
).dependsOn(storehausCore)


}
@@ -24,7 +24,7 @@ import com.twitter.util.{ Closable, Duration, Future, Time }
*/
trait WritableStore[-K, -V] extends Closable {
/**
- * replace a value
+ * Replace a value
* Delete is the same as put((k,None))
*/
def put(kv: (K, V)): Future[Unit] = multiPut(Map(kv)).apply(kv._1)
@@ -0,0 +1,83 @@
/*
* Copyright 2015 Twitter inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus.leveldb

import java.io.File
import java.util.concurrent.Executors

import com.twitter.storehaus.Store
import com.twitter.util.{FuturePool, Time, Future, Duration}
import org.iq80.leveldb._
import org.fusesource.leveldbjni.JniDBFactory._

/**
* Store dealing with a LevelDB database.
Contributor

Could you add some more comments about this store's usage, what the parameters mean etc. Thanks.

Contributor Author

Will do.

* @author Ben Fradet
* @since 10/03/15
*/
class LevelDBStore(val dir: File, val options: Options, val numThreads: Int)
Contributor

numThreads can be defaulted to Runtime.getRuntime.availableProcessors perhaps. Some of the other store wrappers that use future pool do this.
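
A minimal sketch of the default suggested in this comment, assuming nothing about the final constructor beyond the `numThreads` parameter shown above. `PooledWorker` is a hypothetical stand-in that uses only the JDK; it is not the actual `LevelDBStore`:

```scala
import java.util.concurrent.{ExecutorService, Executors}

// Hypothetical stand-in for a store wrapper that owns a thread pool.
// The pool size defaults to the number of available processors, as the
// review comment suggests; callers can still pass an explicit value.
class PooledWorker(val numThreads: Int = Runtime.getRuntime.availableProcessors) {
  require(numThreads > 0, "numThreads must be positive")

  private val executor: ExecutorService = Executors.newFixedThreadPool(numThreads)

  /** Stop accepting new tasks and let the pool threads exit. */
  def shutdown(): Unit = executor.shutdown()
}
```

With this shape, `new PooledWorker()` sizes the pool from the machine, while `new PooledWorker(2)` keeps the explicit form used in the tests.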

extends Store[Array[Byte], Array[Byte]] {
Contributor

We could make this Store[K, V] and accept an implicit Codec[K] and Codec[V].

Not a blocker though given we have a combinator that can be used instead. ConvertedStore I think.

Collaborator

Actually, I tend to prefer wrapping as close to the underlying store as possible and using combinators (I know @ianoc has some information that can cause problems, but otherwise we have a ton of duplication, which I also think is a problem). What do you think of this comment, @rubanm ?

Contributor

Makes sense to me.

@BenFradet Could you change to something like:

class LevelDBStore[K: Codec, V: Codec]( ... )
  extends Store[K, V]

There are a few existing stores that use Codec to convert between T and Array[Byte] that can serve as examples.

Collaborator

It can be a performance concern, but unless it's been identified as one I'd just leave it to combinators.

Contributor

Ah, sorry I misread the comment. Yes combinators it is then. @BenFradet please disregard my earlier comment.
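
To illustrate the combinator approach the thread settles on, here is a rough sketch that keeps the byte-level store untyped and layers conversion on top. Everything in it is hypothetical and simplified: `SimpleStore`, `InMemoryByteStore`, `Converted`, and `StringStores` are illustration-only names, the interface is synchronous, and none of it is the storehaus `Store` or `ConvertedStore` API.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable

// Hypothetical, simplified synchronous store interface (not storehaus's Store).
trait SimpleStore[K, V] {
  def get(k: K): Option[V]
  def put(kv: (K, Option[V])): Unit
}

// Stand-in for a byte-level store such as the one in this PR.
// Keys are wrapped in a Seq because arrays compare by reference.
class InMemoryByteStore extends SimpleStore[Array[Byte], Array[Byte]] {
  private val data = mutable.Map.empty[Seq[Byte], Array[Byte]]

  def get(k: Array[Byte]): Option[Array[Byte]] = data.get(k.toSeq)

  def put(kv: (Array[Byte], Option[Array[Byte]])): Unit = kv match {
    case (k, Some(v)) => data.update(k.toSeq, v)
    case (k, None)    => data.remove(k.toSeq)
  }
}

// Combinator: adapts a byte store to typed keys and values
// without the underlying store knowing about the types.
class Converted[K, V](underlying: SimpleStore[Array[Byte], Array[Byte]])(
    encodeK: K => Array[Byte],
    encodeV: V => Array[Byte],
    decodeV: Array[Byte] => V
) extends SimpleStore[K, V] {
  def get(k: K): Option[V] = underlying.get(encodeK(k)).map(decodeV)

  def put(kv: (K, Option[V])): Unit =
    underlying.put((encodeK(kv._1), kv._2.map(encodeV)))
}

object StringStores {
  // Convenience wrapper: a String-to-String view over any byte store.
  def utf8(underlying: SimpleStore[Array[Byte], Array[Byte]]): SimpleStore[String, String] =
    new Converted[String, String](underlying)(
      (s: String) => s.getBytes(UTF_8),
      (s: String) => s.getBytes(UTF_8),
      (b: Array[Byte]) => new String(b, UTF_8)
    )
}
```

The design point matches the discussion: the wrapper closest to the database stays on `Array[Byte]`, and each typed view is a thin layer that can be swapped without touching the store.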


private lazy val db = factory.open(dir, options)
Collaborator

is this thread-safe? Can you link to docs showing this? If not, we should use:
https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/concurrent/AsyncMutex.scala

Then we'd do something like:

dbMutex.acquireAndRun(futurePool {
  // method logic here.
})

Contributor Author

Will look into it and post my findings.
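
For reference while the thread-safety question is open, the sketch below shows one way to guarantee that a shared resource is never touched by two threads at once. It swaps in a JDK single-threaded executor in place of the `AsyncMutex` suggested above, so it is a different technique with a similar effect. `SerializedAccess` is a hypothetical name, and `scala.concurrent.Future` stands in for `com.twitter.util.Future`.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}

// Hypothetical helper: funnels every operation on `resource` through one
// worker thread, so a resource that is not thread-safe is never accessed
// concurrently. Operations run in submission order.
class SerializedAccess[R](resource: R) {
  private val ec: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  /** Run `f` against the resource on the single worker thread. */
  def run[T](f: R => T): Future[T] = Future(f(resource))(ec)

  /** Stop the worker thread once already queued operations have finished. */
  def shutdown(): Unit = ec.shutdown()
}
```

The trade-off is throughput: all reads and writes are serialized, which only makes sense if the underlying handle turns out not to be safe for concurrent use.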

private val futurePool = FuturePool(Executors.newFixedThreadPool(numThreads))

/** Get a single key from the store.
* Prefer multiGet if you are getting more than one key at a time.
*/
override def get(k: Array[Byte]): Future[Option[Array[Byte]]] = futurePool {
Option(db.get(k))
}

/**
* Replace a value
* Delete is the same as put((k,None))
*/
override def put(kv: (Array[Byte], Option[Array[Byte]])): Future[Unit] = {
require(kv._1 != null)
kv match {
case (k, Some(v)) => futurePool {
db.put(k, v)
}
case (k, None) => futurePool {
db.delete(k)
}
}
}

/** Replace a set of keys at one time */
override def multiPut[K1 <: Array[Byte]](kvs: Map[K1, Option[Array[Byte]]])
: Map[K1, Future[Unit]] = {
val future = futurePool {
val batch = db.createWriteBatch()
kvs.foreach(kv => kv match {
case (k, Some(v)) => batch.put(k, v)
case (k, None) => batch.delete(k)
})
db.write(batch)
batch.close()
}
kvs.mapValues(_ => future)
}

/** Close this store and release any resources.
* It is undefined what happens on get/multiGet after close
*/
override def close(time: Time): Future[Unit] = {
futurePool { db.close() }
Collaborator

this should be: futurePool { db.close() }.flatMap { _ => super.close(time) }
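
A small sketch of why the chaining in this suggestion matters. It uses `scala.concurrent.Future` as a stand-in for `com.twitter.util.Future` (both offer `flatMap`), and `ClosingOrder` with its methods is hypothetical. In the unchained form the first future is discarded, so the caller cannot wait for it or see its failure.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical model of a store whose close has two asynchronous steps.
class ClosingOrder()(implicit ec: ExecutionContext) {
  val events: ListBuffer[String] = ListBuffer.empty[String]

  private def record(e: String): Unit = events.synchronized { events += e }

  // Stands in for futurePool { db.close() }; the sleep simulates slow I/O.
  private def closeDb(): Future[Unit] = Future { Thread.sleep(50); record("db closed") }

  // Stands in for super.close(time).
  private def closeParent(): Future[Unit] = Future { record("parent closed") }

  // Chained: the result completes only after both steps ran, in order,
  // and a failure in closeDb() is propagated to the caller.
  def closeChained(): Future[Unit] = closeDb().flatMap(_ => closeParent())

  // Unchained (the shape under review): closeDb() is fire-and-forget,
  // so the returned future may complete before the database is closed.
  def closeUnchained(): Future[Unit] = { closeDb(); closeParent() }
}
```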

super.close(time)
}
}
@@ -0,0 +1,73 @@
package com.twitter.storehaus.leveldb

import java.io.File
import java.util

import com.twitter.storehaus.Store
import com.twitter.storehaus.testing.generator.NonEmpty
import com.twitter.util.{Future, Await}
import org.iq80.leveldb.Options
import org.scalacheck.{Gen, Properties}
import org.scalacheck.Prop.forAll

import scala.util.Random

/**
* @author Ben Fradet
*/
object LevelDBStoreProperties extends Properties("LevelDBStore") {

def putAndGetTest(store: Store[Array[Byte], Array[Byte]],
pairs: Gen[List[(Array[Byte], Option[Array[Byte]])]]) =
forAll(pairs) { examples: List[(Array[Byte], Option[Array[Byte]])] =>
examples.forall {
case (k, v) => {
Await.result(store.put(k, v))
val found = Await.result(store.get(k))
found match {
case Some(a) => util.Arrays.equals(a, v.get)
case None => found == v
}
}
}
}

def multiPutAndGetTest(store: Store[Array[Byte], Array[Byte]],
pairs: Gen[List[(Array[Byte], Option[Array[Byte]])]]) =
forAll(pairs) { examples: List[(Array[Byte], Option[Array[Byte]])] =>
val examplesMap = examples.toMap
Await.result(Future.collect(store.multiPut(examplesMap).values.toList))
val result = store.multiGet(examplesMap.keySet).map(kv => kv match {
Contributor

This can just be .map { case (key, v) => ... } right?

Contributor Author

Yup, I meant to change that but forgot, thanks for reminding me.
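
The two forms discussed here are equivalent. A self-contained sketch with made-up data (`PatternLambda` and its values are hypothetical, and a plain `getOrElse` stands in for `Await.result`):

```scala
object PatternLambda {
  val pairs: Map[String, Option[Int]] = Map("a" -> Some(1), "b" -> None)

  // Explicit lambda that immediately matches on its argument.
  val verbose: Map[String, Int] = pairs.map(kv => kv match {
    case (key, v) => (key, v.getOrElse(0))
  })

  // Equivalent pattern-matching anonymous function, as suggested in the review.
  val concise: Map[String, Int] = pairs.map { case (key, v) => (key, v.getOrElse(0)) }
}
```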

case (key, v) => (key, Await.result(v))
})

val stringifiedResults = stringifyMap(result)
val stringifiedExamples = stringifyMap(examplesMap)

stringifiedResults == stringifiedExamples
}

private def stringifyMap(map: Map[Array[Byte], Option[Array[Byte]]])
:Map[String, Option[String]] = {
map.map(kv => kv match {
case (k, Some(v)) => (new String(k), Some(new String(v)))
case (k, None) => (new String(k), None)
Contributor

need to also specify the encoding here?

Contributor Author

Yup.
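
A sketch of what specifying the encoding could look like. `new String(bytes)` decodes with the JVM's platform default charset, so results can differ between machines; passing the charset explicitly avoids that. `Utf8Strings` and its method names are hypothetical, chosen for illustration only.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object Utf8Strings {
  // Decode and encode with an explicit charset instead of the platform default.
  def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
  def encode(s: String): Array[Byte] = s.getBytes(UTF_8)

  // Arrays compare by reference, so keys and values are turned into Strings
  // before two maps are compared for equality.
  def stringify(m: Map[Array[Byte], Option[Array[Byte]]]): Map[String, Option[String]] =
    m.map { case (k, v) => (decode(k), v.map(decode)) }
}
```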

})
}

property("LevelDB[Array[Byte], Array[Byte]] single") = {
val dir = new File(System.getProperty("java.io.tmpdir"),
"leveldb-test-" + new Random().nextInt(Int.MaxValue))
dir.mkdirs()
val store = new LevelDBStore(dir, new Options(), 2)
putAndGetTest(store, NonEmpty.Pairing.byteArrays())
}

property("LevelDB[Array[Byte], Array[Byte]] multi") = {
val dir = new File(System.getProperty("java.io.tmpdir"),
"leveldb-test-multi-" + new Random().nextInt(Int.MaxValue))
dir.mkdirs()
val store = new LevelDBStore(dir, new Options(), 1)
multiPutAndGetTest(store, NonEmpty.Pairing.byteArrays())
}
}
@@ -17,15 +17,14 @@
package com.twitter.storehaus.memcache

import com.twitter.algebird.Semigroup
- import com.twitter.bijection.{ Injection, NumericInjections }
+ import com.twitter.bijection.Injection
import com.twitter.bijection.netty.ChannelBufferBijection
import com.twitter.finagle.memcached.Client
import com.twitter.storehaus.testing.SelfAggregatingCloseableCleanup
import com.twitter.storehaus.testing.generator.NonEmpty
- import com.twitter.util.{Await, Future}
+ import com.twitter.util.Await

import org.jboss.netty.buffer.ChannelBuffer
import org.scalacheck.Arbitrary
import org.scalacheck.Gen
import org.scalacheck.Properties
import org.scalacheck.Prop.forAll
@@ -22,12 +22,20 @@ import org.scalacheck.{ Choose, Gen }
object NonEmpty {
/** Generator for non-empty alpha strings of random length */
def alphaStr: Gen[String] =
- for(cs <- Gen.listOf1(Gen.alphaChar)) yield cs.mkString
+ for (cs <- Gen.listOf1(Gen.alphaChar)) yield cs.mkString

/** Generator for Options of non-empty alpha strings of random length */
def alphaStrOpt: Gen[Option[String]] =
alphaStr.flatMap(str => Gen.oneOf(Some(str), None))

/** Generator for non-empty byte arrays of random length */
def byteArray: Gen[Array[Byte]] =
for (cs <- Gen.listOf1(Gen.alphaChar)) yield cs.mkString.getBytes("UTF-8")

/** Generator for Options of non-empty byte arrays of random length */
def byteArrayOpt: Gen[Option[Array[Byte]]] =
byteArray.flatMap(array => Gen.oneOf(Some(array), None))

/** Storehaus pairings of non-empty data.
* In most cases this means 2 element tuples of(K, Option[V]) */
object Pairing {
@@ -37,12 +45,20 @@ object NonEmpty {
opt <- NonEmpty.alphaStrOpt
} yield (str, opt)

/** Generator for pairings of non-empty byte arrays and non-empty byte
arrays options*/
def byteArrayPair: Gen[(Array[Byte], Option[Array[Byte]])] = for {
array <- NonEmpty.byteArray
opt <- NonEmpty.byteArrayOpt
} yield (array, opt)

/** Generator for pairings of non-empty alpha strings to options of positive numerics */
def alphaStrPosNumericPair[T : Numeric : Choose]: Gen[(String, Option[T])] = for {
str <- NonEmpty.alphaStr
opt <- Gen.posNum[T].flatMap(l => Gen.oneOf(Some(l), None))
} yield (str, opt)

/** Generator for pairings of numerics */
def numericPair[T : Numeric : Choose]: Gen[(T, Option[T])] = for {
num <- Gen.posNum[T]
opt <- Gen.posNum[T].flatMap(l => Gen.oneOf(Some(l), None))
@@ -52,6 +68,10 @@ object NonEmpty {
def alphaStrs(n: Int = 10) =
Gen.listOfN(n, alphaStrPair)

/** Generator for non-empty lists of (Array[Byte], Option[Array[Byte]])'s */
def byteArrays(n: Int = 10) =
Gen.listOfN(n, byteArrayPair)

/** Generator for non-empty lists of (String, Option[T])'s */
def alphaStrNumerics[T : Numeric : Choose](n: Int = 10) =
Gen.listOfN(n, alphaStrPosNumericPair[T])