Skip to content

Commit

Permalink
v0.14.1 Transaction handling
Browse files Browse the repository at this point in the history
  • Loading branch information
marcgrue committed Dec 19, 2024
1 parent 2f02e54 commit a4fb26d
Show file tree
Hide file tree
Showing 17 changed files with 263 additions and 859 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,12 @@ lazy val yourProject = project.in(file("app"))
.settings(
libraryDependencies ++= Seq(
// One or more of:
"org.scalamolecule" %%% "molecule-sql-postgres" % "0.14.0",
"org.scalamolecule" %%% "molecule-sql-sqlite" % "0.14.0",
"org.scalamolecule" %%% "molecule-sql-mysql" % "0.14.0",
"org.scalamolecule" %%% "molecule-sql-mariadb" % "0.14.0",
"org.scalamolecule" %%% "molecule-sql-h2" % "0.14.0",
"org.scalamolecule" %%% "molecule-datalog-datomic" % "0.14.0",
"org.scalamolecule" %%% "molecule-sql-postgres" % "0.14.1",
"org.scalamolecule" %%% "molecule-sql-sqlite" % "0.14.1",
"org.scalamolecule" %%% "molecule-sql-mysql" % "0.14.1",
"org.scalamolecule" %%% "molecule-sql-mariadb" % "0.14.1",
"org.scalamolecule" %%% "molecule-sql-h2" % "0.14.1",
"org.scalamolecule" %%% "molecule-datalog-datomic" % "0.14.1",
),
moleculeSchemas := Seq("app/dataModel") // paths to directories with Data Model definition files
)
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ inThisBuild(
organizationName := "ScalaMolecule",
organizationHomepage := Some(url("http://www.scalamolecule.org")),
versionScheme := Some("early-semver"),
version := "0.14.0",
version := "0.14.1",
scalaVersion := scala213,
crossScalaVersions := allScala,

Expand Down
2 changes: 2 additions & 0 deletions core/shared/src/main/scala/molecule/core/api/Api_async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,13 @@ trait Api_async_transact { api: Api_async with Spi_async =>

def unitOfWork[T](body: => Future[T])
(implicit conn: Conn, ec: EC): Future[T] = {
conn.setInsideUOW(true)
conn.waitCommitting()
body
.map { t =>
// Commit all actions
conn.commit()
conn.setInsideUOW(false)
t
}
.recover { case e =>
Expand Down
2 changes: 2 additions & 0 deletions core/shared/src/main/scala/molecule/core/api/Api_io.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,13 @@ trait Api_io_transact { api: Api_io with Spi_io =>


def unitOfWork[T](body: => IO[T])(implicit conn: Conn): IO[T] = {
conn.setInsideUOW(true)
conn.waitCommitting()
body.attempt.map {
case Right(t) =>
// Commit all actions
conn.commit()
conn.setInsideUOW(false)
t
case Left(error: Throwable) =>
// Rollback all executed actions so far
Expand Down
2 changes: 2 additions & 0 deletions core/shared/src/main/scala/molecule/core/api/Api_sync.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,13 @@ trait Api_sync_transact { api: Api_sync with Spi_sync =>


def unitOfWork[T](body: => T)(implicit conn: Conn): T = {
conn.setInsideUOW(true)
conn.waitCommitting()
try {
val result = body
// Commit all actions
conn.commit()
conn.setInsideUOW(false)
result
} catch {
case NonFatal(e) =>
Expand Down
2 changes: 2 additions & 0 deletions core/shared/src/main/scala/molecule/core/api/Api_zio.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,13 @@ trait Api_zio_transact { api: Api_zio with Spi_zio =>
def unitOfWork[T](body: => ZIO[Conn, MoleculeError, T]): ZIO[Conn, MoleculeError, T] = {
for {
conn <- ZIO.service[Conn]
_ = conn.setInsideUOW(true)
_ = conn.waitCommitting()
result <- body
.map { t =>
// Commit all actions
conn.commit()
conn.setInsideUOW(false)
t
}
.mapError { error =>
Expand Down
23 changes: 16 additions & 7 deletions core/shared/src/main/scala/molecule/core/spi/Conn.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ import scala.concurrent.{ExecutionContext, Future}
abstract class Conn(val proxy: ConnProxy)
extends ModelUtils { self: DataType =>

protected var commit_ = true

def waitCommitting(): Unit = ???
def commit(): Unit = ???
def rollback(): Unit = ???

def transact_async(data: Data)
(implicit ec: ExecutionContext): Future[TxReport] =
Expand All @@ -36,7 +31,7 @@ abstract class Conn(val proxy: ConnProxy)
ExecutionError(s"`$method` only implemented on JVM platform.")


// Subscriptions --------------------------------------------------------------
// Subscriptions -------------------------------------------------------------

private var callbacks = List.empty[(List[Element], (Set[String], Boolean) => Future[Unit])]

Expand All @@ -57,6 +52,16 @@ abstract class Conn(val proxy: ConnProxy)
callbacks = callbacks.filterNot(_._1 == elements)
}


// Transaction handling ------------------------------------------------------

private var uow_ = false
protected var commit_ = true

def waitCommitting(): Unit = ???
def commit(): Unit = ???
def rollback(): Unit = ???

def savepoint_sync[T](body: Savepoint => T): T = ???
def savepoint_async[T](body: Savepoint => Future[T])
(implicit ec: ExecutionContext): Future[T] = ???
Expand All @@ -67,7 +72,11 @@ abstract class Conn(val proxy: ConnProxy)

def savepoint_io[T](body: Savepoint => IO[T]): IO[T] = ???

def hasSavepoint: Boolean = ???

def setInsideUOW(inside: Boolean): Unit = uow_ = inside
def isInsideUOW: Boolean = uow_

def isInsideSavepoint: Boolean = ???

def setAutoCommit(bool: Boolean): Unit = ???
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,39 +132,123 @@ trait Transactions_async extends CoreTestSuite with Api_async with Api_async_tra
}


"mixed, unified actions" - types { implicit conn =>
"mixed with queries" - types { implicit conn =>
for {
_ <- unitOfWork {
for {
_ <- Ns.int(1).save.transact
_ <- Ns.int.query.get.map(_ ==> List(1))

_ <- Ns.int.insert(2, 3).transact
_ <- Ns.int.query.get.map(_ ==> List(1, 2, 3))

_ <- Ns(1).delete.transact
_ <- Ns.int.query.get.map(_ ==> List(2, 3))

_ <- Ns(3).int.*(10).update.transact
_ <- Ns.int.query.get.map(_ ==> List(2, 30))
} yield ()
}
_ <- Ns.int.query.get.map(_ ==> List(2, 30))
} yield ()
}


"mixed, with queries" - types { implicit conn =>
"abort save" - types { implicit conn =>
for {
_ <- Ns.int(1).save.transact
_ <- unitOfWork {
for {
_ <- Ns.int(1).save.transact
_ <- Ns.int.query.get.map(_ ==> List(1))
_ <- Ns.int(2).save.transact
_ <- Ns.int.query.get.map(_ ==> List(1, 2))
_ = throw new Exception()
} yield ()
}.recover {
case _: Exception => ()
}
_ <- Ns.int.query.get.map(_ ==> List(1))
} yield ()
}


"abort insert" - types { implicit conn =>
for {
_ <- Ns.int(1).save.transact
_ <- unitOfWork {
for {
_ <- Ns.int.insert(2, 3).transact
_ <- Ns.int.query.get.map(_ ==> List(1, 2, 3))
_ = throw new Exception()
} yield ()
}.recover {
case _: Exception => ()
}
_ <- Ns.int.query.get.map(_ ==> List(1))
} yield ()
}


"abort update" - types { implicit conn =>
for {
_ <- Ns.int(1).save.transact
_ <- unitOfWork {
for {
_ <- Ns(1).int.*(10).update.transact
_ <- Ns.int.query.get.map(_ ==> List(10))
_ = throw new Exception()
} yield ()
}.recover {
case _: Exception => ()
}
_ <- Ns.int.query.get.map(_ ==> List(1))
} yield ()
}


"abort delete" - types { implicit conn =>
for {
_ <- Ns.int.insert(1, 2).transact
_ <- unitOfWork {
for {
_ <- Ns(1).delete.transact
_ <- Ns.int.query.get.map(_ ==> List(2, 3))
_ <- Ns.int.query.get.map(_ ==> List(1, 2))
_ = throw new Exception()
} yield ()
}.recover {
case _: Exception => ()
}
_ <- Ns.int.query.get.map(_ ==> List(1, 2))
} yield ()
}

_ <- Ns(3).int.*(10).update.transact
_ <- Ns.int.query.get.map(_ ==> List(2, 30))

"abort mixed" - types { implicit conn =>
for {
// Initial data
_ <- Ns.int(1).save.transact

_ <- unitOfWork {
for {
_ <- Ns.int(2).save.transact
_ <- Ns.int.query.get.map(_ ==> List(1, 2))

_ <- Ns.int.insert(3, 4).transact
_ <- Ns.int.query.get.map(_ ==> List(1, 2, 3, 4))

_ <- Ns(2).delete.transact
_ <- Ns.int.query.get.map(_ ==> List(1, 3, 4))

_ <- Ns(4).int.*(10).update.transact
_ <- Ns.int.query.get.map(_ ==> List(1, 3, 40))

_ = throw new Exception()
} yield ()
}.recover {
case _: Exception => ()
}
_ <- Ns.int.query.get.map(_ ==> List(2, 30))

// Initial data remains intact
_ <- Ns.int.query.get.map(_ ==> List(1))
} yield ()
}

Expand Down Expand Up @@ -273,7 +357,7 @@ trait Transactions_async extends CoreTestSuite with Api_async with Api_async_tra
}
}

// Without rollbacks, the above is the same as the following:
// Without rollbacks, the above is the same as the following:
"commit2" - types { implicit conn =>
for {
_ <- Ns.int.insert(1 to 4).transact
Expand Down
Loading

0 comments on commit a4fb26d

Please sign in to comment.