Lots of tweaks
darkfrog26 committed Apr 12, 2024
1 parent 305cd01 commit 7d20842
Showing 9 changed files with 163 additions and 90 deletions.
32 changes: 17 additions & 15 deletions benchmark/src/main/scala/benchmark/LightDBImplementation.scala
@@ -2,6 +2,7 @@ package benchmark

import cats.effect.IO
import fabric.rw.RW
import lightdb.sqlite.{SQLIndexedField, SQLiteSupport}
import lightdb.upgrade.DatabaseUpgrade
import lightdb.{Collection, Document, Id, IndexedLinks, LightDB, MaxLinks}

@@ -52,17 +53,17 @@ object LightDBImplementation extends BenchmarkImplementation {

override def get(id: String): IO[TitleAkaLDB] = TitleAkaLDB(Id[TitleAkaLDB](id))

override def findByTitleId(titleId: String): IO[List[TitleAkaLDB]] = TitleAkaLDB.withSearchContext { implicit context =>
TitleAkaLDB
.query
.pageSize(100)
.filter(TitleAkaLDB.titleId === titleId)
.search()
.flatMap { page =>
page.docs
}
}
// TitleAkaLDB.titleId.query(titleId).compile.toList
// override def findByTitleId(titleId: String): IO[List[TitleAkaLDB]] = TitleAkaLDB.withSearchContext { implicit context =>
// TitleAkaLDB
// .query
// .pageSize(100)
// .filter(TitleAkaLDB.titleId === titleId)
// .search()
// .flatMap { page =>
// page.docs
// }
// }
override def findByTitleId(titleId: String): IO[List[TitleAkaLDB]] = TitleAkaLDB.titleId.query(titleId).compile.toList

override def flush(): IO[Unit] = for {
_ <- TitleAkaLDB.commit()
@@ -83,7 +84,7 @@ object LightDBImplementation extends BenchmarkImplementation {
scribe.info(s"TitleBasic counts -- Halo: $haloCount") //, Lucene: $luceneCount")
}

object DB extends LightDB(directory = Paths.get("imdb"), indexThreads = 10, maxFileSize = 1024 * 1024 * 100) {
object DB extends LightDB(directory = Paths.get("imdb"), maxFileSize = 1024 * 1024 * 1024) {
// val titleAka: Collection[TitleAkaLDB] = collection("titleAka", TitleAkaLDB)
// val titleBasics: Collection[TitleBasicsLDB] = collection("titleBasics", TitleBasicsLDB)

@@ -96,11 +97,12 @@ object LightDBImplementation extends BenchmarkImplementation {

case class TitleAkaLDB(titleId: String, ordering: Int, title: String, region: Option[String], language: Option[String], types: List[String], attributes: List[String], isOriginalTitle: Option[Boolean], _id: Id[TitleAka]) extends Document[TitleAka]

object TitleAkaLDB extends Collection[TitleAkaLDB]("titleAka", DB) {
object TitleAkaLDB extends Collection[TitleAkaLDB]("titleAka", DB) {// with SQLiteSupport[TitleAkaLDB] {
override implicit val rw: RW[TitleAkaLDB] = RW.gen

// val titleId: IndexedLinks[String, TitleAkaLDB] = indexedLinks[String]("titleId", identity, _.titleId, MaxLinks.OverflowTrim(100))
val titleId: StringField[TitleAkaLDB] = index("titleId").string(_.titleId)
val titleId: IndexedLinks[String, TitleAkaLDB] = indexedLinks[String]("titleId", identity, _.titleId, MaxLinks.OverflowTrim(100))
// val titleId: StringField[TitleAkaLDB] = index("titleId").string(_.titleId)
// val titleId: SQLIndexedField[String, TitleAkaLDB] = index("titleId", doc => Some(doc.titleId))
}

case class TitleBasicsLDB(tconst: String, titleType: String, primaryTitle: String, originalTitle: String, isAdult: Boolean, startYear: Int, endYear: Int, runtimeMinutes: Int, genres: List[String], _id: Id[TitleBasics]) extends Document[TitleBasics]
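The notable change above: findByTitleId no longer goes through the search/query API at all, it reads straight from the IndexedLinks index and compiles the resulting fs2 stream to a List. Because the lookup is a stream, matches can also be consumed incrementally; a minimal sketch, assuming IndexedLinks.query returns fs2.Stream[IO, TitleAkaLDB] as the new implementation implies (countByTitleId is a made-up helper, not part of the commit):

import cats.effect.IO

// Count matching aka-titles without materializing them all in memory.
def countByTitleId(titleId: String): IO[Long] =
  TitleAkaLDB.titleId.query(titleId).compile.count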
2 changes: 1 addition & 1 deletion build.sbt
@@ -122,7 +122,7 @@ lazy val sqlite = project.in(file("sqlite"))
)

lazy val benchmark = project.in(file("benchmark"))
.dependsOn(core)
.dependsOn(core, lucene, sqlite)
.settings(
name := s"$projectName-benchmark",
fork := true,
1 change: 0 additions & 1 deletion core/src/main/scala/lightdb/Collection.scala
@@ -4,7 +4,6 @@ import cats.effect.IO
import cats.implicits.{catsSyntaxApplicativeByName, toTraverseOps}
import fabric.rw.RW
import lightdb.index._
import lightdb.query.Query

abstract class Collection[D <: Document[D]](val collectionName: String,
protected[lightdb] val db: LightDB,
4 changes: 2 additions & 2 deletions core/src/main/scala/lightdb/LightDB.scala
@@ -10,7 +10,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import scribe.cats.{io => logger}

abstract class LightDB(val directory: Path,
val indexThreads: Int = 2,
val indexThreads: Int = Runtime.getRuntime.availableProcessors(),
val maxFileSize: Int = 1024 * 1024) {
private val _initialized = new AtomicBoolean(false)

@@ -54,7 +54,7 @@ abstract class LightDB(val directory: Path,

protected[lightdb] def createStore(name: String): Store = synchronized {
verifyInitialized()
val store = Store(directory.resolve(name), indexThreads, maxFileSize)
val store = HaloStore(directory.resolve(name), indexThreads, maxFileSize)
stores = store :: stores
store
}
94 changes: 60 additions & 34 deletions core/src/main/scala/lightdb/Store.scala
@@ -1,35 +1,29 @@
package lightdb

import cats.effect.IO
import cats.implicits.{catsSyntaxParallelSequence1, toTraverseOps}
import com.oath.halodb.{HaloDB, HaloDBOptions}
import fabric.io.{JsonFormatter, JsonParser}
import fabric.rw._

import java.nio.file.{Files, Path}
import java.util.concurrent.atomic.AtomicBoolean
import scala.jdk.CollectionConverters._

case class Store(directory: Path,
indexThreads: Int,
maxFileSize: Int) {
private lazy val instance: HaloDB = {
val opts = new HaloDBOptions
opts.setBuildIndexThreads(indexThreads)
opts.setMaxFileSize(maxFileSize)
trait Store {
def keyStream[D]: fs2.Stream[IO, Id[D]]

Files.createDirectories(directory.getParent)
HaloDB.open(directory.toAbsolutePath.toString, opts)
}
def stream[D]: fs2.Stream[IO, (Id[D], Array[Byte])]

def keyStream[D]: fs2.Stream[IO, Id[D]] = fs2.Stream.fromBlockingIterator[IO](instance.newKeyIterator().asScala, 1024)
.map { record =>
Id[D](record.getBytes.string)
}
def get[D](id: Id[D]): IO[Option[Array[Byte]]]

def stream[D]: fs2.Stream[IO, (Id[D], Array[Byte])] = fs2.Stream.fromBlockingIterator[IO](instance.newIterator().asScala, 1024)
.map { record =>
val key = record.getKey.string
Id[D](key) -> record.getValue
}
def put[D](id: Id[D], value: Array[Byte]): IO[Boolean]

def delete[D](id: Id[D]): IO[Unit]

def size: IO[Int]

def dispose(): IO[Unit]

def streamJson[D: RW]: fs2.Stream[IO, D] = stream[D].map {
case (_, bytes) =>
@@ -38,21 +32,13 @@ case class Store(directory: Path,
json.as[D]
}

def get[D](id: Id[D]): IO[Option[Array[Byte]]] = IO {
Option(instance.get(id.bytes))
}

def getJson[D: RW](id: Id[D]): IO[Option[D]] = get(id)
.map(_.map { bytes =>
val jsonString = bytes.string
val json = JsonParser(jsonString)
json.as[D]
})

def put[D](id: Id[D], value: Array[Byte]): IO[Boolean] = IO {
instance.put(id.bytes, value)
}

def putJson[D <: Document[D]](doc: D)
(implicit rw: RW[D]): IO[D] = IO {
val json = doc.json
@@ -61,12 +47,6 @@ case class Store(directory: Path,
put(doc._id, jsonString.getBytes).map(_ => doc)
}

def delete[D](id: Id[D]): IO[Unit] = IO {
instance.delete(id.bytes)
}

def size: IO[Int] = IO(instance.size().toInt)

def truncate(): IO[Unit] = keyStream[Any]
.evalMap { id =>
delete(id)
@@ -79,6 +59,52 @@ case class Store(directory: Path,
case _ => truncate()
}
}
}

case class HaloStore(directory: Path,
indexThreads: Int,
maxFileSize: Int) extends Store {
private lazy val instance: HaloDB = {
val opts = new HaloDBOptions
opts.setBuildIndexThreads(indexThreads)
opts.setMaxFileSize(maxFileSize)
opts.setUseMemoryPool(true)
opts.setMemoryPoolChunkSize(2 * 1024 * 1024)
opts.setFixedKeySize(32)
opts.setFlushDataSizeBytes(10 * 1024 * 1024)
opts.setCompactionThresholdPerFile(0.9)
opts.setCompactionJobRate(50 * 1024 * 1024)
opts.setNumberOfRecords(100_000_000)
opts.setCleanUpTombstonesDuringOpen(true)

Files.createDirectories(directory.getParent)
HaloDB.open(directory.toAbsolutePath.toString, opts)
}

override def keyStream[D]: fs2.Stream[IO, Id[D]] = fs2.Stream.fromBlockingIterator[IO](instance.newKeyIterator().asScala, 1024)
.map { record =>
Id[D](record.getBytes.string)
}

override def stream[D]: fs2.Stream[IO, (Id[D], Array[Byte])] = fs2.Stream.fromBlockingIterator[IO](instance.newIterator().asScala, 1024)
.map { record =>
val key = record.getKey.string
Id[D](key) -> record.getValue
}

override def get[D](id: Id[D]): IO[Option[Array[Byte]]] = IO {
Option(instance.get(id.bytes))
}

override def put[D](id: Id[D], value: Array[Byte]): IO[Boolean] = IO {
instance.put(id.bytes, value)
}

override def delete[D](id: Id[D]): IO[Unit] = IO {
instance.delete(id.bytes)
}

override def size: IO[Int] = IO(instance.size().toInt)

def dispose(): IO[Unit] = IO(instance.close())
override def dispose(): IO[Unit] = IO(instance.close())
}
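Extracting Store into a trait, with HaloStore keeping the HaloDB-specific wiring and the JSON/truncate helpers staying on the trait, is the structural core of this commit. To illustrate what the abstraction permits, a hedged sketch of a purely in-memory implementation; it is not part of the commit and assumes Id exposes its underlying String as a `value` field, which is a guess:

import cats.effect.IO
import scala.collection.concurrent.TrieMap

// Hypothetical in-memory Store, e.g. for tests. Assumes Id wraps a plain String value.
case class MapStore() extends Store {
  private val map = TrieMap.empty[String, Array[Byte]]

  override def keyStream[D]: fs2.Stream[IO, Id[D]] =
    fs2.Stream.fromIterator[IO](map.keysIterator.map(Id[D](_)), 512)

  override def stream[D]: fs2.Stream[IO, (Id[D], Array[Byte])] =
    fs2.Stream.fromIterator[IO](map.iterator.map { case (k, v) => Id[D](k) -> v }, 512)

  override def get[D](id: Id[D]): IO[Option[Array[Byte]]] = IO(map.get(id.value))

  override def put[D](id: Id[D], value: Array[Byte]): IO[Boolean] =
    IO { map.put(id.value, value); true }

  override def delete[D](id: Id[D]): IO[Unit] = IO(map.remove(id.value)).void

  override def size: IO[Int] = IO(map.size)

  override def dispose(): IO[Unit] = IO(map.clear())
}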
4 changes: 3 additions & 1 deletion core/src/main/scala/lightdb/query/Query.scala
@@ -8,12 +8,14 @@ case class Query[D <: Document[D]](indexSupport: IndexSupport[D],
filter: Option[Filter[D]] = None,
sort: List[Sort] = Nil,
scoreDocs: Boolean = false,
pageSize: Int = 1_000) {
pageSize: Int = 1_000,
countTotal: Boolean = false) {
def filter(filter: Filter[D]): Query[D] = copy(filter = Some(filter))
def sort(sort: Sort*): Query[D] = copy(sort = this.sort ::: sort.toList)
def clearSort: Query[D] = copy(sort = Nil)
def scoreDocs(b: Boolean): Query[D] = copy(scoreDocs = b)
def pageSize(size: Int): Query[D] = copy(pageSize = size)
def countTotal(b: Boolean): Query[D] = copy(countTotal = b)

def search()(implicit context: SearchContext[D]): IO[PagedResults[D]] = indexSupport.doSearch(
query = this,
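The new countTotal flag follows the same immutable copy-builder pattern as pageSize and scoreDocs. A usage sketch under that assumption (firstPageWithTotal is a made-up helper; SearchContext and PagedResults are the types search() already references):

import cats.effect.IO

// Request the total hit count alongside the first page of results.
def firstPageWithTotal[D <: Document[D]](query: Query[D])
                                        (implicit context: SearchContext[D]): IO[PagedResults[D]] =
  query
    .pageSize(100)
    .countTotal(true)
    .search()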
29 changes: 18 additions & 11 deletions core/src/main/scala/lightdb/util/FlushingBacklog.scala
@@ -39,20 +39,27 @@ abstract class FlushingBacklog[T](val batchSize: Int, val maxSize: Int) {
}
}

private def prepareWrite(): IO[Unit] = IO {
val pollingIterator = new Iterator[T] {
override def hasNext: Boolean = !queue.isEmpty

override def next(): T = {
val value = queue.poll()
if (value != null) {
FlushingBacklog.this.size.decrementAndGet()
private def prepareWrite(): IO[Unit] = fs2.Stream
.repeatEval(IO {
val o = Option(queue.poll())
if (o.nonEmpty) {
val s = size.decrementAndGet()
if (s < 0) {
scribe.warn("Size fell below zero!")
size.set(0)
}
value
}
o
})
.unNoneTerminate
.compile
.toList
.flatMap { list =>
writeBatched(list)
}
.map { _ =>
flushing.set(false)
}
pollingIterator.toList
}.flatMap(writeBatched).map(_ => flushing.set(false))

private def writeBatched(list: List[T]): IO[Unit] = {
val (current, more) = list.splitAt(batchSize)
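The prepareWrite rewrite above swaps a hand-rolled polling Iterator for an fs2 stream terminated by unNoneTerminate, which also lets the size counter be repaired if it ever drifts below zero. The core draining idiom, isolated as a sketch (drain is not part of the library):

import cats.effect.IO
import java.util.concurrent.ConcurrentLinkedQueue

// Poll a concurrent queue until it yields null; unNoneTerminate ends the stream
// at the first None, so the list holds whatever was queued at that moment.
def drain[A](queue: ConcurrentLinkedQueue[A]): IO[List[A]] =
  fs2.Stream
    .repeatEval(IO(Option(queue.poll())))
    .unNoneTerminate
    .compile
    .toList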
(2 additional changed files not shown)
