Skip to content

Commit

Permalink
Began working on RocksDB integration
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Apr 15, 2024
1 parent 7d20842 commit 6812ced
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 1 deletion.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ ThisBuild / Test / testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-oDF

val collectionCompatVersion: String = "2.11.0"
val haloDBVersion: String = "v0.5.6"
val rocksDBVersion: String = "9.0.0"
val catsEffectVersion: String = "3.5.4"
val fabricVersion: String = "1.14.2"
val fs2Version: String = "3.10.2"
Expand Down Expand Up @@ -75,6 +76,7 @@ lazy val core = project.in(file("core"))
"co.fs2" %% "fs2-core" % fs2Version,
"com.outr" %% "scribe-slf4j" % scribeVersion,
"com.github.yahoo" % "HaloDB" % haloDBVersion,
"org.rocksdb" % "rocksdbjni" % rocksDBVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
"org.typelevel" %% "cats-effect-testing-scalatest" % catsEffectTestingVersion % Test
),
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/lightdb/LightDB.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ abstract class LightDB(val directory: Path,

protected[lightdb] def createStore(name: String): Store = synchronized {
  verifyInitialized()
  // NOTE(review): HaloStore was the previous backend; RocksDB is now used for
  // all stores. `indexThreads` and `maxFileSize` are HaloDB-specific and are
  // currently unused — consider removing them from the constructor once the
  // migration is final.
  val store = RocksDBStore(directory.resolve(name))
  // Track the store so it can be disposed with the database.
  stores = store :: stores
  store
}
Expand Down
64 changes: 64 additions & 0 deletions core/src/main/scala/lightdb/Store.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import cats.implicits.{catsSyntaxParallelSequence1, toTraverseOps}
import com.oath.halodb.{HaloDB, HaloDBOptions}
import fabric.io.{JsonFormatter, JsonParser}
import fabric.rw._
import org.rocksdb.RocksDB

import java.nio.file.{Files, Path}
import java.util.concurrent.atomic.AtomicBoolean
Expand Down Expand Up @@ -107,4 +108,67 @@ case class HaloStore(directory: Path,
override def size: IO[Int] = IO(instance.size().toInt)

override def dispose(): IO[Unit] = IO(instance.close())
}

/**
 * RocksDB-backed [[Store]] implementation.
 *
 * Keys are the raw bytes of the document [[Id]]; values are the serialized
 * document bytes. One RocksDB database is opened per store directory.
 *
 * @param directory the on-disk location of the RocksDB database
 */
case class RocksDBStore(directory: Path) extends Store {
  RocksDB.loadLibrary()

  private val db: RocksDB = {
    // Only the parent needs to exist up front: this RocksDB.open overload uses
    // createIfMissing, so the database directory itself is created on open.
    // getParent is null for a single-segment relative path, hence the Option.
    Option(directory.getParent).foreach(Files.createDirectories(_))
    RocksDB.open(directory.toAbsolutePath.toString)
  }

  /**
   * Builds a stream over all entries, applying `read` to the iterator at each
   * valid position. The native RocksIterator is bracketed so its off-heap
   * handle is released even if the stream is interrupted or fails.
   */
  private def iteratorStream[T](read: org.rocksdb.RocksIterator => T): fs2.Stream[IO, T] =
    fs2.Stream
      .bracket(IO {
        val rocksIterator = db.newIterator()
        rocksIterator.seekToFirst()
        rocksIterator
      })(rocksIterator => IO(rocksIterator.close()))
      .flatMap { rocksIterator =>
        val iterator = new Iterator[T] {
          override def hasNext: Boolean = rocksIterator.isValid

          override def next(): T = {
            // Read the current entry BEFORE advancing: after seekToFirst() the
            // iterator already points at the first entry. Advancing first (as
            // the previous implementation did) skips the first record and
            // reads key()/value() after moving past the last valid position.
            val value = read(rocksIterator)
            rocksIterator.next()
            value
          }
        }
        fs2.Stream.fromBlockingIterator[IO](iterator, 1024)
      }

  /** Streams every document id in the store. */
  override def keyStream[D]: fs2.Stream[IO, Id[D]] =
    iteratorStream(rocksIterator => Id[D](rocksIterator.key().string))

  /** Streams every (id, serialized bytes) pair in the store. */
  override def stream[D]: fs2.Stream[IO, (Id[D], Array[Byte])] =
    iteratorStream { rocksIterator =>
      Id[D](rocksIterator.key().string) -> rocksIterator.value()
    }

  /** Looks up the serialized bytes for `id`; None if absent (RocksDB returns null). */
  override def get[D](id: Id[D]): IO[Option[Array[Byte]]] = IO(Option(db.get(id.bytes)))

  /** Upserts `value` under `id`. Always returns true (RocksDB put does not report replacement). */
  override def put[D](id: Id[D], value: Array[Byte]): IO[Boolean] = IO {
    db.put(id.bytes, value)
    true
  }

  /** Removes `id` if present; a no-op for missing keys. */
  override def delete[D](id: Id[D]): IO[Unit] = IO {
    db.delete(id.bytes)
  }

  // O(n): RocksDB has no exact count, so stream and count the keys.
  override def size: IO[Int] = keyStream.compile.count.map(_.toInt)

  /** Closes the underlying native database handle. */
  override def dispose(): IO[Unit] = IO {
    db.close()
  }
}

0 comments on commit 6812ced

Please sign in to comment.