From c72a49ccce92f84a38b978f529e328dcdcdac6eb Mon Sep 17 00:00:00 2001
From: Matt Hicks
Date: Thu, 6 Jun 2024 19:35:27 -0500
Subject: [PATCH] Preliminary support added for DuckDB

---
 README.md                                     |   1 +
 .../scala/spec/SimpleHaloAndDuckDBSpec.scala  | 269 ++++++++++++++++++
 build.sbt                                     |  17 +-
 .../scala/lightdb/duckdb/DuckDBSupport.scala  |  45 +++
 .../main/scala/lightdb/sql/SQLSupport.scala   |  24 +-
 .../scala/lightdb/sqlite/SQLiteSupport.scala  |   3 +
 6 files changed, 351 insertions(+), 8 deletions(-)
 create mode 100644 all/src/test/scala/spec/SimpleHaloAndDuckDBSpec.scala
 create mode 100644 duckdb/src/main/scala/lightdb/duckdb/DuckDBSupport.scala

diff --git a/README.md b/README.md
index ac94c322..8300afa0 100644
--- a/README.md
+++ b/README.md
@@ -11,6 +11,7 @@ Computationally focused database using pluggable store + indexer
 ## Provided Indexers
 - Apache Lucene (https://lucene.apache.org) - Most featureful
 - SQLite (https://www.sqlite.org) - Fastest
+- DuckDB (https://duckdb.org) - Experimental
 
 ## 1.0 TODO
 - [ ] More performance improvements to SQLite integration
diff --git a/all/src/test/scala/spec/SimpleHaloAndDuckDBSpec.scala b/all/src/test/scala/spec/SimpleHaloAndDuckDBSpec.scala
new file mode 100644
index 00000000..ff3fd605
--- /dev/null
+++ b/all/src/test/scala/spec/SimpleHaloAndDuckDBSpec.scala
@@ -0,0 +1,269 @@
+package spec
+
+import cats.effect.IO
+import cats.effect.testing.scalatest.AsyncIOSpec
+import fabric.rw._
+import lightdb._
+import lightdb.duckdb.DuckDBSupport
+import lightdb.halo.HaloDBSupport
+import lightdb.model.Collection
+import lightdb.sqlite.SQLiteSupport
+import lightdb.upgrade.DatabaseUpgrade
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AsyncWordSpec
+
+import java.nio.file.{Path, Paths}
+
+class SimpleHaloAndDuckDBSpec extends AsyncWordSpec with AsyncIOSpec with Matchers {
+  private val id1 = Id[Person]("john")
+  private val id2 = Id[Person]("jane")
+
+  private val p1 = Person("John Doe", 21, id1)
+  private val p2 = Person("Jane Doe", 19, id2)
+
+  "Simple database" should {
+    "initialize the database" in {
+      DB.init(truncate = true)
+    }
+    "store John Doe" in {
+      Person.set(p1).map { p =>
+        p._id should be(id1)
+      }
+    }
+    "verify John Doe exists" in {
+      Person.get(id1).map { o =>
+        o should be(Some(p1))
+      }
+    }
+    "store Jane Doe" in {
+      Person.set(p2).map { p =>
+        p._id should be(id2)
+      }
+    }
+    "verify Jane Doe exists" in {
+      Person.get(id2).map { o =>
+        o should be(Some(p2))
+      }
+    }
+    "verify exactly two objects in data" in {
+      Person.size.map { size =>
+        size should be(2)
+      }
+    }
+    "flush data" in {
+      Person.commit()
+    }
+    "verify exactly two objects in index" in {
+      Person.index.size.map { size =>
+        size should be(2)
+      }
+    }
+    "verify exactly two objects in the store" in {
+      Person.idStream.compile.toList.map { ids =>
+        ids.toSet should be(Set(id1, id2))
+      }
+    }
+    "search by name for positive result" in {
+      Person.withSearchContext { implicit context =>
+        Person
+          .query
+          .countTotal(true)
+          .filter(Person.name.is("Jane Doe"))
+          .search()
+          .flatMap { page =>
+            page.page should be(0)
+            page.pages should be(1)
+            page.offset should be(0)
+            page.total should be(1)
+            page.ids should be(List(id2))
+            page.hasNext should be(false)
+            page.docs.map { people =>
+              people.length should be(1)
+              val p = people.head
+              p._id should be(id2)
+              p.name should be("Jane Doe")
+              p.age should be(19)
+            }
+          }
+      }
+    }
+    "search by age for positive result" in {
+      Person.ageLinks.query(19).compile.toList.map { people =>
+        people.length should be(1)
+        val p = people.head
+        p._id should be(id2)
+        p.name should be("Jane Doe")
+        p.age should be(19)
+      }
+    }
+    "search by id for John" in {
+      Person(id1).map { person =>
+        person._id should be(id1)
+        person.name should be("John Doe")
+        person.age should be(21)
+      }
+    }
+    "search for age range" in {
+      Person.withSearchContext { implicit context =>
+        Person
+          .query
+          .filter(Person.age BETWEEN 19 -> 21)
+          .search()
+          .flatMap { results =>
+            results.docs.map { people =>
+              people.length should be(2)
+              val names = people.map(_.name).toSet
+              names should be(Set("John Doe", "Jane Doe"))
+              val ages = people.map(_.age).toSet
+              ages should be(Set(21, 19))
+            }
+          }
+      }
+    }
+    "do paginated search" in {
+      Person.withSearchContext { implicit context =>
+        Person.query.pageSize(1).countTotal(true).search().flatMap { page1 =>
+          page1.page should be(0)
+          page1.pages should be(2)
+          page1.hasNext should be(true)
+          page1.docs.flatMap { people1 =>
+            people1.length should be(1)
+            page1.next().flatMap {
+              case Some(page2) =>
+                page2.page should be(1)
+                page2.pages should be(2)
+                page2.hasNext should be(false)
+                page2.docs.map { people2 =>
+                  people2.length should be(1)
+                }
+              case None => fail("Should have a second page")
+            }
+          }
+        }
+      }
+    }
+    "do paginated search as a stream" in {
+      Person.withSearchContext { implicit context =>
+        Person.query.pageSize(1).countTotal(true).stream.compile.toList.map { people =>
+          people.length should be(2)
+          people.map(_.name).toSet should be(Set("John Doe", "Jane Doe"))
+        }
+      }
+    }
+    "verify the number of records" in {
+      Person.index.size.map { size =>
+        size should be(2)
+      }
+    }
+    "modify John" in {
+      Person.modify(id1) {
+        case Some(john) => IO(Some(john.copy(name = "Johnny Doe")))
+        case None => throw new RuntimeException("John not found!")
+      }.map { person =>
+        person.get.name should be("Johnny Doe")
+      }
+    }
+    "commit modified data" in {
+      Person.commit()
+    }
+    "verify the number of records has not changed after modify" in {
+      Person.index.size.map { size =>
+        size should be(2)
+      }
+    }
+    "verify John was modified" in {
+      Person(id1).map { person =>
+        person.name should be("Johnny Doe")
+      }
+    }
+    "delete John" in {
+      Person.delete(id1).map { deleted =>
+        deleted should be(id1)
+      }
+    }
+    "verify exactly one object in data" in {
+      Person.size.map { size =>
+        size should be(1)
+      }
+    }
+    "commit data" in {
+      Person.commit()
+    }
+    "verify exactly one object in index" in {
+      Person.index.size.map { size =>
+        size should be(1)
+      }
+    }
+    "list all documents" in {
+      Person.stream.compile.toList.map { people =>
+        people.length should be(1)
+        val p = people.head
+        p._id should be(id2)
+        p.name should be("Jane Doe")
+        p.age should be(19)
+      }
+    }
+    "replace Jane Doe" in {
+      Person.set(Person("Jan Doe", 20, id2)).map { p =>
+        p._id should be(id2)
+      }
+    }
+    "verify Jan Doe" in {
+      Person(id2).map { p =>
+        p._id should be(id2)
+        p.name should be("Jan Doe")
+        p.age should be(20)
+      }
+    }
+    "commit new data" in {
+      Person.commit()
+    }
+    "list new documents" in {
+      Person.stream.compile.toList.map { results =>
+        results.length should be(1)
+        val doc = results.head
+        doc._id should be(id2)
+        doc.name should be("Jan Doe")
+        doc.age should be(20)
+      }
+    }
+    "verify start time has been set" in {
+      DB.startTime.get().map { startTime =>
+        startTime should be > 0L
+      }
+    }
+    "dispose" in {
+      DB.dispose()
+    }
+  }
+
+  object DB extends LightDB with HaloDBSupport {
+    override lazy val directory: Path = Paths.get("testdb")
+
+    val startTime: StoredValue[Long] = stored[Long]("startTime", -1L)
+
+    override lazy val userCollections: List[Collection[_]] = List(
+      Person
+    )
+
+    override def upgrades: List[DatabaseUpgrade] = List(InitialSetupUpgrade)
+  }
+
+  case class Person(name: String, age: Int, _id: Id[Person] = Id()) extends Document[Person]
+
+  object Person extends Collection[Person]("people", DB) with DuckDBSupport[Person] {
+    override implicit val rw: RW[Person] = RW.gen
+
+    val name: I[String] = index.one("name", _.name)
+    val age: I[Int] = index.one("age", _.age)
+    val ageLinks: IndexedLinks[Int, Person] = indexedLinks[Int]("age", _.toString, _.age)
+  }
+
+  object InitialSetupUpgrade extends DatabaseUpgrade {
+    override def applyToNew: Boolean = true
+    override def blockStartup: Boolean = true
+    override def alwaysRun: Boolean = false
+
+    override def upgrade(ldb: LightDB): IO[Unit] = DB.startTime.set(System.currentTimeMillis()).map(_ => ())
+  }
+}
\ No newline at end of file
diff --git a/build.sbt b/build.sbt
index 4eb1e815..1b243111 100644
--- a/build.sbt
+++ b/build.sbt
@@ -54,6 +54,7 @@ val fs2Version: String = "3.10.2"
 val scribeVersion: String = "3.14.0"
 val luceneVersion: String = "9.10.0"
 val sqliteVersion: String = "3.46.0.0"
+val duckdbVersion: String = "1.0.0"
 val keysemaphoreVersion: String = "0.3.0-M1"
 val squantsVersion: String = "1.8.3"
 
@@ -61,7 +62,7 @@ val scalaTestVersion: String = "3.2.18"
 val catsEffectTestingVersion: String = "1.5.0"
 
 lazy val root = project.in(file("."))
-  .aggregate(core, halodb, rocksdb, mapdb, lucene, sql, sqlite, all)
+  .aggregate(core, halodb, rocksdb, mapdb, lucene, sql, sqlite, duckdb, all)
   .settings(
     name := projectName,
     publish := {},
@@ -174,8 +175,20 @@ lazy val sqlite = project.in(file("sqlite"))
     )
   )
 
+lazy val duckdb = project.in(file("duckdb"))
+  .dependsOn(sql)
+  .settings(
+    name := s"$projectName-duckdb",
+    fork := true,
+    libraryDependencies ++= Seq(
+      "org.duckdb" % "duckdb_jdbc" % duckdbVersion,
+      "org.scalatest" %% "scalatest" % scalaTestVersion % Test,
+      "org.typelevel" %% "cats-effect-testing-scalatest" % catsEffectTestingVersion % Test
+    )
+  )
+
 lazy val all = project.in(file("all"))
-  .dependsOn(core, halodb, rocksdb, mapdb, lucene, sqlite)
+  .dependsOn(core, halodb, rocksdb, mapdb, lucene, sqlite, duckdb)
   .settings(
     name := s"$projectName-all",
     fork := true,
diff --git a/duckdb/src/main/scala/lightdb/duckdb/DuckDBSupport.scala b/duckdb/src/main/scala/lightdb/duckdb/DuckDBSupport.scala
new file mode 100644
index 00000000..fed2eff3
--- /dev/null
+++ b/duckdb/src/main/scala/lightdb/duckdb/DuckDBSupport.scala
@@ -0,0 +1,45 @@
+package lightdb.duckdb
+
+import fabric.define.DefType
+import lightdb.Document
+import lightdb.sql.SQLSupport
+
+import java.nio.file.{Files, Path}
+import java.sql.{Connection, DriverManager}
+
+trait DuckDBSupport[D <: Document[D]] extends SQLSupport[D] {
+  private lazy val path: Path = {
+    val p = collection.db.directory.resolve(collection.collectionName).resolve("duckdb.db")
+    Files.createDirectories(p.getParent)
+    p
+  }
+  // TODO: Should each collection have a connection?
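+
+  // Note: auto-commit stays enabled for DuckDB (see below), so SQLSupport's
+  // commit() becomes a no-op for this backend via its !enableAutoCommit guard;
+  // a hypothetical transactional variant would override this to false and rely
+  // on connection.commit(), as SQLite does through the inherited default.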
+
+  override protected def enableAutoCommit: Boolean = true
+
+  override protected def createTable(): String = {
+    val indexes = index.fields.map { i =>
+      if (i.fieldName == "_id") {
+        "_id VARCHAR PRIMARY KEY"
+      } else {
+        val t = i.rw.definition match {
+          case DefType.Str => "VARCHAR"
+          case DefType.Int => "INTEGER"
+          case d => throw new UnsupportedOperationException(s"${i.fieldName} has an unsupported type: $d")
+        }
+        s"${i.fieldName} $t"
+      }
+    }.mkString(", ")
+    val sql = s"CREATE TABLE IF NOT EXISTS ${collection.collectionName}($indexes)"
+    scribe.info(sql)
+    sql
+  }
+
+  override protected def createConnection(): Connection = {
+    Class.forName("org.duckdb.DuckDBDriver")
+    val url = s"jdbc:duckdb:${path.toFile.getCanonicalPath}"
+    DriverManager.getConnection(url)
+  }
+
+  override protected def truncateSQL: String = s"TRUNCATE ${collection.collectionName}"
+}
diff --git a/sql/src/main/scala/lightdb/sql/SQLSupport.scala b/sql/src/main/scala/lightdb/sql/SQLSupport.scala
index 34962f1b..f8012450 100644
--- a/sql/src/main/scala/lightdb/sql/SQLSupport.scala
+++ b/sql/src/main/scala/lightdb/sql/SQLSupport.scala
@@ -4,17 +4,20 @@ import cats.effect.IO
 import fabric._
 import fabric.io.JsonFormatter
 import lightdb.{Document, Id}
-import lightdb.index.{IndexSupport, Index}
+import lightdb.index.{Index, IndexSupport}
 import lightdb.model.AbstractCollection
 import lightdb.query.{PagedResults, Query, SearchContext, Sort, SortDirection}
 import lightdb.util.FlushingBacklog
 
 import java.nio.file.{Files, Path}
 import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, Types}
+import scala.util.Try
 
 trait SQLSupport[D <: Document[D]] extends IndexSupport[D] {
   private var _connection: Option[Connection] = None
 
+  protected def enableAutoCommit: Boolean = false
+
   protected[lightdb] def connection: Connection = _connection match {
     case Some(c) => c
     case None =>
@@ -26,11 +29,13 @@ trait SQLSupport[D <: Document[D]] extends IndexSupport[D] {
 
   protected def createConnection(): Connection
 
+  protected def createTable(): String
+
   protected def init(c: Connection): Unit = {
-    c.setAutoCommit(false)
+    c.setAutoCommit(enableAutoCommit)
     val s = c.createStatement()
     try {
-      s.executeUpdate(s"CREATE TABLE IF NOT EXISTS ${collection.collectionName}(${index.fields.map(_.fieldName).mkString(", ")}, PRIMARY KEY (_id))")
+      s.executeUpdate(createTable())
       val existingColumns = columns(c)
       index.fields.foreach { f =>
         if (f.fieldName != "_id") {
@@ -81,8 +86,10 @@ trait SQLSupport[D <: Document[D]] extends IndexSupport[D] {
     }
   }
 
+  protected def truncateSQL: String = s"DELETE FROM ${collection.collectionName}"
+
   def truncate(): IO[Unit] = IO.blocking {
-    val sql = s"DELETE FROM ${collection.collectionName}"
+    val sql = truncateSQL
    val ps = connection.prepareStatement(sql)
     try {
       ps.executeUpdate()
@@ -107,7 +114,7 @@ trait SQLSupport[D <: Document[D]] extends IndexSupport[D] {
     val total = if (query.countTotal) {
       val sqlCount = s"""SELECT
-                        |  COUNT(*)
+                        |  COUNT(*) AS count
                         |FROM
                         |  ${collection.collectionName}
                         |$filters
                         |""".stripMargin
@@ -115,6 +122,8 @@ trait SQLSupport[D <: Document[D]] extends IndexSupport[D] {
       val countPs = prepare(sqlCount, params)
       try {
         val rs = countPs.executeQuery()
+        rs.next()
+//        scribe.info(s"Columns: ${rs.getMetaData.getColumnType(1)}")
         rs.getInt(1)
       } finally {
         countPs.close()
@@ -197,7 +206,10 @@ trait SQLSupport[D <: Document[D]] extends IndexSupport[D] {
     case _ => ps.setString(index, JsonFormatter.Compact(value))
   }
 
-  private def commit(): IO[Unit] = IO.blocking(connection.commit())
+  private def commit(): IO[Unit] = IO.blocking {
+    if (!enableAutoCommit)
+      connection.commit()
+  }
 
   override protected[lightdb] def initModel(collection: AbstractCollection[D]): Unit = {
     super.initModel(collection)
diff --git a/sqlite/src/main/scala/lightdb/sqlite/SQLiteSupport.scala b/sqlite/src/main/scala/lightdb/sqlite/SQLiteSupport.scala
index 73416451..829aeb21 100644
--- a/sqlite/src/main/scala/lightdb/sqlite/SQLiteSupport.scala
+++ b/sqlite/src/main/scala/lightdb/sqlite/SQLiteSupport.scala
@@ -14,6 +14,9 @@ trait SQLiteSupport[D <: Document[D]] extends SQLSupport[D] {
   }
   // TODO: Should each collection have a connection?
 
+  override protected def createTable(): String =
+    s"CREATE TABLE IF NOT EXISTS ${collection.collectionName}(${index.fields.map(_.fieldName).mkString(", ")}, PRIMARY KEY (_id))"
+
   override protected def createConnection(): Connection = {
     val url = s"jdbc:sqlite:${path.toFile.getCanonicalPath}"
     DriverManager.getConnection(url)
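+    // Illustrative aside: SQLite accepts the untyped column list above, while
+    // DuckDB requires declared column types, which is why DuckDBSupport maps
+    // DefType to SQL types. For the Person collection in the spec, the two
+    // createTable() implementations would emit roughly:
+    //   SQLite: CREATE TABLE IF NOT EXISTS people(_id, name, age, PRIMARY KEY (_id))
+    //   DuckDB: CREATE TABLE IF NOT EXISTS people(_id VARCHAR PRIMARY KEY, name VARCHAR, age INTEGER)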