diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index 7c7bc8f2..7cca3f15 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -367,7 +367,7 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection r2dbcExecutor.withConnection(s"insert [${state.persistenceId}] with change handler") { connection => for { updatedRows <- recoverDataIntegrityViolation(R2dbcExecutor.updateOneInTx(insertStatement(connection))) - _ <- processChange(handler, connection, change) + _ <- if (updatedRows == 1) processChange(handler, connection, change) else FutureDone } yield updatedRows } } @@ -412,7 +412,7 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection r2dbcExecutor.withConnection(s"update [${state.persistenceId}] with change handler") { connection => for { updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection)) - _ <- processChange(handler, connection, change) + _ <- if (updatedRows == 1) processChange(handler, connection, change) else FutureDone } yield updatedRows } } @@ -492,7 +492,7 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection R2dbcExecutor.updateOneInTx(insertDeleteMarkerStatement(connection))) _ <- changeHandler match { case None => FutureDone - case Some(handler) => processChange(handler, connection, change) + case Some(handler) => if (updatedRows == 1) processChange(handler, connection, change) else FutureDone } } yield updatedRows } @@ -539,7 +539,7 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection)) _ <- changeHandler match { case None => FutureDone - case Some(handler) => processChange(handler, connection, change) + case Some(handler) => if (updatedRows == 1) processChange(handler, connection, change) else FutureDone } } yield updatedRows } @@ -575,8 +575,11 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection _ <- changeHandler match { case None => FutureDone case Some(handler) => - val change = new DeletedDurableState[Any](persistenceId, 0L, NoOffset, EmptyDbTimestamp.toEpochMilli) - processChange(handler, connection, change) + if (updatedRows == 1) { + val change = new DeletedDurableState[Any](persistenceId, 0L, NoOffset, EmptyDbTimestamp.toEpochMilli) + processChange(handler, connection, change) + } else + FutureDone } } yield updatedRows } diff --git a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreChangeHandlerSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreChangeHandlerSpec.scala index 0fa84763..e70f72e5 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreChangeHandlerSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreChangeHandlerSpec.scala @@ -103,12 +103,15 @@ class DurableStateStoreChangeHandlerSpec private val unusedTag = "n/a" private def exists(whereCondition: String): Boolean = + count(whereCondition) >= 1 + + private def count(whereCondition: String): Long = r2dbcExecutor .selectOne("count")( _.createStatement(s"select count(*) from $anotherTable where $whereCondition"), row => row.get(0, classOf[java.lang.Long]).longValue()) .futureValue - .contains(1) + .getOrElse(0L) "The R2DBC durable state store change handler" should { @@ -180,6 +183,23 @@ class DurableStateStoreChangeHandlerSpec exists(s"pid = '$persistenceId' and rev = 2") should be(false) } + "not be invoked when wrong revision" in { + val entityType = "CustomEntity" + val persistenceId = nextPid(entityType) + val value = "Genuinely Collaborative" + store.upsertObject(persistenceId, 1L, value, unusedTag).futureValue + count(s"pid = '$persistenceId'") should be(1L) + + store.upsertObject(persistenceId, 1L, value, unusedTag).failed.futureValue + count(s"pid = '$persistenceId'") should be(1L) // not called (or rolled back) + + val updatedValue = "Open to Feedback" + store.upsertObject(persistenceId, 2L, updatedValue, unusedTag).futureValue + count(s"pid = '$persistenceId'") should be(2L) + store.upsertObject(persistenceId, 2L, updatedValue, unusedTag).failed.futureValue + count(s"pid = '$persistenceId'") should be(2L) // not called (or rolled back) + } + "support javadsl.ChangeHandler" in { val entityType = "JavadslCustomEntity" val persistenceId = nextPid(entityType)