Skip to content

Commit

Permalink
Chart generation WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Jun 30, 2024
1 parent 81adadd commit e75ea47
Show file tree
Hide file tree
Showing 16 changed files with 666 additions and 23 deletions.
Binary file added benchmark/charts/Insert Records-total-time.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added benchmark/charts/Stream Records-total-time.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
18 changes: 11 additions & 7 deletions benchmark/src/main/scala/benchmark/bench/Bench.scala
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
package benchmark.bench

trait Bench {
val RecordCount: Int = 1_000_000
val RecordCount: Int = 5_000_000
val StreamIterations: Int = 1
val SearchIterations: Int = 1

val tasks: List[Task] = List(
Task("Insert Records", RecordCount, insertRecords),
Task("Stream Records", StreamIterations, streamRecords),
Task("Stream Records", StreamIterations * RecordCount, streamRecords),
Task("Search Each Record", StreamIterations * RecordCount, searchEachRecord),
Task("Search All Records", StreamIterations, searchAllRecords)
Task("Search All Records", StreamIterations * RecordCount, searchAllRecords)
)

def name: String

def init(): Unit

protected def insertRecords(status: StatusCallback): Unit
protected def insertRecords(status: StatusCallback): Int

protected def streamRecords(status: StatusCallback): Int

protected def streamRecords(status: StatusCallback): Unit
protected def searchEachRecord(status: StatusCallback): Int

protected def searchEachRecord(status: StatusCallback): Unit
protected def searchAllRecords(status: StatusCallback): Int

protected def searchAllRecords(status: StatusCallback): Unit
def size(): Long

def dispose(): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package benchmark.bench

import fabric.rw.RW

case class BenchmarkReport(name: String, maxProgress: Double, logs: List[StatusLog])
case class BenchmarkReport(benchName: String,
name: String,
maxProgress: Double,
size: Long,
logs: List[StatusLog])

object BenchmarkReport {
implicit val rw: RW[BenchmarkReport] = RW.gen
Expand Down
18 changes: 15 additions & 3 deletions benchmark/src/main/scala/benchmark/bench/Runner.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package benchmark.bench

import benchmark.bench.impl.LightDBBench
import benchmark.bench.impl.{DerbyBench, H2Bench, LightDBBench, PostgreSQLBench, SQLiteBench}
import fabric.io.JsonFormatter
import fabric.rw.Convertible
import lightdb.duckdb.DuckDBIndexer
Expand All @@ -24,6 +24,10 @@ object Runner {
"ldbHaloSQLite" -> LightDBBench(HaloDBStore, SQLiteIndexer),
"ldbHaloH2" -> LightDBBench(HaloDBStore, H2Indexer),
"ldbHaloDuck" -> LightDBBench(HaloDBStore, DuckDBIndexer),
"SQLite" -> SQLiteBench,
"PostgreSQL" -> PostgreSQLBench,
"H2" -> H2Bench,
"Derby" -> DerbyBench
)

def main(args: Array[String]): Unit = {
Expand All @@ -37,11 +41,19 @@ object Runner {
val status = StatusCallback()
status.start()
scribe.info(s"Executing ${task.name} task...")
task.f(status)
val count = task.f(status)
status.finish()
if (count != task.maxProgress.toInt) {
throw new RuntimeException(s"${bench.name} - ${task.name} expected ${task.maxProgress.toInt}, but received: $count")
}
val logs = status.logs
scribe.info(s"Completed in ${logs.last.elapsed} seconds")
BenchmarkReport(task.name, task.maxProgress, logs)
BenchmarkReport(
benchName = bench.name,
name = task.name,
maxProgress = task.maxProgress,
size = bench.size(),
logs = logs)
}
scribe.info(s"Disposing $implName benchmark...")
bench.dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.sun.management.OperatingSystemMXBean

import java.lang.management.ManagementFactory

case class StatusCallback(every: Long = 1_000L) {
case class StatusCallback(every: Long = 10_000L) {
val progress = new AtomicDouble(0.0)

def logs: List[StatusLog] = _logs.reverse
Expand Down
2 changes: 1 addition & 1 deletion benchmark/src/main/scala/benchmark/bench/Task.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package benchmark.bench

case class Task(name: String, maxProgress: Double = 1.0, f: StatusCallback => Unit)
case class Task(name: String, maxProgress: Double = 1.0, f: StatusCallback => Int)
158 changes: 158 additions & 0 deletions benchmark/src/main/scala/benchmark/bench/impl/DerbyBench.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package benchmark.bench.impl

import benchmark.bench.{Bench, StatusCallback}
import lightdb.util.Unique

import java.io.File
import java.sql.{Connection, DriverManager}
import scala.collection.parallel.CollectionConverters._

object DerbyBench extends Bench {
private lazy val connection: Connection = {
val path = new File("db/derby").getCanonicalPath
val c = DriverManager.getConnection(s"jdbc:derby:$path;create=true")
c.setAutoCommit(false)
c
}

override def name: String = "Derby"

override def init(): Unit = {
// executeUpdate("DROP TABLE IF EXISTS people")
executeUpdate("CREATE TABLE people(id VARCHAR(255), name VARCHAR(255), age INTEGER)")
executeUpdate("CREATE INDEX id_idx ON people(id)")
executeUpdate("CREATE INDEX age_idx ON people(age)")
}

override protected def insertRecords(status: StatusCallback): Int = {
val ps = connection.prepareStatement("INSERT INTO people(id, name, age) VALUES (?, ?, ?)")
try {
(0 until RecordCount)
.foldLeft(0)((total, index) => {
val person = Person(
name = Unique(),
age = index
)
ps.setString(1, person.id)
ps.setString(2, person.name)
ps.setInt(3, person.age)
ps.addBatch()
status.progress.set(index + 1)
total + 1
})
} finally {
ps.executeBatch()
ps.close()
connection.commit()
}
}

private def countRecords(): Int = {
val s = connection.createStatement()
try {
val rs = s.executeQuery("SELECT COUNT(*) FROM people")
try {
rs.next()
rs.getInt(1)
} finally {
rs.close()
}
} finally {
s.close()
}
}

override protected def streamRecords(status: StatusCallback): Int = (0 until StreamIterations)
.foldLeft(0)((total, iteration) => {
val s = connection.createStatement()
val rs = s.executeQuery("SELECT * FROM people")
var count = 0
while (rs.next()) {
count += 1
}
rs.close()
s.close()
if (count != RecordCount) {
scribe.warn(s"RecordCount was not $RecordCount, it was $count")
}
status.progress.set(iteration + 1)
total + count
})

override protected def searchEachRecord(status: StatusCallback): Int = {
var counter = 0
(0 until StreamIterations)
.foreach { iteration =>
val ps = connection.prepareStatement("SELECT * FROM people WHERE age = ?")
(0 until RecordCount)
.foreach { index =>
ps.setInt(1, index)
val rs = ps.executeQuery()
rs.next()
val person = Person(
name = rs.getString("name"),
age = rs.getInt("age"),
id = rs.getString("id")
)
if (person.age != index) {
scribe.warn(s"${person.age} was not $index")
}
if (rs.next()) {
scribe.warn(s"More than one result for $index")
}
rs.close()
counter += 1
status.progress.set((iteration + 1) * (index + 1))
}
ps.close()
}
counter
}

override protected def searchAllRecords(status: StatusCallback): Int = {
var counter = 0
(0 until StreamIterations)
.par
.foreach { iteration =>
val s = connection.createStatement()
val rs = s.executeQuery("SELECT * FROM people")
var count = 0
while (rs.next()) {
count += 1
counter += 1
}
rs.close()
s.close()
if (count != RecordCount) {
scribe.warn(s"RecordCount was not $RecordCount, it was $count")
}
status.progress.set(iteration + 1)
}
counter
}

override def size(): Long = {
def recurse(file: File): Long = if (file.isDirectory) {
file.listFiles().map(recurse).sum
} else {
file.length()
}
recurse(new File("db/derby"))
}

override def dispose(): Unit = {
connection.commit()
connection.close()
}

private def executeUpdate(sql: String): Unit = {
val s = connection.createStatement()
try {
s.executeUpdate(sql)
} finally {
s.close()
}
}

case class Person(name: String, age: Int, id: String = Unique())
}
Loading

0 comments on commit e75ea47

Please sign in to comment.