-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Utility improvements * Add connect & transact methods directly on Transactor * Add commonly used classes under magnum.common.* export, so users don't get swamped by every class living in the magnum package * Add magzio.Transactor.semaphore to save memory when not using the Virtual-Thread based blocking executor * actually use semaphore * remove .uninterruptible from zio connect method * Review: Prefer layers to build `Transactor` instances (#88) * use ZLayer.fromZIO instead of Semaphore.unsafe * null check in zio Transactor.releaseConnection.. just in case * try fix ci * formatting --------- Co-authored-by: Jules Ivanic <[email protected]>
- Loading branch information
1 parent
db93b32
commit 31de10b
Showing
34 changed files
with
501 additions
and
280 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
177 changes: 177 additions & 0 deletions
177
magnum-zio/src/main/scala/com/augustnagro/magnum/magzio/Transactor.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
package com.augustnagro.magnum.magzio | ||
|
||
import com.augustnagro.magnum.{DbCon, DbTx, SqlException, SqlLogger} | ||
import zio.{Semaphore, Task, Trace, UIO, ULayer, ZIO, ZLayer} | ||
|
||
import java.sql.Connection | ||
import javax.sql.DataSource | ||
import scala.util.control.NonFatal | ||
|
||
class Transactor private ( | ||
dataSource: DataSource, | ||
sqlLogger: SqlLogger, | ||
connectionConfig: Connection => Unit, | ||
semaphore: Option[Semaphore] | ||
): | ||
|
||
def withSqlLogger(sqlLogger: SqlLogger): Transactor = | ||
new Transactor( | ||
dataSource, | ||
sqlLogger, | ||
connectionConfig, | ||
semaphore | ||
) | ||
|
||
def withConnectionConfig(connectionConfig: Connection => Unit): Transactor = | ||
new Transactor( | ||
dataSource, | ||
sqlLogger, | ||
connectionConfig, | ||
semaphore | ||
) | ||
|
||
def connect[A](f: DbCon ?=> A)(using Trace): Task[A] = | ||
val zio = ZIO.blocking( | ||
ZIO.acquireReleaseWith(acquireConnection)(releaseConnection)(cn => | ||
ZIO.attempt { | ||
connectionConfig(cn) | ||
f(using DbCon(cn, sqlLogger)) | ||
} | ||
) | ||
) | ||
semaphore.fold(zio)(_.withPermit(zio)) | ||
|
||
def transact[A](f: DbTx ?=> A)(using Trace): Task[A] = | ||
val zio = ZIO.blocking( | ||
ZIO.acquireReleaseWith(acquireConnection)(releaseConnection)(cn => | ||
ZIO.attempt { | ||
connectionConfig(cn) | ||
cn.setAutoCommit(false) | ||
try | ||
val res = f(using DbTx(cn, sqlLogger)) | ||
cn.commit() | ||
res | ||
catch | ||
case NonFatal(t) => | ||
cn.rollback() | ||
throw t | ||
}.uninterruptible | ||
) | ||
) | ||
semaphore.fold(zio)(_.withPermit(zio)) | ||
|
||
private def acquireConnection(using Trace): Task[Connection] = | ||
ZIO | ||
.attempt(dataSource.getConnection()) | ||
.mapError(t => SqlException("Unable to acquire DB Connection", t)) | ||
|
||
private def releaseConnection(con: Connection)(using Trace): UIO[Unit] = | ||
if con eq null then ZIO.unit | ||
else | ||
ZIO | ||
.attempt(con.close()) | ||
.orDieWith(t => | ||
SqlException("Unable to close DB Connection, will die", t) | ||
) | ||
end Transactor | ||
|
||
object Transactor: | ||
private val noOpConnectionConfig: Connection => Unit = _ => () | ||
|
||
/** Construct a Transactor | ||
* | ||
* @param dataSource | ||
* Datasource to be used | ||
* @param sqlLogger | ||
* Logging configuration | ||
* @param connectionConfig | ||
* Customize the underlying JDBC Connections | ||
* @param maxBlockingThreads | ||
* Number of threads in your connection pool. This helps magzio be more | ||
* memory efficient by limiting the number of blocking pool threads used. | ||
* Not needed if using the ZIO virtual-thread based blocking executor | ||
* @return | ||
* Transactor UIO | ||
*/ | ||
def layer( | ||
dataSource: DataSource, | ||
sqlLogger: SqlLogger, | ||
connectionConfig: Connection => Unit, | ||
maxBlockingThreads: Option[Int] | ||
): ULayer[Transactor] = | ||
ZLayer.fromZIO { | ||
ZIO | ||
.fromOption(maxBlockingThreads) | ||
.flatMap(threads => Semaphore.make(threads)) | ||
.unsome | ||
.map(semaphoreOpt => | ||
new Transactor( | ||
dataSource, | ||
sqlLogger, | ||
connectionConfig, | ||
semaphoreOpt | ||
) | ||
) | ||
} | ||
|
||
/** Construct a Transactor | ||
* | ||
* @param dataSource | ||
* Datasource to be used | ||
* @param sqlLogger | ||
* Logging configuration | ||
* @param connectionConfig | ||
* Customize the underlying JDBC Connections | ||
* @return | ||
* Transactor UIO | ||
*/ | ||
def layer( | ||
dataSource: DataSource, | ||
sqlLogger: SqlLogger, | ||
connectionConfig: Connection => Unit | ||
): ULayer[Transactor] = | ||
layer( | ||
dataSource, | ||
sqlLogger, | ||
connectionConfig, | ||
None | ||
) | ||
|
||
/** Construct a Transactor | ||
* | ||
* @param dataSource | ||
* Datasource to be used | ||
* @param sqlLogger | ||
* Logging configuration | ||
* @return | ||
* Transactor UIO | ||
*/ | ||
def layer(dataSource: DataSource, sqlLogger: SqlLogger): ULayer[Transactor] = | ||
layer(dataSource, sqlLogger, noOpConnectionConfig, None) | ||
|
||
/** Construct a Transactor | ||
* | ||
* @param dataSource | ||
* Datasource to be used | ||
* @return | ||
* Transactor UIO | ||
*/ | ||
def layer(dataSource: DataSource): ULayer[Transactor] = | ||
layer(dataSource, SqlLogger.Default, noOpConnectionConfig, None) | ||
|
||
/** Construct a Transactor | ||
* | ||
* @param dataSource | ||
* Datasource to be used | ||
* @param connectionConfig | ||
* Customize the underlying JDBC Connections | ||
* @return | ||
* Transactor UIO | ||
*/ | ||
def layer( | ||
dataSource: DataSource, | ||
connectionConfig: Connection => Unit | ||
): ULayer[Transactor] = | ||
layer(dataSource, SqlLogger.Default, connectionConfig, None) | ||
|
||
end Transactor |
28 changes: 28 additions & 0 deletions
28
magnum-zio/src/main/scala/com/augustnagro/magnum/magzio/exports.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
package com.augustnagro.magnum.magzio | ||
|
||
export com.augustnagro.magnum.{ | ||
sql, | ||
batchUpdate, | ||
DbCon, | ||
DbTx, | ||
DbType, | ||
Id, | ||
ImmutableRepo, | ||
NullOrder, | ||
Repo, | ||
SeekDir, | ||
SortOrder, | ||
Spec, | ||
SqlName, | ||
SqlNameMapper, | ||
Table, | ||
TableInfo, | ||
DbCodec, | ||
Frag, | ||
ClickhouseDbType, | ||
OracleDbType, | ||
PostgresDbType, | ||
SqliteDbType, | ||
MySqlDbType, | ||
H2DbType | ||
} |
Oops, something went wrong.