Updated FlushingBacklog to support removing from the queue
darkfrog26 committed May 10, 2024
1 parent e8a01f9 commit e77a16c
Showing 9 changed files with 54 additions and 40 deletions.
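
At its core, the change re-keys FlushingBacklog: the pending buffer moves from a ConcurrentLinkedQueue[T] to a ConcurrentHashMap[Key, Value], every enqueue now supplies an explicit key, and a new remove(key) lets callers drop a record before it is flushed. A minimal sketch of the new call pattern (the Doc type and the write body here are hypothetical illustrations, not part of this commit):

import cats.effect.IO
import lightdb.util.FlushingBacklog

case class Doc(id: String, value: String)

val backlog = new FlushingBacklog[String, Doc](batchSize = 1000, maxSize = 10000) {
  // write receives at most batchSize records per call (writeBatched splits larger drains)
  override protected def write(list: List[Doc]): IO[Unit] =
    IO(println(s"flushing ${list.size} docs"))
}

val doc = Doc("tt0000001", "example")
val program: IO[Unit] = for {
  _ <- backlog.enqueue(doc.id, doc) // keyed: re-enqueueing the same id overwrites
  _ <- IO(backlog.remove(doc.id))   // new in this commit: cancel a pending record
  _ <- backlog.flush()              // no-op here since the map is now empty
} yield ()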
2 changes: 1 addition & 1 deletion benchmark/src/main/scala/benchmark/IMDBBenchmark.scala
@@ -20,7 +20,7 @@ object IMDBBenchmark { // extends IOApp {
val limit: Limit = Limit.OneMillion

implicit val runtime: IORuntime = IORuntime.global
val implementation: BenchmarkImplementation = SQLiteImplementation
val implementation: BenchmarkImplementation = LightDBImplementation

private var ids: List[Ids] = Nil

benchmark/src/main/scala/benchmark/MariaDBImplementation.scala
@@ -18,7 +18,7 @@ object MariaDBImplementation extends BenchmarkImplementation {
c
}

private lazy val backlogAka = new FlushingBacklog[TitleAka](1000, 10000) {
private lazy val backlogAka = new FlushingBacklog[String, TitleAka](1000, 10000) {
override protected def write(list: List[TitleAkaPG]): IO[Unit] = IO {
val ps = connection.prepareStatement("INSERT INTO title_aka(id, titleId, ordering, title, region, language, types, attributes, isOriginalTitle) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)")
try {
@@ -41,7 +41,7 @@ object MariaDBImplementation extends BenchmarkImplementation
}
}

private lazy val backlogBasics = new FlushingBacklog[TitleBasics](1000, 10000) {
private lazy val backlogBasics = new FlushingBacklog[String, TitleBasics](1000, 10000) {
override protected def write(list: List[TitleBasicsPG]): IO[Unit] = IO {
val ps = connection.prepareStatement("INSERT INTO title_basics(id, tconst, titleType, primaryTitle, originalTitle, isAdult, startYear, endYear, runtimeMinutes, genres) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
try {
@@ -100,9 +100,9 @@ object MariaDBImplementation extends BenchmarkImplementation
genres = map.value("genres")
)

override def persistTitleAka(t: TitleAka): IO[Unit] = backlogAka.enqueue(t).map(_ => ())
override def persistTitleAka(t: TitleAka): IO[Unit] = backlogAka.enqueue(t.id, t).map(_ => ())

override def persistTitleBasics(t: TitleBasicsPG): IO[Unit] = backlogBasics.enqueue(t).map(_ => ())
override def persistTitleBasics(t: TitleBasicsPG): IO[Unit] = backlogBasics.enqueue(t.id, t).map(_ => ())

private def fromRS(rs: ResultSet): TitleAkaPG = TitleAkaPG(
id = rs.getString("id"),
benchmark/src/main/scala/benchmark/MongoDBImplementation.scala
@@ -54,7 +54,7 @@ object MongoDBImplementation extends BenchmarkImplementation {
).asJava)
}

private lazy val backlogAka = new FlushingBacklog[Document](1000, 10000) {
private lazy val backlogAka = new FlushingBacklog[String, Document](1000, 10000) {
override protected def write(list: List[Document]): IO[Unit] = IO {
val javaList = new util.ArrayList[Document](batchSize)
list.foreach(javaList.add)
@@ -63,7 +63,7 @@
}
}

private lazy val backlogBasics = new FlushingBacklog[Document](1000, 10000) {
private lazy val backlogBasics = new FlushingBacklog[String, Document](1000, 10000) {
override protected def write(list: List[Document]): IO[Unit] = IO {
val javaList = new util.ArrayList[Document](batchSize)
list.foreach(javaList.add)
@@ -76,9 +76,9 @@
titleAka.createIndex(Indexes.ascending("titleId"))
}

override def persistTitleAka(t: Document): IO[Unit] = backlogAka.enqueue(t).map(_ => ())
override def persistTitleAka(t: Document): IO[Unit] = backlogAka.enqueue(t.getString("_id"), t).map(_ => ())

override def persistTitleBasics(t: Document): IO[Unit] = backlogBasics.enqueue(t).map(_ => ())
override def persistTitleBasics(t: Document): IO[Unit] = backlogBasics.enqueue(t.getString("_id"), t).map(_ => ())

override def streamTitleAka(): fs2.Stream[IO, Document] = {
val iterator: Iterator[Document] = titleAka.find().iterator().asScala
benchmark/src/main/scala/benchmark/PostgresImplementation.scala
@@ -18,7 +18,7 @@ object PostgresImplementation extends BenchmarkImplementation {
c
}

private lazy val backlogAka = new FlushingBacklog[TitleAka](1000, 10000) {
private lazy val backlogAka = new FlushingBacklog[String, TitleAka](1000, 10000) {
override protected def write(list: List[TitleAkaPG]): IO[Unit] = IO {
val ps = connection.prepareStatement("INSERT INTO title_aka(id, titleId, ordering, title, region, language, types, attributes, isOriginalTitle) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)")
try {
@@ -41,7 +41,7 @@ object PostgresImplementation extends BenchmarkImplementation
}
}

private lazy val backlogBasics = new FlushingBacklog[TitleBasics](1000, 10000) {
private lazy val backlogBasics = new FlushingBacklog[String, TitleBasics](1000, 10000) {
override protected def write(list: List[TitleBasicsPG]): IO[Unit] = IO {
val ps = connection.prepareStatement("INSERT INTO title_basics(id, tconst, titleType, primaryTitle, originalTitle, isAdult, startYear, endYear, runtimeMinutes, genres) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
try {
@@ -100,9 +100,9 @@ object PostgresImplementation extends BenchmarkImplementation
genres = map.value("genres")
)

override def persistTitleAka(t: TitleAka): IO[Unit] = backlogAka.enqueue(t).map(_ => ())
override def persistTitleAka(t: TitleAka): IO[Unit] = backlogAka.enqueue(t.id, t).map(_ => ())

override def persistTitleBasics(t: TitleBasicsPG): IO[Unit] = backlogBasics.enqueue(t).map(_ => ())
override def persistTitleBasics(t: TitleBasicsPG): IO[Unit] = backlogBasics.enqueue(t.id, t).map(_ => ())

private def fromRS(rs: ResultSet): TitleAkaPG = TitleAkaPG(
id = rs.getString("id"),
8 changes: 4 additions & 4 deletions benchmark/src/main/scala/benchmark/SQLiteImplementation.scala
@@ -18,7 +18,7 @@ object SQLiteImplementation extends BenchmarkImplementation {
c
}

private lazy val backlogAka = new FlushingBacklog[TitleAka](1000, 10000) {
private lazy val backlogAka = new FlushingBacklog[String, TitleAka](1000, 10000) {
override protected def write(list: List[TitleAkaPG]): IO[Unit] = IO {
val ps = connection.prepareStatement("INSERT INTO title_aka(id, titleId, ordering, title, region, language, types, attributes, isOriginalTitle) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)")
try {
@@ -41,7 +41,7 @@ object SQLiteImplementation extends BenchmarkImplementation
}
}

private lazy val backlogBasics = new FlushingBacklog[TitleBasics](1000, 10000) {
private lazy val backlogBasics = new FlushingBacklog[String, TitleBasics](1000, 10000) {
override protected def write(list: List[TitleBasicsPG]): IO[Unit] = IO {
val ps = connection.prepareStatement("INSERT INTO title_basics(id, tconst, titleType, primaryTitle, originalTitle, isAdult, startYear, endYear, runtimeMinutes, genres) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
try {
@@ -100,9 +100,9 @@ object SQLiteImplementation extends BenchmarkImplementation
genres = map.value("genres")
)

override def persistTitleAka(t: TitleAka): IO[Unit] = backlogAka.enqueue(t).map(_ => ())
override def persistTitleAka(t: TitleAka): IO[Unit] = backlogAka.enqueue(t.id, t).map(_ => ())

override def persistTitleBasics(t: TitleBasicsPG): IO[Unit] = backlogBasics.enqueue(t).map(_ => ())
override def persistTitleBasics(t: TitleBasicsPG): IO[Unit] = backlogBasics.enqueue(t.id, t).map(_ => ())

private def fromRS(rs: ResultSet): TitleAkaPG = TitleAkaPG(
id = rs.getString("id"),
benchmark/src/main/scala/benchmark/ScarangoImplementation.scala
@@ -12,11 +12,11 @@ object ScarangoImplementation extends BenchmarkImplementation {
override type TitleAka = TitleAkaADB
override type TitleBasics = TitleBasicsADB

private lazy val backlogAka = new FlushingBacklog[TitleAkaADB](1000, 10000) {
private lazy val backlogAka = new FlushingBacklog[Id[TitleAkaADB], TitleAkaADB](1000, 10000) {
override protected def write(list: List[TitleAkaADB]): IO[Unit] = db.titleAka.batch.insert(list).map(_ => ())
}

private lazy val backlogBasics = new FlushingBacklog[TitleBasicsADB](1000, 10000) {
private lazy val backlogBasics = new FlushingBacklog[Id[TitleBasicsADB], TitleBasicsADB](1000, 10000) {
override protected def write(list: List[TitleBasicsADB]): IO[Unit] =
db.titleBasics.batch.insert(list).map(_ => ())
}
@@ -52,9 +52,9 @@ object ScarangoImplementation extends BenchmarkImplementation
genres = map.list("genres")
)

override def persistTitleAka(t: TitleAkaADB): IO[Unit] = backlogAka.enqueue(t).map(_ => ())
override def persistTitleAka(t: TitleAkaADB): IO[Unit] = backlogAka.enqueue(t._id, t).map(_ => ())

override def persistTitleBasics(t: TitleBasicsADB): IO[Unit] = backlogBasics.enqueue(t).map(_ => ())
override def persistTitleBasics(t: TitleBasicsADB): IO[Unit] = backlogBasics.enqueue(t._id, t).map(_ => ())

override def flush(): IO[Unit] = for {
_ <- backlogAka.flush()
46 changes: 30 additions & 16 deletions core/src/main/scala/lightdb/util/FlushingBacklog.scala
@@ -2,28 +2,39 @@ package lightdb.util

import cats.effect.IO

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.concurrent.duration.DurationInt
import scala.jdk.CollectionConverters._

abstract class FlushingBacklog[T](val batchSize: Int, val maxSize: Int) {
private val queue = new ConcurrentLinkedQueue[T]()
abstract class FlushingBacklog[Key, Value](val batchSize: Int, val maxSize: Int) {
private val map = new ConcurrentHashMap[Key, Value]
private val size = new AtomicInteger(0)
private val flushing = new AtomicBoolean(false)

def enqueue(value: T): IO[T] = IO {
queue.add(value)
val size = this.size.incrementAndGet()
def enqueue(key: Key, value: Value): IO[Value] = IO {
val exists = map.put(key, value) != null
var doFlush = false
if (size >= batchSize) {
doFlush = shouldFlush()
if (!exists) {
val size = this.size.incrementAndGet()
if (size >= batchSize) {
doFlush = shouldFlush()
}
}
doFlush
}.flatMap {
case true => prepareWrite().map(_ => value)
case false => waitForBuffer().map(_ => value)
}

def remove(key: Key): Boolean = {
val b = map.remove(key) != null
if (b) {
size.decrementAndGet()
}
b
}

private def waitForBuffer(): IO[Unit] = if (size.get() > maxSize) {
IO.sleep(1.second).flatMap(_ => waitForBuffer())
} else {
@@ -39,9 +50,10 @@ abstract class FlushingBacklog[T](val batchSize: Int, val maxSize: Int) {
}
}

private def prepareWrite(): IO[Unit] = fs2.Stream
.repeatEval(IO {
val o = Option(queue.poll())
private def pollingStream: fs2.Stream[IO, Value] = fs2.Stream
.fromBlockingIterator[IO](map.keys().asIterator().asScala, 512)
.map { key =>
val o = Option(map.remove(key))
if (o.nonEmpty) {
val s = size.decrementAndGet()
if (s < 0) {
Expand All @@ -50,8 +62,10 @@ abstract class FlushingBacklog[T](val batchSize: Int, val maxSize: Int) {
}
}
o
})
.unNoneTerminate
}
.unNone

private def prepareWrite(): IO[Unit] = pollingStream
.compile
.toList
.flatMap { list =>
Expand All @@ -61,7 +75,7 @@ abstract class FlushingBacklog[T](val batchSize: Int, val maxSize: Int) {
flushing.set(false)
}

private def writeBatched(list: List[T]): IO[Unit] = {
private def writeBatched(list: List[Value]): IO[Unit] = {
val (current, more) = list.splitAt(batchSize)
val w = write(current)
if (more.nonEmpty) {
@@ -71,9 +85,9 @@ abstract class FlushingBacklog[T](val batchSize: Int, val maxSize: Int) {
}
}

protected def write(list: List[T]): IO[Unit]
protected def write(list: List[Value]): IO[Unit]

def flush(): IO[Unit] = if (queue.isEmpty) {
def flush(): IO[Unit] = if (map.isEmpty) {
IO.unit
} else {
prepareWrite()
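
Because the buffer is now a map, a second enqueue under an existing key replaces the value without incrementing size (put returns the previous value, so only a null result counts as new), and remove decrements it. A small sketch of those semantics, assuming a no-op write:

import cats.effect.IO
import lightdb.util.FlushingBacklog

val b = new FlushingBacklog[String, Int](batchSize = 2, maxSize = 10) {
  override protected def write(list: List[Int]): IO[Unit] = IO.unit
}

val program: IO[Boolean] = for {
  _       <- b.enqueue("a", 1)
  _       <- b.enqueue("a", 2) // same key: value replaced, size stays at 1, no flush
  removed <- IO(b.remove("a")) // true: the pending entry is dropped, size back to 0
  _       <- b.flush()         // map is empty, so this is IO.unit
} yield removed

Note that remove returns a plain Boolean rather than an IO, so callers sequencing it with other effects need to suspend it themselves.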
1 change: 1 addition & 0 deletions sqlite/src/main/scala/lightdb/sqlite/SQLiteIndexer.scala
@@ -31,6 +31,7 @@ case class SQLiteIndexer[D <: Document[D]](indexSupport: SQLiteSupport[D], colle
}

override private[lightdb] def delete(id: Id[D]): IO[Unit] = IO {
indexSupport.backlog.remove(id)
val ps = indexSupport.connection.prepareStatement(s"DELETE FROM ${collection().collectionName} WHERE _id = ?")
try {
ps.setString(1, id.value)
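
This is the behavioral payoff of the new remove: a document that was enqueued for indexing but not yet flushed no longer survives its own deletion, because the pending row is pruned before the SQL DELETE runs. A sketch of the sequence this fixes, where backlog, indexer and doc are illustrative stand-ins (backlog is the private[sqlite] FlushingBacklog defined in SQLiteSupport below):

for {
  _ <- backlog.enqueue(doc._id, doc) // indexDoc queued the row; nothing written yet
  _ <- indexer.delete(doc._id)       // prunes the pending row, then runs the SQL DELETE
  _ <- backlog.flush()               // doc._id is gone from the map: no resurrection
} yield ()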
5 changes: 2 additions & 3 deletions sqlite/src/main/scala/lightdb/sqlite/SQLiteSupport.scala
@@ -12,7 +12,7 @@ import lightdb.util.FlushingBacklog
import java.nio.file.Path
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, Types}

// TODO: Solve for uncommitted records being deleted leading to them being recreated at commit
trait SQLiteSupport[D <: Document[D]] extends IndexSupport[D] {
private lazy val path: Path = collection.db.directory.resolve(collection.collectionName).resolve("sqlite.db")
// TODO: Should each collection have a connection?
@@ -38,7 +37,7 @@ trait SQLiteSupport[D <: Document[D]] extends IndexSupport[D] {

val _id: SQLIndexedField[Id[D], D] = index("_id", doc => Some(doc._id))

private lazy val backlog = new FlushingBacklog[D](10_000, 100_000) {
private[sqlite] lazy val backlog = new FlushingBacklog[Id[D], D](10_000, 100_000) {
override protected def write(list: List[D]): IO[Unit] = IO {
val sql = s"INSERT OR REPLACE INTO ${collection.collectionName}(${index.fields.map(_.fieldName).mkString(", ")}) VALUES (${index.fields.map(_ => "?").mkString(", ")})"
val ps = connection.prepareStatement(sql)
@@ -133,7 +132,7 @@ trait SQLiteSupport[D <: Document[D]] extends IndexSupport[D] {
}

override protected def indexDoc(doc: D, fields: List[IndexedField[_, D]]): IO[Unit] =
backlog.enqueue(doc).map(_ => ())
backlog.enqueue(doc._id, doc).map(_ => ())

private def prepare(sql: String, params: List[Json]): PreparedStatement = try {
val ps = connection.prepareStatement(sql)
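
The sizes here (batchSize = 10_000, maxSize = 100_000) interact with writeBatched from FlushingBacklog above: a flush drains the whole map, and the drained list is then split into chunks of at most batchSize rows per write call. A self-contained sketch of that splitting, mirroring writeBatched rather than adding any new API:

// Illustrative re-statement of the chunking writeBatched performs:
def chunks[A](list: List[A], batchSize: Int): List[List[A]] =
  if (list.isEmpty) Nil
  else {
    val (current, more) = list.splitAt(batchSize)
    current :: chunks(more, batchSize)
  }

// e.g. 25,000 drained rows with batchSize = 10,000 yield three write() calls:
assert(chunks((1 to 25000).toList, 10000).map(_.size) == List(10000, 10000, 5000))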
