Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cats-Effect integration (WIP) #89

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
18 changes: 18 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ addCommandAlias("fmt", "scalafmtAll")
val testcontainersVersion = "0.41.4"
val circeVersion = "0.14.10"
val munitVersion = "1.1.0"
val munitCatsEffectVersion = "2.0.0"
val postgresDriverVersion = "42.7.4"

lazy val root = project
Expand Down Expand Up @@ -99,3 +100,20 @@ lazy val magnumZio = project
"org.postgresql" % "postgresql" % postgresDriverVersion % Test
)
)

// Cats Effect integration module for Magnum, published as a separate artifact.
lazy val magnumCats = project
.in(file("magnum-cats-effect"))
// NOTE(review): artifact name "magnum-ce" differs from the directory name
// "magnum-cats-effect" — confirm this abbreviation is intentional.
.dependsOn(magnum)
.settings(
name := "magnum-ce",
// Tests run in a forked JVM (needed for testcontainers / JDBC isolation).
Test / fork := true,
publish / skip := false,
libraryDependencies ++= Seq(
// Provided: users supply their own cats-effect runtime at the binary-compatible 3.x line.
// NOTE(review): version is hard-coded here, unlike the other deps which use
// a shared `xxxVersion` val at the top of this file — consider extracting
// a `catsEffectVersion` val for consistency.
"org.typelevel" %% "cats-effect" % "3.5.7" % Provided,
"org.scalameta" %% "munit" % munitVersion % Test,
"org.typelevel" %% "munit-cats-effect" % munitCatsEffectVersion % Test,
"com.dimafeng" %% "testcontainers-scala-munit" % testcontainersVersion % Test,
"com.dimafeng" %% "testcontainers-scala-postgresql" % testcontainersVersion % Test,
"org.postgresql" % "postgresql" % postgresDriverVersion % Test
)
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package com.augustnagro.magnum.magcats

import cats.*
import cats.effect.kernel.*
import cats.effect.kernel.Outcome.*
import cats.effect.std.*
import cats.effect.syntax.all.*
import cats.syntax.all.*
import com.augustnagro.magnum.*

import java.sql.Connection
import javax.sql.DataSource
import scala.util.control.NonFatal

/** Cats-Effect based database access layer for Magnum.
  *
  * Wraps a JDBC [[javax.sql.DataSource]] and runs Magnum's `DbCon`/`DbTx`
  * programs inside an effect `F` with `Sync` capability. Connections are
  * managed as a `Resource` so they are always closed, and an optional
  * rate limiter bounds concurrent connection use.
  *
  * Construct instances via the companion object; the constructor is private.
  */
class Transactor[F[_]: Sync] private (
    private val dataSource: DataSource,
    private val sqlLogger: SqlLogger,
    private val connectionConfig: Connection => Unit,
    private val rateLimiter: Option[Resource[F, Unit]]
):
  // Acquire a connection from the pool / DataSource and guarantee it is
  // released (closed) on completion, error, or cancellation.
  private val makeConn = Resource.make(acquireConnection)(releaseConnection)

  /** Returns a copy of this Transactor using the given SQL logger. */
  def withSqlLogger(sqlLogger: SqlLogger): Transactor[F] =
    new Transactor(
      dataSource,
      sqlLogger,
      connectionConfig,
      rateLimiter
    )

  /** Returns a copy of this Transactor with a different per-connection
    * customization (applied before every `connect`/`transact`).
    */
  def withConnectionConfig(
      connectionConfig: Connection => Unit
  ): Transactor[F] =
    new Transactor(
      dataSource,
      sqlLogger,
      connectionConfig,
      rateLimiter
    )

  /** Runs `f` with a borrowed connection in auto-commit mode.
    *
    * The user code runs on a blocking thread via `interruptible`, so fiber
    * cancellation interrupts the underlying JDBC call via `Thread.interrupt`.
    */
  def connect[A](f: DbCon ?=> A): F[A] =
    useRateLimitedConnection: cn =>
      Sync[F].delay(connectionConfig(cn)) >>
        Sync[F].interruptible(f(using DbCon(cn, sqlLogger)))

  /** Runs `f` inside a database transaction.
    *
    * Auto-commit is disabled for the duration; the transaction is committed
    * on success and rolled back on error or fiber cancellation
    * (`guaranteeCase` covers all three outcomes).
    *
    * NOTE: cancellation interrupts the thread running `f` via
    * `Thread.interrupt`, which aborts the in-flight JDBC call.
    */
  def transact[A](f: DbTx ?=> A): F[A] =
    useRateLimitedConnection: cn =>
      Sync[F]
        .delay {
          connectionConfig(cn)
          cn.setAutoCommit(false)
        } >>
        Sync[F]
          .interruptible(f(using DbTx(cn, sqlLogger)))
          .guaranteeCase {
            case Succeeded(_) => Sync[F].blocking(cn.commit())
            case Errored(_) | Canceled() =>
              Sync[F].blocking(cn.rollback())
          }

  // Runs `program` with a managed connection, gated by the rate limiter
  // (mutex/semaphore) when one was configured.
  private def useRateLimitedConnection[A](program: Connection => F[A]): F[A] =
    val io = makeConn.use(program)
    rateLimiter.fold(io)(_.surround(io))

  // getConnection may block (pool checkout / TCP connect), hence `blocking`.
  private def acquireConnection: F[Connection] =
    Sync[F]
      .blocking(dataSource.getConnection())
      .adaptError(t => SqlException("Unable to acquire DB Connection", t))

  // Defensive null check: a misbehaving DataSource may hand back null.
  private def releaseConnection(conn: Connection): F[Unit] =
    if conn eq null then Sync[F].unit
    else
      Sync[F]
        .blocking(conn.close())
        .adaptError(t => SqlException("Unable to close DB connection", t))
end Transactor

/** Factory methods for [[Transactor]]. */
object Transactor:
  // Default connection customizer: leave the connection untouched.
  private val noOpConnectionConfig: Connection => Unit = _ => ()

  /** Construct a Transactor whose connection use is rate-limited.
    *
    * @param dataSource
    *   Datasource to be used
    * @param sqlLogger
    *   Logging configuration
    * @param connectionConfig
    *   Customize the underlying JDBC Connections
    * @param maxBlockingThreads
    *   Number of threads in your connection pool. This helps magcats be more
    *   memory efficient by limiting the number of blocking pool threads used.
    *   Not needed if using a virtual-thread based blocking executor (e.g. via
    *   evalOn)
    * @return
    *   F[Transactor[F]]
    */
  def apply[F[_]: Async](
      dataSource: DataSource,
      sqlLogger: SqlLogger,
      connectionConfig: Connection => Unit,
      maxBlockingThreads: Int
  ): F[Transactor[F]] =
    assert(maxBlockingThreads > 0)

    // A single thread degenerates to mutual exclusion; otherwise gate with
    // a counting semaphore sized to the pool.
    val limiter =
      maxBlockingThreads match
        case 1 => Mutex[F].map(_.lock)
        case n => Semaphore[F](n).map(_.permit)

    limiter.map { gate =>
      new Transactor(
        dataSource,
        sqlLogger,
        connectionConfig,
        Some(gate)
      )
    }

  /** Construct a rate-limited Transactor with default connection config.
    *
    * @param dataSource
    *   Datasource to be used
    * @param sqlLogger
    *   Logging configuration
    * @param maxBlockingThreads
    *   Number of threads in your connection pool. This helps magcats be more
    *   memory efficient by limiting the number of blocking pool threads used.
    *   Not needed if using a virtual-thread based blocking executor (e.g. via
    *   evalOn)
    * @return
    *   F[Transactor[F]]
    */
  def apply[F[_]: Async](
      dataSource: DataSource,
      sqlLogger: SqlLogger,
      maxBlockingThreads: Int
  ): F[Transactor[F]] =
    apply(dataSource, sqlLogger, noOpConnectionConfig, maxBlockingThreads)

  /** Construct a rate-limited Transactor with default logging and config.
    *
    * @param dataSource
    *   Datasource to be used
    * @param maxBlockingThreads
    *   Number of threads in your connection pool. This helps magcats be more
    *   memory efficient by limiting the number of blocking pool threads used.
    *   Not needed if using a virtual-thread based blocking executor (e.g. via
    *   evalOn)
    * @return
    *   F[Transactor[F]]
    */
  def apply[F[_]: Async](
      dataSource: DataSource,
      maxBlockingThreads: Int
  ): F[Transactor[F]] =
    apply(
      dataSource,
      SqlLogger.Default,
      noOpConnectionConfig,
      maxBlockingThreads
    )

  /** Construct an unlimited Transactor (no connection rate limiting).
    *
    * @param dataSource
    *   Datasource to be used
    * @param sqlLogger
    *   Logging configuration
    * @param connectionConfig
    *   Customize the underlying JDBC Connections
    * @return
    *   F[Transactor[F]]
    */
  def apply[F[_]: Sync](
      dataSource: DataSource,
      sqlLogger: SqlLogger,
      connectionConfig: Connection => Unit
  ): F[Transactor[F]] =
    Sync[F].pure(new Transactor(dataSource, sqlLogger, connectionConfig, None))

  /** Construct an unlimited Transactor with default connection config.
    *
    * @param dataSource
    *   Datasource to be used
    * @param sqlLogger
    *   Logging configuration
    * @return
    *   F[Transactor[F]]
    */
  def apply[F[_]: Sync](
      dataSource: DataSource,
      sqlLogger: SqlLogger
  ): F[Transactor[F]] =
    apply(dataSource, sqlLogger, noOpConnectionConfig)

  /** Construct an unlimited Transactor with all defaults.
    *
    * @param dataSource
    *   Datasource to be used
    * @return
    *   F[Transactor[F]]
    */
  def apply[F[_]: Sync](
      dataSource: DataSource
  ): F[Transactor[F]] =
    apply(dataSource, SqlLogger.Default, noOpConnectionConfig)
end Transactor
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.augustnagro.magnum.magcats

// Re-export the core Magnum API so users of the cats-effect module only need
// a single import. The exported set is unchanged; names are grouped by role.
export com.augustnagro.magnum.{
  // query construction
  sql,
  batchUpdate,
  Frag,
  DbCodec,
  // connection / transaction contexts
  DbCon,
  DbTx,
  // repository API
  Id,
  ImmutableRepo,
  Repo,
  Spec,
  NullOrder,
  SeekDir,
  SortOrder,
  // table mapping
  SqlName,
  SqlNameMapper,
  Table,
  TableInfo,
  // database dialects
  DbType,
  ClickhouseDbType,
  H2DbType,
  MySqlDbType,
  OracleDbType,
  PostgresDbType,
  SqliteDbType
}
10 changes: 10 additions & 0 deletions magnum-cats-effect/src/test/resources/pg/big-dec.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Test fixture: table with a nullable unconstrained NUMERIC column,
-- used to exercise BigDecimal codec round-trips (including NULL).
drop table if exists big_dec cascade;

create table big_dec (
id int primary key,
my_big_dec numeric
);

-- One non-null and one null value to cover both codec paths.
insert into big_dec values
(1, 123),
(2, null);
15 changes: 15 additions & 0 deletions magnum-cats-effect/src/test/resources/pg/car.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- Test fixture: car table covering a CHECK-constrained text enum (color),
-- a nullable column (vin), and timestamptz values.
DROP TABLE IF EXISTS car;

CREATE TABLE car (
model VARCHAR(50) NOT NULL,
id bigint PRIMARY KEY,
top_speed INT NOT NULL,
vin INT,
color TEXT NOT NULL CHECK (color IN ('Red', 'Green', 'Blue')),
created TIMESTAMP WITH TIME ZONE NOT NULL
);

-- Three rows: one per allowed color; row 3 has a NULL vin.
INSERT INTO car (model, id, top_speed, vin, color, created) VALUES
('McLaren Senna', 1, 208, 123, 'Red', '2024-11-24T22:17:30.000000000Z'::timestamptz),
('Ferrari F8 Tributo', 2, 212, 124, 'Green', '2024-11-24T22:17:31.000000000Z'::timestamptz),
('Aston Martin Superleggera', 3, 211, null, 'Blue', '2024-11-24T22:17:32.000000000Z'::timestamptz);
11 changes: 11 additions & 0 deletions magnum-cats-effect/src/test/resources/pg/my-user.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Test fixture: table with a DB-generated identity primary key,
-- used to exercise inserts that return generated keys.
drop table if exists my_user cascade;

create table my_user (
first_name text not null,
id bigint primary key generated always as identity
);

-- ids are assigned by the identity column, so only first_name is inserted.
insert into my_user (first_name) values
('George'),
('Alexander'),
('John');
12 changes: 12 additions & 0 deletions magnum-cats-effect/src/test/resources/pg/no-id.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- Test fixture: table WITHOUT a primary key, used to exercise
-- repositories over id-less tables.
drop table if exists no_id;

create table no_id (
created_at timestamptz not null default now(),
user_name text not null,
user_action text not null
);

insert into no_id values
(timestamp '1997-08-15', 'Josh', 'clicked a button'),
(timestamp '1997-08-16', 'Danny', 'opened a toaster'),
(timestamp '1997-08-17', 'Greg', 'ran some QA tests');
20 changes: 20 additions & 0 deletions magnum-cats-effect/src/test/resources/pg/person.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- Test fixture: person table with nullable text (first_name) and
-- nullable UUID (social_id) columns for Option codec coverage.
drop table if exists person cascade;

create table person (
id bigint primary key,
first_name varchar(50),
last_name varchar(50) not null,
is_admin boolean not null,
created timestamptz not null,
social_id UUID
);

-- Rows 1-2 carry UUIDs; rows 3-7 have null social_id; row 8 additionally
-- has a null first_name and a fixed (non-now()) created timestamp.
insert into person (id, first_name, last_name, is_admin, created, social_id) values
(1, 'George', 'Washington', true, now(), 'd06443a6-3efb-46c4-a66a-a80a8a9a5388'),
(2, 'Alexander', 'Hamilton', true, now(), '529b6c6d-7228-4da5-81d7-13b706f78ddb'),
(3, 'John', 'Adams', true, now(), null),
(4, 'Benjamin', 'Franklin', true, now(), null),
(5, 'John', 'Jay', true, now(), null),
(6, 'Thomas', 'Jefferson', true, now(), null),
(7, 'James', 'Madison', true, now(), null),
(8, null, 'Nagro', false, timestamp '1997-08-12', null);
Loading