Skip to content

Commit

Permalink
Not intended for merging
Browse files Browse the repository at this point in the history
Trying to emulate what projections does where the missed write is observed.
No success to reproduce so far.
  • Loading branch information
johanandren committed Sep 8, 2023
1 parent def5713 commit 49c68e5
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 3 deletions.
4 changes: 2 additions & 2 deletions core/src/test/scala/akka/persistence/r2dbc/TestConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ object TestConfig {
case "h2" =>
ConfigFactory.parseString(s"""
akka.persistence.r2dbc.connection-factory {
protocol = "file"
database = "./target/h2-test-db"
protocol = "mem"
database = "h2-test-db"
trace-logging = on
}
""")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,32 @@

package akka.persistence.r2dbc.internal

import akka.Done

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorSystem
import akka.persistence.r2dbc.ConnectionFactoryProvider
import akka.persistence.r2dbc.TestConfig
import akka.persistence.r2dbc.TestData
import akka.persistence.r2dbc.TestDbLifecycle
import akka.persistence.r2dbc.internal.Sql.Interpolation
import akka.persistence.r2dbc.internal.h2.H2Dialect
import akka.persistence.r2dbc.internal.postgres.PostgresDialect
import akka.persistence.r2dbc.internal.postgres.YugabyteDialect
import akka.persistence.r2dbc.session.scaladsl.R2dbcSession
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import io.r2dbc.spi.Connection
import io.r2dbc.spi.R2dbcNonTransientResourceException
import io.r2dbc.spi.Wrapped
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.LoggerFactory

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

object R2dbcExecutorSpec {
val config: Config = ConfigFactory
Expand Down Expand Up @@ -123,5 +131,75 @@ class R2dbcExecutorSpec
result.failed.futureValue shouldBe a[R2dbcNonTransientResourceException]
}

"see the previously written value across threads in H2" in {
implicit val ec: ExecutionContext = typedSystem.executionContext
val executor = new R2dbcExecutor(
ConnectionFactoryProvider(typedSystem)
.connectionFactoryFor(testConfigPath + ".connection-factory"),
LoggerFactory.getLogger(getClass),
r2dbcSettings.logDbCallsExceeding,
r2dbcSettings.connectionFactorySettings.poolSettings.closeCallsExceeding)(
typedSystem.executionContext,
typedSystem)
val table = s"sometable${System.currentTimeMillis()}"
Await.result(
executor.executeDdl("beforeAll createTable") { conn =>
conn.createStatement(s"""|CREATE table IF NOT EXISTS $table (
| id INT NOT NULL,
| avalue BIGINT NOT NULL,
| PRIMARY KEY(id)
|);""".stripMargin)
},
10.seconds)

def asyncLoop(n: Long, repeatUntil: Long): Future[Done] = {
if (n == repeatUntil) Future.successful(Done)
else {
val nextN = n + 1L
if (n % 1000L == 0) print("X")
else print(".")
r2dbcExecutor
.withConnection(s"iteration $n") { connection =>
val session = new R2dbcSession(connection)
val statement = session.createStatement(s"SELECT avalue FROM $table WHERE id = 1")
R2dbcExecutor
.selectOneInTx(statement, row => row.get("avalue", classOf[java.lang.Long]))
.flatMap { read =>
read match {
case None if n != 1 => throw new IllegalStateException("existing row for first read")
case None =>
case Some(value) if n != value => throw new IllegalStateException(s"expected $n but saw $value")
case Some(_) =>

}

val update = session
.createStatement(sql"""
MERGE INTO $table (id, avalue)
KEY (id)
VALUES (?, ?)
""")
.bind(0, 1)
.bind(1, nextN)

R2dbcExecutor
.updateOneInTx(update)
.map { updatedRows =>
if (updatedRows != 1) throw new IllegalStateException(s"Updated rows $updatedRows for $nextN")
else Done
}
}
}
.flatMap { _ =>
asyncLoop(nextN, repeatUntil)
}

}
}

asyncLoop(1L, 100000L).futureValue(timeout(1.hour))

}

}
}

0 comments on commit 49c68e5

Please sign in to comment.