Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support for r2dbc sqlserver #1122

Merged
merged 8 commits into from
May 6, 2024

Conversation

sebastian-alfers
Copy link
Contributor

Still depends on a snapshot version, so do not merge.

@sebastian-alfers sebastian-alfers marked this pull request as draft February 4, 2024 16:32
Copy link
Member

@patriknw patriknw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking good

project/Dependencies.scala Outdated Show resolved Hide resolved
build.sbt Outdated Show resolved Hide resolved
.mkString(",")
sql"""
$base
AND CONCAT(persistence_id, '-', seq_nr) NOT IN ($listParam)"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not possible to use a bind parameter for that $listParam? such things could be source of sql injection

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, but I can have another look.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, does not seem it works:

Cause: java.lang.IllegalArgumentException: Cannot encode [[Ljava.lang.String;@5b565160] parameter of type [[Ljava.lang.String;]

The currently supported codecs can be found here representing this list of java types. The maintainer does mention this as well here.

I am trying now to transform each item within notInLatestBySlice to its own param to end up with ... NOT IN(?,?,?,...) and then one single .bind() per element.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I my last commit I removed the pending case for sqlserver. The test is still flaky, but at least it went green now to indicate this change ⬆️ works. It if keeps being flaky, we can re-add the pending exclusion, but not we at least know it works.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good that you found a way to use parameters.

Now the concat of the pid and seqNr in notInLatestBySlice is done in the OffsetStore, and then the dao is assumed to use the same format. We can make a more clear interface by adding a:

case class LatestBySlice(slice: Int, pid: String, seqNr: Long)

and the dao method would be

def deleteOldTimestampOffsetSql(notInLatestBySlice: Seq[LatestBySlice])

The main reason why I started thinking about that was that it might be good to use fixed number of parameters in the sql. Otherwise we can end up with very many different prepared statements.

That fixed number of parameters would then be the same as number of slices in the slice range. For the slices that are not included in the notInLatestBySlice we could still bind a fake value (such as "-") to the parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I do understand you right, you suggest one param per (timestampOffsetBySlicesSourceProvider.minSlice to timestampOffsetBySlicesSourceProvider.maxSlice). Is that right? Then there is a limit in the server:

io.r2dbc.mssql.ExceptionFactory$MssqlNonTransientException: The incoming request has too many parameters. The server supports a maximum of 2100 parameters. Reduce the number of parameters and resend the request.
	at io.r2dbc.mssql.ExceptionFactory.createException(ExceptionFactory.java:154)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I can fix this, let me see.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is right, should be at most 1024 parameters because we have at most 1024 slices for one Projection instance (for one R2dbcOffsetStore).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed my take on how I understood your suggestion.

Copy link
Member

@patriknw patriknw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good, just something minor

@sebastian-alfers sebastian-alfers marked this pull request as ready for review February 27, 2024 10:26
@ennru ennru requested a review from patriknw February 27, 2024 11:09
Copy link
Member

@patriknw patriknw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking good, only minor comments, and the docs should also be updated https://doc.akka.io/docs/akka-projection/current/r2dbc.html

@@ -42,13 +53,16 @@ private[projection] class PostgresOffsetStoreDao(
projectionId: ProjectionId)
extends OffsetStoreDao {

private val logger = LoggerFactory.getLogger(getClass)
protected def logger: Logger = LoggerFactory.getLogger(getClass)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you do the same as in a-p-r2dbc to avoid accessing LoggerFactory each time:

protected def log: Logger = PostgresOffsetStoreDao.log

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should PostgresOffsetStoreDao.log then be a val, or what is the purpose?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, so that it's created once instead of for each log.x call

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sebastian-alfers and others added 3 commits March 15, 2024 18:55
…ernal/PostgresOffsetStoreDao.scala

Co-authored-by: Patrik Nordwall <[email protected]>
…ernal/R2dbcOffsetStore.scala

Co-authored-by: Patrik Nordwall <[email protected]>
@ennru ennru requested a review from patriknw March 20, 2024 10:44
*/
@InternalApi
private[projection] object PostgresOffsetStoreDao {
private def log: Logger = LoggerFactory.getLogger(classOf[PostgresOffsetStoreDao])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a val

@@ -42,13 +61,16 @@ private[projection] class PostgresOffsetStoreDao(
projectionId: ProjectionId)
extends OffsetStoreDao {

private val logger = LoggerFactory.getLogger(getClass)
protected def logger: Logger = PostgresOffsetStoreDao.log
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to change this from val to def in the first place? doesn't getLogger(getClass) do exactly what we want? if that is the case we don't need the extra val in the companion object

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, in a-p-re2dc, I had to add it to avoid some conflicts caused by inheritance. Removing it here, maybe it was needed at some point, but is not anymore.

Copy link
Member

@patriknw patriknw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking good, only some docs and then we can merge

@@ -57,6 +57,12 @@ The table below shows `akka-projection-r2dbc`'s direct dependencies, and the sec
The `akka_projection_offset_store`, `akka_projection_timestamp_offset_store` and `akka_projection_management` tables
need to be created in the configured database:

@@@ warning
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@patriknw As it is now, this warning is shown regardless of the chosen dialect tab.

So, in a-p-r2dbc, we took advantage of the paradoxGroups which allowed to make this a condition.

To get this warning only render if "SQLServer" tab is checked, we would also have to introduce paradoxGroups in the build.sbt, however this would show in all pages (that have no reference to those dialects). Any idea? I tried to hide the html-rendering of the dialect drop-down in the top-left via JS, and it worked, but it was not perfect.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine, let's show the warning for all dialects here.

Copy link
Member

@patriknw patriknw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks

@johanandren johanandren merged commit 764bc38 into akka:main May 6, 2024
22 checks passed
@johanandren johanandren added this to the 1.5.4 milestone May 6, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants