diff --git a/core/src/main/scala/lightdb/Collection.scala b/core/src/main/scala/lightdb/Collection.scala index 9da9e5e2..fcdd0706 100644 --- a/core/src/main/scala/lightdb/Collection.scala +++ b/core/src/main/scala/lightdb/Collection.scala @@ -3,13 +3,9 @@ package lightdb import cats.effect.IO import fabric.rw.RW -abstract class Collection[D <: Document[D]](implicit val rw: RW[D]) { - protected lazy val defaultCollectionName: String = getClass.getSimpleName.replace("$", "") +class Collection[D <: Document[D]](val collectionName: String, db: LightDB)(implicit val rw: RW[D]) { protected lazy val store: Store = db.createStore(collectionName) - protected def db: LightDB - protected def collectionName: String = defaultCollectionName - private var _indexedLinks = List.empty[IndexedLinks[_, D]] /** @@ -24,6 +20,8 @@ abstract class Collection[D <: Document[D]](implicit val rw: RW[D]) { case None => IO.pure(None) } } + def delete(id: Id[D]): IO[Unit] = store.delete(id) + def truncate(): IO[Unit] = store.truncate() def get(id: Id[D]): IO[Option[D]] = store.getJson(id) def apply(id: Id[D]): IO[D] = get(id) @@ -43,6 +41,8 @@ abstract class Collection[D <: Document[D]](implicit val rw: RW[D]) { } il } + + def dispose(): IO[Unit] = IO.unit } case class IndexedLinks[V, D <: Document[D]](createKey: V => String, diff --git a/core/src/main/scala/lightdb/KeyValue.scala b/core/src/main/scala/lightdb/KeyValue.scala new file mode 100644 index 00000000..10a5d29a --- /dev/null +++ b/core/src/main/scala/lightdb/KeyValue.scala @@ -0,0 +1,10 @@ +package lightdb + +import fabric.Json +import fabric.rw.RW + +case class KeyValue(_id: Id[KeyValue], value: Json) extends Document[KeyValue] + +object KeyValue { + implicit val rw: RW[KeyValue] = RW.gen +} \ No newline at end of file diff --git a/core/src/main/scala/lightdb/LightDB.scala b/core/src/main/scala/lightdb/LightDB.scala index 21795fc2..8e2a0bcd 100644 --- a/core/src/main/scala/lightdb/LightDB.scala +++ b/core/src/main/scala/lightdb/LightDB.scala @@ -1,17 +1,101 @@ package lightdb +import cats.effect.IO +import cats.implicits.{catsSyntaxApplicativeByName, catsSyntaxParallelSequence1} +import fabric.rw.RW +import lightdb.upgrade.DatabaseUpgrade + import java.nio.file.Path +import java.util.concurrent.atomic.AtomicBoolean +import scribe.cats.{io => logger} abstract class LightDB(directory: Path, indexThreads: Int = 2, maxFileSize: Int = 1024 * 1024) { + private val _initialized = new AtomicBoolean(false) private var stores = List.empty[Store] + protected lazy val backingStore = new Collection[KeyValue]("_backingStore", this) + protected lazy val databaseInitialized: StoredValue[Boolean] = stored[Boolean]("_databaseInitialized", false) + protected lazy val appliedUpgrades: StoredValue[Set[String]] = stored[Set[String]]("_appliedUpgrades", Set.empty) + + def initialized: Boolean = _initialized.get() + + def collections: List[Collection[_]] = Nil + + protected[lightdb] def verifyInitialized(): Unit = if (!initialized) throw new RuntimeException("Database not initialized!") + + def init(truncate: Boolean = false): IO[Unit] = if (_initialized.compareAndSet(false, true)) { + for { + _ <- logger.info(s"LightDB initializing...") + // Truncate the database before we do anything if specified + _ <- this.truncate().whenA(truncate) + // Determine if this is an uninitialized database + dbInitialized <- databaseInitialized.get() + // Get applied database upgrades + applied <- appliedUpgrades.get() + // Determine upgrades that need to be applied + upgrades = this.upgrades.filter(u => u.alwaysRun || !applied.contains(u.label)) match { + case list if !dbInitialized => list.filter(_.applyToNew) + case list => list + } + _ <- (for { + _ <- logger.info(s"Applying ${upgrades.length} upgrades (${upgrades.map(_.label).mkString(", ")})...") + _ <- doUpgrades(upgrades, stillBlocking = true) + } yield ()).whenA(upgrades.nonEmpty) + // Set initialized + _ <- databaseInitialized.set(true) + } yield () + } else { + IO.unit + } + protected[lightdb] def createStore(name: String): Store = synchronized { - // TODO: verifyInitialized() + verifyInitialized() val store = Store(directory.resolve(name), indexThreads, maxFileSize) stores = store :: stores store } -} + + def upgrades: List[DatabaseUpgrade] = Nil + + def truncate(): IO[Unit] = collections.map(_.truncate()).parSequence.map(_ => ()) + + def dispose(): IO[Unit] = for { + _ <- collections.map(_.dispose()).parSequence + _ <- stores.map(_.dispose()).parSequence + } yield () + + protected object stored { + def apply[T](key: String, + default: => T, + cache: Boolean = true, + collection: Collection[KeyValue] = backingStore) + (implicit rw: RW[T]): StoredValue[T] = StoredValue[T](key, collection, () => default, cache = cache) + + def opt[T](key: String, + cache: Boolean = true, + collection: Collection[KeyValue] = backingStore) + (implicit rw: RW[T]): StoredValue[Option[T]] = StoredValue[Option[T]](key, collection, () => None, cache = cache) + } + + private def doUpgrades(upgrades: List[DatabaseUpgrade], + stillBlocking: Boolean): IO[Unit] = upgrades.headOption match { + case Some(upgrade) => + val continueBlocking = upgrades.exists(_.blockStartup) + val io = for { + _ <- upgrade.upgrade(this) + applied <- appliedUpgrades.get() + _ <- appliedUpgrades.set(applied + upgrade.label) + _ <- doUpgrades(upgrades.tail, continueBlocking) + } yield () + if (stillBlocking && !continueBlocking) { + io.unsafeRunAndForget()(cats.effect.unsafe.IORuntime.global) + IO.unit + } else { + io + } + case None => logger.info("Upgrades completed successfully") + } +} \ No newline at end of file diff --git a/core/src/main/scala/lightdb/StoredValue.scala b/core/src/main/scala/lightdb/StoredValue.scala new file mode 100644 index 00000000..04b3a41d --- /dev/null +++ b/core/src/main/scala/lightdb/StoredValue.scala @@ -0,0 +1,37 @@ +package lightdb + +import cats.effect.IO +import fabric.rw._ + +case class StoredValue[T](key: String, + collection: Collection[KeyValue], + default: () => T, + cache: Boolean)(implicit rw: RW[T]) { + private lazy val id = Id[KeyValue](key) + + private var cached: Option[T] = None + + def get(): IO[T] = cached match { + case Some(t) => IO.pure(t) + case None => collection.get(id).map { + case Some(kv) => kv.value.as[T] + case None => default() + }.map { t => + if (cache) cached = Some(t) + t + } + } + + def exists(): IO[Boolean] = collection.get(id).map(_.nonEmpty) + + def clear(): IO[Unit] = collection.delete(id).map { _ => + if (cache) cached = None + } + + def set(value: T): IO[T] = collection + .set(KeyValue(id, value.asJson)) + .map { _ => + if (cache) cached = Some(value) + value + } +} diff --git a/core/src/main/scala/lightdb/upgrade/DatabaseUpgrade.scala b/core/src/main/scala/lightdb/upgrade/DatabaseUpgrade.scala new file mode 100644 index 00000000..784bdcf2 --- /dev/null +++ b/core/src/main/scala/lightdb/upgrade/DatabaseUpgrade.scala @@ -0,0 +1,13 @@ +package lightdb.upgrade + +import cats.effect.IO +import lightdb.LightDB + +trait DatabaseUpgrade { + def label: String = getClass.getSimpleName.replace("$", "") + def applyToNew: Boolean + def blockStartup: Boolean + def alwaysRun: Boolean + + def upgrade(db: LightDB): IO[Unit] +} \ No newline at end of file