Skip to content

Commit

Permalink
Lots of improvements to allow stopping and restarting a DB instance
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed May 11, 2024
1 parent 332053f commit c8739a2
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 28 deletions.
19 changes: 15 additions & 4 deletions core/src/main/scala/lightdb/LightDB.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package lightdb
import cats.effect.IO
import cats.implicits.{catsSyntaxApplicativeByName, catsSyntaxParallelSequence1, toTraverseOps}
import fabric.rw._
import lightdb.model.Collection
import lightdb.model.{AbstractCollection, Collection, DocumentModel}
import lightdb.upgrade.DatabaseUpgrade

import java.nio.file.Path
Expand All @@ -23,7 +23,7 @@ abstract class LightDB {

def initialized: Boolean = _initialized.get()

def collections: List[Collection[_]]
def collections: List[AbstractCollection[_]]
def upgrades: List[DatabaseUpgrade]

def commit(): IO[Unit] = collections.map(_.commit()).sequence.map(_ => ())
Expand All @@ -32,7 +32,7 @@ abstract class LightDB {

def init(truncate: Boolean = false): IO[Unit] = if (_initialized.compareAndSet(false, true)) {
for {
_ <- logger.info(s"LightDB initializing ${directory.getFileName.toString}...")
_ <- logger.info(s"LightDB initializing ${directory.getFileName.toString} collection...")
// Truncate the database before we do anything if specified
_ <- this.truncate().whenA(truncate)
// Determine if this is an uninitialized database
Expand All @@ -57,12 +57,23 @@ abstract class LightDB {

protected[lightdb] def createStoreInternal(name: String): Store = synchronized {
verifyInitialized()
// val store = HaloStore(directory.resolve(name), indexThreads, maxFileSize)
val store = createStore(name)
stores = store :: stores
store
}

protected def collection[D <: Document[D]](name: String,
model: DocumentModel[D],
autoCommit: Boolean = false,
atomic: Boolean = true)
(implicit rw: RW[D]): AbstractCollection[D] = AbstractCollection[D](
name = name,
db = this,
model = model,
autoCommit = autoCommit,
atomic = atomic
)

protected def createStore(name: String): Store

def truncate(): IO[Unit] = collections.map(_.truncate()).parSequence.map(_ => ())
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/lightdb/index/IndexSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ trait IndexSupport[D <: Document[D]] extends DocumentModel[D] {
case _ => _collection.getOrElse(throw new RuntimeException("DocumentModel not initialized with Collection (yet)"))
}

lazy val query: Query[D] = Query(this, collection)
def query: Query[D] = Query(this, collection)

override protected[lightdb] def initModel(collection: AbstractCollection[D]): Unit = {
super.initModel(collection)
Expand Down
31 changes: 29 additions & 2 deletions core/src/main/scala/lightdb/model/AbstractCollection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ trait AbstractCollection[D <: Document[D]] extends DocumentActionSupport[D] {
doc = doc,
collection = this,
set = (id, json) => store.putJson(id, json)
)(lock)
)(lock).flatMap { doc =>
commit().whenA(autoCommit).map(_ => doc)
}
}

def modify(id: Id[D])
Expand All @@ -76,12 +78,15 @@ trait AbstractCollection[D <: Document[D]] extends DocumentActionSupport[D] {
collection = this,
get = apply,
delete = id => store.delete(id)
)(lock)
)(lock).flatMap { id =>
commit().whenA(autoCommit).map(_ => id)
}
}

def truncate(): IO[Unit] = for {
_ <- store.truncate()
_ <- model.indexedLinks.map(_.store.truncate()).sequence
_ <- truncateActions.invoke()
_ <- commit().whenA(autoCommit)
} yield ()

Expand Down Expand Up @@ -122,3 +127,25 @@ trait AbstractCollection[D <: Document[D]] extends DocumentActionSupport[D] {
il
}
}

object AbstractCollection {
def apply[D <: Document[D]](name: String,
db: LightDB,
model: DocumentModel[D],
autoCommit: Boolean = false,
atomic: Boolean = true)(implicit docRW: RW[D]): AbstractCollection[D] = {
val ac = autoCommit
val at = atomic
val lightDB = db
val documentModel = model
new AbstractCollection[D] {
override def collectionName: String = name
override def autoCommit: Boolean = ac
override def atomic: Boolean = at
override protected[lightdb] def db: LightDB = lightDB

override implicit val rw: RW[D] = docRW
override def model: DocumentModel[D] = documentModel
}
}
}
5 changes: 3 additions & 2 deletions core/src/main/scala/lightdb/model/Collection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ abstract class Collection[D <: Document[D]](val collectionName: String,
object Collection {
def apply[D <: Document[D]](collectionName: String,
db: LightDB,
autoCommit: Boolean = false)(implicit docRW: RW[D]): Collection[D] =
new Collection[D](collectionName, db, autoCommit = autoCommit) {
autoCommit: Boolean = false,
atomic: Boolean = true)(implicit docRW: RW[D]): Collection[D] =
new Collection[D](collectionName, db, autoCommit = autoCommit, atomic = atomic) {
override implicit val rw: RW[D] = docRW
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ trait DocumentActionSupport[D <: Document[D]] {
val postDelete: DocumentActions[D, D] = DocumentActions[D, D](DocumentAction.PostDelete)

val commitActions: UnitActions = new UnitActions
val truncateActions: UnitActions = new UnitActions
val disposeActions: UnitActions = new UnitActions

protected def doSet(doc: D,
Expand Down
2 changes: 0 additions & 2 deletions core/src/main/scala/lightdb/model/DocumentModel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ trait DocumentModel[D <: Document[D]] {
for {
// Add to IndexedLinks
_ <- _indexedLinks.map(_.add(doc)).sequence
_ <- collection.commit().whenA(collection.autoCommit)
} yield Some(doc)
})
collection.postDelete.add((action: DocumentAction, doc: D, collection: AbstractCollection[D]) => {
for {
// Remove from IndexedLinks
_ <- _indexedLinks.map(_.remove(doc)).sequence
_ <- collection.commit().whenA(collection.autoCommit)
} yield Some(doc)
})
}
Expand Down
6 changes: 5 additions & 1 deletion halodb/src/main/scala/lightdb/halo/HaloDBSupport.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package lightdb.halo

import lightdb.{LightDB, Store}
import scribe.{Level, Logger}

trait HaloDBSupport {
this: LightDB =>

def indexThreads: Int = Runtime.getRuntime.availableProcessors()
def maxFileSize: Int = 1024 * 1024

override protected def createStore(name: String): Store = HaloDBStore(directory.resolve(name), indexThreads, maxFileSize)
Logger("com.oath.halodb").withMinimumLevel(Level.Warn).replace()

override protected def createStore(name: String): Store =
HaloDBStore(directory.resolve(name), indexThreads, maxFileSize)
}
51 changes: 35 additions & 16 deletions sqlite/src/main/scala/lightdb/sqlite/SQLiteSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,28 @@ import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, Types}
trait SQLiteSupport[D <: Document[D]] extends IndexSupport[D] {
private lazy val path: Path = collection.db.directory.resolve(collection.collectionName).resolve("sqlite.db")
// TODO: Should each collection have a connection?
private[sqlite] lazy val connection: Connection = {
val c = DriverManager.getConnection(s"jdbc:sqlite:${path.toFile.getCanonicalPath}")
c.setAutoCommit(false)
val s = c.createStatement()
try {
s.executeUpdate(s"CREATE TABLE IF NOT EXISTS ${collection.collectionName}(${index.fields.map(_.fieldName).mkString(", ")}, PRIMARY KEY (_id))")
index.fields.foreach { f =>
if (f.fieldName != "_id") {
val indexName = s"${f.fieldName}_idx"
s.executeUpdate(s"CREATE INDEX IF NOT EXISTS $indexName ON ${collection.collectionName}(${f.fieldName})")

private var _connection: Option[Connection] = None
private[sqlite] def connection: Connection = _connection match {
case Some(c) => c
case None =>
val url = s"jdbc:sqlite:${path.toFile.getCanonicalPath}"
val c = DriverManager.getConnection(url)
c.setAutoCommit(true)
val s = c.createStatement()
try {
s.executeUpdate(s"CREATE TABLE IF NOT EXISTS ${collection.collectionName}(${index.fields.map(_.fieldName).mkString(", ")}, PRIMARY KEY (_id))")
index.fields.foreach { f =>
if (f.fieldName != "_id") {
val indexName = s"${f.fieldName}_idx"
s.executeUpdate(s"CREATE INDEX IF NOT EXISTS $indexName ON ${collection.collectionName}(${f.fieldName})")
}
}
} finally {
s.close()
}
} finally {
s.close()
}
c
_connection = Some(c)
c
}

override lazy val index: SQLiteIndexer[D] = SQLiteIndexer(this, () => collection)
Expand All @@ -55,6 +61,16 @@ trait SQLiteSupport[D <: Document[D]] extends IndexSupport[D] {
}
}

private def truncate(): IO[Unit] = IO {
val sql = s"DELETE FROM ${collection.collectionName}"
val ps = connection.prepareStatement(sql)
try {
ps.executeUpdate()
} finally {
ps.close()
}
}

override def doSearch(query: Query[D],
context: SearchContext[D],
offset: Int,
Expand Down Expand Up @@ -103,7 +119,6 @@ trait SQLiteSupport[D <: Document[D]] extends IndexSupport[D] {
|OFFSET
| $offset
|""".stripMargin
// scribe.info(sql)
val ps = prepare(sql, params)
val rs = ps.executeQuery()
try {
Expand Down Expand Up @@ -156,6 +171,10 @@ trait SQLiteSupport[D <: Document[D]] extends IndexSupport[D] {
override protected[lightdb] def initModel(collection: AbstractCollection[D]): Unit = {
super.initModel(collection)
collection.commitActions.add(backlog.flush())
collection.disposeActions.add(IO(connection.close()))
collection.truncateActions.add(truncate())
collection.disposeActions.add(IO {
connection.close()
_connection = None
})
}
}

0 comments on commit c8739a2

Please sign in to comment.