-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
9e27c96
commit 5983af6
Showing
5 changed files
with
151 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
package lightdb | ||
|
||
import fabric.Json | ||
import fabric.rw.RW | ||
|
||
case class KeyValue(_id: Id[KeyValue], value: Json) extends Document[KeyValue] | ||
|
||
object KeyValue { | ||
implicit val rw: RW[KeyValue] = RW.gen | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,101 @@ | ||
package lightdb | ||
|
||
import cats.effect.IO | ||
import cats.implicits.{catsSyntaxApplicativeByName, catsSyntaxParallelSequence1} | ||
import fabric.rw.RW | ||
import lightdb.upgrade.DatabaseUpgrade | ||
|
||
import java.nio.file.Path | ||
import java.util.concurrent.atomic.AtomicBoolean | ||
import scribe.cats.{io => logger} | ||
|
||
abstract class LightDB(directory: Path, | ||
indexThreads: Int = 2, | ||
maxFileSize: Int = 1024 * 1024) { | ||
private val _initialized = new AtomicBoolean(false) | ||
|
||
private var stores = List.empty[Store] | ||
|
||
protected lazy val backingStore = new Collection[KeyValue]("_backingStore", this) | ||
protected lazy val databaseInitialized: StoredValue[Boolean] = stored[Boolean]("_databaseInitialized", false) | ||
protected lazy val appliedUpgrades: StoredValue[Set[String]] = stored[Set[String]]("_appliedUpgrades", Set.empty) | ||
|
||
def initialized: Boolean = _initialized.get() | ||
|
||
def collections: List[Collection[_]] = Nil | ||
|
||
protected[lightdb] def verifyInitialized(): Unit = if (!initialized) throw new RuntimeException("Database not initialized!") | ||
|
||
def init(truncate: Boolean = false): IO[Unit] = if (_initialized.compareAndSet(false, true)) { | ||
for { | ||
_ <- logger.info(s"LightDB initializing...") | ||
// Truncate the database before we do anything if specified | ||
_ <- this.truncate().whenA(truncate) | ||
// Determine if this is an uninitialized database | ||
dbInitialized <- databaseInitialized.get() | ||
// Get applied database upgrades | ||
applied <- appliedUpgrades.get() | ||
// Determine upgrades that need to be applied | ||
upgrades = this.upgrades.filter(u => u.alwaysRun || !applied.contains(u.label)) match { | ||
case list if !dbInitialized => list.filter(_.applyToNew) | ||
case list => list | ||
} | ||
_ <- (for { | ||
_ <- logger.info(s"Applying ${upgrades.length} upgrades (${upgrades.map(_.label).mkString(", ")})...") | ||
_ <- doUpgrades(upgrades, stillBlocking = true) | ||
} yield ()).whenA(upgrades.nonEmpty) | ||
// Set initialized | ||
_ <- databaseInitialized.set(true) | ||
} yield () | ||
} else { | ||
IO.unit | ||
} | ||
|
||
protected[lightdb] def createStore(name: String): Store = synchronized { | ||
// TODO: verifyInitialized() | ||
verifyInitialized() | ||
val store = Store(directory.resolve(name), indexThreads, maxFileSize) | ||
stores = store :: stores | ||
store | ||
} | ||
} | ||
|
||
def upgrades: List[DatabaseUpgrade] = Nil | ||
|
||
def truncate(): IO[Unit] = collections.map(_.truncate()).parSequence.map(_ => ()) | ||
|
||
def dispose(): IO[Unit] = for { | ||
_ <- collections.map(_.dispose()).parSequence | ||
_ <- stores.map(_.dispose()).parSequence | ||
} yield () | ||
|
||
protected object stored { | ||
def apply[T](key: String, | ||
default: => T, | ||
cache: Boolean = true, | ||
collection: Collection[KeyValue] = backingStore) | ||
(implicit rw: RW[T]): StoredValue[T] = StoredValue[T](key, collection, () => default, cache = cache) | ||
|
||
def opt[T](key: String, | ||
cache: Boolean = true, | ||
collection: Collection[KeyValue] = backingStore) | ||
(implicit rw: RW[T]): StoredValue[Option[T]] = StoredValue[Option[T]](key, collection, () => None, cache = cache) | ||
} | ||
|
||
private def doUpgrades(upgrades: List[DatabaseUpgrade], | ||
stillBlocking: Boolean): IO[Unit] = upgrades.headOption match { | ||
case Some(upgrade) => | ||
val continueBlocking = upgrades.exists(_.blockStartup) | ||
val io = for { | ||
_ <- upgrade.upgrade(this) | ||
applied <- appliedUpgrades.get() | ||
_ <- appliedUpgrades.set(applied + upgrade.label) | ||
_ <- doUpgrades(upgrades.tail, continueBlocking) | ||
} yield () | ||
if (stillBlocking && !continueBlocking) { | ||
io.unsafeRunAndForget()(cats.effect.unsafe.IORuntime.global) | ||
IO.unit | ||
} else { | ||
io | ||
} | ||
case None => logger.info("Upgrades completed successfully") | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
package lightdb | ||
|
||
import cats.effect.IO | ||
import fabric.rw._ | ||
|
||
case class StoredValue[T](key: String, | ||
collection: Collection[KeyValue], | ||
default: () => T, | ||
cache: Boolean)(implicit rw: RW[T]) { | ||
private lazy val id = Id[KeyValue](key) | ||
|
||
private var cached: Option[T] = None | ||
|
||
def get(): IO[T] = cached match { | ||
case Some(t) => IO.pure(t) | ||
case None => collection.get(id).map { | ||
case Some(kv) => kv.value.as[T] | ||
case None => default() | ||
}.map { t => | ||
if (cache) cached = Some(t) | ||
t | ||
} | ||
} | ||
|
||
def exists(): IO[Boolean] = collection.get(id).map(_.nonEmpty) | ||
|
||
def clear(): IO[Unit] = collection.delete(id).map { _ => | ||
if (cache) cached = None | ||
} | ||
|
||
def set(value: T): IO[T] = collection | ||
.set(KeyValue(id, value.asJson)) | ||
.map { _ => | ||
if (cache) cached = Some(value) | ||
value | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package lightdb.upgrade | ||
|
||
import cats.effect.IO | ||
import lightdb.LightDB | ||
|
||
trait DatabaseUpgrade { | ||
def label: String = getClass.getSimpleName.replace("$", "") | ||
def applyToNew: Boolean | ||
def blockStartup: Boolean | ||
def alwaysRun: Boolean | ||
|
||
def upgrade(db: LightDB): IO[Unit] | ||
} |