Skip to content

Commit

Permalink
feat(db): #57 Add DB migration module (#72)
Browse files Browse the repository at this point in the history
* feat(db): #57 Add DB migration module

* docs(db): #57 Add db-migration module documentation
  • Loading branch information
rlemaitre authored Mar 2, 2024
1 parent ce300f3 commit 7084a6b
Show file tree
Hide file tree
Showing 18 changed files with 550 additions and 67 deletions.
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
java temurin-17.0.9+9
sbt 1.9.6
adr-tools 3.0.0
postgres 16.1
postgres 16.2
act 0.2.54
doctoolchain 3.2.0
13 changes: 12 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,17 @@ lazy val db = Project("pillars-db", file("modules/db"))
)
.dependsOn(core)

lazy val dbMigrations = Project("pillars-db-migration", file("modules/db-migration"))
.enablePlugins(BuildInfoPlugin)
.settings(
name := "pillars-db-migration",
description := "pillars-db is a scala 3 library providing database migrations",
libraryDependencies ++= Dependencies.migrations,
buildInfoKeys := Seq[BuildInfoKey](name, version, description),
buildInfoPackage := "pillars.db.migrations.build"
)
.dependsOn(core, db)

lazy val flags = Project("pillars-flags", file("modules/flags"))
.enablePlugins(BuildInfoPlugin)
.settings(
Expand Down Expand Up @@ -113,7 +124,7 @@ lazy val example = Project("pillars-example", file("modules/example"))
buildInfoPackage := "example.build", // //<6>
publish / skip := true
)
.dependsOn(core, db, flags, httpClient)
.dependsOn(core, db, flags, httpClient, dbMigrations)
// end::example[]
lazy val docs = Project("pillars-docs", file("modules/docs"))
.settings(
Expand Down
10 changes: 6 additions & 4 deletions modules/core/src/main/scala/pillars/Pillars.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,15 @@ object Pillars:
: Resource[F, Modules[F]] =
val loaders = ServiceLoader.load(classOf[Loader])
.asScala
.toList
scribe.info(s"Found ${loaders.size} module loaders: ${loaders.map(_.name).mkString(", ")}")
.groupBy(_.key)
.map((key, value) => key -> value.head)
scribe.info(s"Found ${loaders.size} module loaders: ${loaders.keys.map(_.name).mkString(", ")}")
loaders.topologicalSort(_.dependsOn) match
case Left(value) => throw IllegalStateException("Circular dependency detected in modules")
case Right(value) =>
value.foldLeftM(Modules.empty[F]): (acc, loader) =>
loader.load(context, acc).map(acc.add)
value.foldLeftM(Modules.empty[F]):
case (acc, (key, loader)) =>
loader.load(context, acc).map(acc.add(key))
end match
end loadModules

Expand Down
21 changes: 13 additions & 8 deletions modules/core/src/main/scala/pillars/modules.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,32 @@ trait Module[F[_]]:

def adminControllers: List[Controller[F]] = Nil

def key: Module.Key
end Module

object Module:
trait Key
trait Key:
def name: String

override def toString: String = s"Key($name)"
end Key
end Module

case class Modules[F[_]](private val values: Map[Module.Key, Module[F]]):
def add[K <: Module[F]](value: K): Modules[F] = Modules(values + (value.key -> value))
def get[K](key: Module.Key): K = values(key).asInstanceOf[K]
def add[K <: Module[F]](key: Module.Key)(value: K): Modules[F] = Modules(values + (key -> value))
def get[K](key: Module.Key): K = values(key).asInstanceOf[K]
export values.size
export values.values as all
def probes: List[Probe[F]] = all.flatMap(_.probes).toList
def adminControllers: List[Controller[F]] = all.flatMap(_.adminControllers).toList
def probes: List[Probe[F]] = all.flatMap(_.probes).toList
def adminControllers: List[Controller[F]] = all.flatMap(_.adminControllers).toList
end Modules
object Modules:
def empty[F[_]]: Modules[F] = Modules(Map.empty)

trait Loader:
type M[F[_]] <: Module[F]
def name: String
def dependsOn: Set[Loader] = Set.empty
def key: Module.Key

def dependsOn: Set[Module.Key] = Set.empty

def load[F[_]: Async: Network: Tracer: Console](
context: Loader.Context[F],
Expand Down
25 changes: 10 additions & 15 deletions modules/core/src/main/scala/pillars/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,23 @@ package object pillars:
*/
type Run[F[_], A] = Pillars[F] ?=> A

extension [T](items: Iterable[T])
/**
* Extension method for Iterable[T] to perform topological sorting.
*
* @param dependencies A function that takes an item of type T and returns an Iterable[T] of its dependencies.
* @return Either a sorted List[T] if the graph is acyclic, or an error message if a cycle is detected.
*/
def topologicalSort(dependencies: T => Iterable[T]): Either[String, List[T]] =
extension [K, V](items: Map[K, V])
def topologicalSort(dependencies: V => Iterable[K]): Either[String, List[(K, V)]] =
@annotation.tailrec
def loop(
remaining: Iterable[T],
sorted: List[T],
visited: Set[T],
recursionStack: Set[T]
): Either[String, List[T]] =
remaining: Map[K, V],
sorted: List[(K, V)],
visited: Set[K],
recursionStack: Set[(K, V)]
): Either[String, List[(K, V)]] =
if remaining.isEmpty then Right(sorted)
else
val (noDeps, hasDeps) = remaining.partition(item => dependencies(item).forall(visited.contains))
val (noDeps, hasDeps) = remaining.partition: (_, value) =>
dependencies(value).forall(visited.contains)
if noDeps.isEmpty then
if hasDeps.exists(recursionStack.contains) then Left("Cyclic dependency found")
else loop(hasDeps, sorted, visited, recursionStack ++ hasDeps)
else loop(hasDeps, sorted ++ noDeps.toList, visited ++ noDeps, recursionStack -- noDeps)
else loop(hasDeps, sorted ++ noDeps.toList, visited ++ noDeps.keySet, recursionStack)

loop(items, List.empty, Set.empty, Set.empty)
end pillars
39 changes: 18 additions & 21 deletions modules/core/src/test/scala/pillars/TopologicalSortSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,29 @@ import munit.FunSuite

class TopologicalSortSuite extends FunSuite:
test("topologicalSort returns sorted list for acyclic graph"):
val items = List('A', 'B', 'C', 'D', 'E')
val dependencies: Char => Iterable[Char] =
case 'A' => List('D')
case 'B' => List('D')
case 'C' => List('A', 'B')
case 'D' => List('E')
case 'E' => List()
val dependencies: Map[Char, Iterable[Char]] = Map(
'A' -> List('D'),
'B' -> List('D'),
'C' -> List('A', 'B'),
'D' -> List('E'),
'E' -> List()
)
end dependencies
assertEquals(items.topologicalSort(dependencies), Right(List('E', 'D', 'A', 'B', 'C')))
assertEquals(dependencies.topologicalSort(identity).map(_.map(_._1)), Right(List('E', 'D', 'A', 'B', 'C')))

test("topologicalSort returns error for cyclic graph"):
val items = List('A', 'B', 'C')
val dependencies: Char => Iterable[Char] =
case 'A' => List('B')
case 'B' => List('C')
case 'C' => List('A')
assertEquals(items.topologicalSort(dependencies), Left("Cyclic dependency found"))
val dependencies: Map[Char, Iterable[Char]] = Map(
'A' -> List('B'),
'B' -> List('C'),
'C' -> List('A')
)
assertEquals(dependencies.topologicalSort(identity).map(_.map(_._1)), Left("Cyclic dependency found"))

test("topologicalSort returns sorted list for single node graph"):
val items = List('A')
val dependencies: Char => Iterable[Char] =
case 'A' => List()
assertEquals(items.topologicalSort(dependencies), Right(List('A')))
val dependencies: Map[Char, Iterable[Char]] = Map('A' -> List())
assertEquals(dependencies.topologicalSort(identity).map(_.map(_._1)), Right(List('A')))

test("topologicalSort returns sorted list for disconnected graph"):
val items = List('A', 'B', 'C', 'D', 'E')
val dependencies: Char => Iterable[Char] = _ => List.empty
assertEquals(items.topologicalSort(dependencies), Right(items))
val dependencies: Map[Char, Iterable[Char]] = Map('A' -> Nil, 'B' -> Nil, 'C' -> Nil, 'D' -> Nil, 'E' -> Nil)
assertEquals(dependencies.topologicalSort(identity), Right(dependencies.toList))
end TopologicalSortSuite
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pillars.db.migrations.DBMigrationLoader
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package pillars.db.migrations

import cats.effect.Async
import cats.effect.Resource
import cats.effect.std.Console
import cats.syntax.all.*
import dumbo.ConnectionConfig
import dumbo.Dumbo
import dumbo.Dumbo.MigrationResult
import dumbo.DumboWithResourcesPartiallyApplied
import fs2.io.file.Files
import fs2.io.net.Network
import io.circe.Codec
import io.circe.derivation.Configuration
import io.github.iltotore.iron.*
import io.github.iltotore.iron.constraint.all.*
import mouse.all.anySyntaxMouse
import org.typelevel.otel4s.trace.Tracer
import pillars.Loader
import pillars.Logger
import pillars.Module
import pillars.Modules
import pillars.Pillars
import pillars.Run
import pillars.codec.given
import pillars.db.DatabaseConfig
import pillars.db.DatabaseSchema
import pillars.db.DatabaseTable
import pillars.db.DB
import scala.concurrent.duration.*

final case class DBMigration[F[_]: Async: Console: Tracer: Network: Files](
config: MigrationConfig,
dbConfig: DatabaseConfig
) extends Module[F]:
private val connectionConfig = dumbo.ConnectionConfig(
host = dbConfig.host.toString,
port = dbConfig.port.value,
database = dbConfig.database,
user = dbConfig.username,
password = dbConfig.password.value.some,
ssl = dbConfig.ssl
)

inline def migrateModule(key: Module.Key): Run[F, F[Unit]] =
migrate(
"db/migrations",
DatabaseSchema.pillars,
DatabaseTable(s"${key.name.replaceAll("[^0-9a-zA-Z$_]", "-")}_schema_history".assume)
)
inline def migrate(
path: String,
schema: DatabaseSchema = DatabaseSchema.public,
schemaHistoryTable: DatabaseTable = DatabaseTable(Dumbo.defaults.schemaHistoryTable.assume)
): Run[F, F[Unit]] = (Dumbo.withResourcesIn(path) |> migrate(schema, schemaHistoryTable)).flatMap: result =>
Logger[F].info(s"Migration completed with ${result.migrationsExecuted} migrations executed")

private def migrate(
schema: DatabaseSchema,
table: DatabaseTable
)(dumbo: DumboWithResourcesPartiallyApplied[F]): F[MigrationResult] =
dumbo.withMigrationStateLogAfter(config.logAfter).apply(
connection = connectionConfig,
defaultSchema = schema,
schemas = Set(dbConfig.appSchema),
schemaHistoryTable = table,
validateOnMigrate = config.validateOnMigrate
).runMigration

end migrate
end DBMigration

object DBMigration:
def apply[F[_]](using p: Pillars[F]): DBMigration[F] = p.module[DBMigration[F]](DBMigration.Key)

case object Key extends Module.Key:
override val name: String = "db-migration"
end DBMigration

class DBMigrationLoader extends Loader:
override type M[F[_]] = DBMigration[F]
override val key: Module.Key = DBMigration.Key

override def dependsOn: Set[Module.Key] = Set(DB.Key)

def load[F[_]: Async: Network: Tracer: Console](
context: Loader.Context[F],
modules: Modules[F]
): Resource[F, DBMigration[F]] =
import context.*
given Files[F] = Files.forAsync[F]
Resource.eval:
for
_ <- logger.info("Loading DB Migration module")
dbConfig <- configReader.read[DatabaseConfig]("db")
config <- configReader.read[MigrationConfig]("db-migration")
_ <- logger.info("DB Migration module loaded")
yield DBMigration(config, dbConfig)
end for
end load

end DBMigrationLoader

final case class MigrationConfig(
logAfter: FiniteDuration = 5.seconds,
validateOnMigrate: Boolean = Dumbo.defaults.validateOnMigrate
)
object MigrationConfig:
given Configuration = Configuration.default.withKebabCaseMemberNames.withKebabCaseConstructorNames.withDefaults

given Codec[MigrationConfig] = Codec.AsObject.derivedConfigured

extension [F[_]](p: Pillars[F])
def dbMigration: DBMigration[F] = p.module(DBMigration.Key)

end extension
36 changes: 31 additions & 5 deletions modules/db/src/main/scala/pillars/db/db.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import com.comcast.ip4s.*
import fs2.io.file.Files
import fs2.io.net.Network
import io.circe.Codec
import io.circe.Decoder as CirceDecoder
import io.circe.Encoder as CirceEncoder
import io.circe.derivation.Configuration
import io.github.iltotore.iron.*
import io.github.iltotore.iron.circe.given
Expand All @@ -29,7 +31,6 @@ extension [F[_]](p: Pillars[F])
final case class DB[F[_]: Async: Network: Tracer: Console](pool: Resource[F, Session[F]]) extends Module[F]:
export pool.*

override def key: Module.Key = DB.Key
override def probes: List[Probe[F]] =
val probe = new Probe[F]:
override def component: Component = Component(Component.Name("db"), Component.Type.Datastore)
Expand All @@ -39,13 +40,13 @@ final case class DB[F[_]: Async: Network: Tracer: Console](pool: Resource[F, Ses
end DB

object DB:
case object Key extends Module.Key
case object Key extends Module.Key:
override val name: String = "db"
def apply[F[_]](using p: Pillars[F]): DB[F] = p.module[DB[F]](DB.Key)

class DBLoader extends Loader:
override type M[F[_]] = DB[F]

override def name: String = "db"
override val key: Module.Key = DB.Key

def load[F[_]: Async: Network: Tracer: Console](
context: Loader.Context[F],
Expand Down Expand Up @@ -77,6 +78,10 @@ final case class DatabaseConfig(
database: DatabaseName,
username: DatabaseUser,
password: Secret[DatabasePassword],
ssl: SSL = SSL.None,
systemSchema: DatabaseSchema = DatabaseSchema.public,
appSchema: DatabaseSchema = DatabaseSchema.public,
// TODO: Add system and application schemas (default = public)
poolSize: PoolSize = PoolSize(32),
debug: Boolean = false,
probe: ProbeConfig
Expand All @@ -86,6 +91,19 @@ object DatabaseConfig:
given Configuration = Configuration.default.withKebabCaseMemberNames.withKebabCaseConstructorNames.withDefaults
given Codec[DatabaseConfig] = Codec.AsObject.derivedConfigured

given CirceDecoder[SSL] = CirceDecoder.decodeString.emap {
case "none" => Right(SSL.None)
case "trusted" => Right(SSL.Trusted)
case "system" => Right(SSL.System)
case other => Left(s"Invalid SSL mode: $other")
}
given CirceEncoder[SSL] = CirceEncoder.encodeString.contramap {
case SSL.None => "none"
case SSL.Trusted => "trusted"
case SSL.System => "system"
}
end DatabaseConfig

private type DatabaseNameConstraint = Not[Blank] DescribedAs "Database name must not be blank"
opaque type DatabaseName <: String = String :| DatabaseNameConstraint

Expand All @@ -94,7 +112,15 @@ object DatabaseName extends RefinedTypeOps[String, DatabaseNameConstraint, Datab
private type DatabaseSchemaConstraint = Not[Blank] DescribedAs "Database schema must not be blank"
opaque type DatabaseSchema <: String = String :| DatabaseSchemaConstraint

object DatabaseSchema extends RefinedTypeOps[String, DatabaseSchemaConstraint, DatabaseSchema]
object DatabaseSchema extends RefinedTypeOps[String, DatabaseSchemaConstraint, DatabaseSchema]:
val public: DatabaseSchema = DatabaseSchema("public")
val pillars: DatabaseSchema = DatabaseSchema("pillars")

private type DatabaseTableConstraint =
(Not[Blank] & Match["""^[a-zA-Z_][0-9a-zA-Z$_]{0,63}$"""]) DescribedAs "Database table must be at most 64 characters (letter, digit, dollar sign or underscore) long and start with a letter or an underscore"
opaque type DatabaseTable <: String = String :| DatabaseTableConstraint

object DatabaseTable extends RefinedTypeOps[String, DatabaseTableConstraint, DatabaseTable]

private type DatabaseUserConstraint = Not[Blank] DescribedAs "Database user must not be blank"
opaque type DatabaseUser <: String = String :| DatabaseUserConstraint
Expand Down
1 change: 1 addition & 0 deletions modules/docs/src/docs/user-guide/10_quick-start.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ You can also add optional modules to your dependencies:
--
libraryDependencies ++= Seq(
"com.rlemaitre" %% "pillars-db" % "{site_version}",
"com.rlemaitre" %% "pillars-db-migration" % "{site_version}",
"com.rlemaitre" %% "pillars-flags" % "{site_version}",
"com.rlemaitre" %% "pillars-http-client" % "{site_version}"
)
Expand Down
Loading

0 comments on commit 7084a6b

Please sign in to comment.