chore: Minor cleanup of DynamoDB OffsetStoreDao #1262
Conversation
@@ -13,7 +13,8 @@ akka.projection.dynamodb {
   time-window = 5 minutes

-  # Trying to batch insert offsets in batches of this size.
-  offset-batch-size = 20
+  # Must be less than or equal to 25 (hard limit in DynamoDB)
+  offset-batch-size = 25
I would probably not have made this configurable, since it's a hard limit in DynamoDB, but we already had this in the config/settings and it wasn't used, so I kept the setting.
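As a minimal sketch (not the project's actual settings code; the object name and validation are assumptions, only the config path comes from the snippet above), such a setting could be checked against the DynamoDB limit when it is loaded:

```scala
import com.typesafe.config.ConfigFactory

object OffsetBatchSizeCheck {
  // DynamoDB's BatchWriteItem request accepts at most 25 items.
  private val DynamoDbBatchWriteLimit = 25

  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.parseString(
      "akka.projection.dynamodb.offset-batch-size = 25")
    val batchSize = config.getInt("akka.projection.dynamodb.offset-batch-size")

    // Fail fast at startup instead of getting a ValidationException from DynamoDB later.
    require(
      batchSize >= 1 && batchSize <= DynamoDbBatchWriteLimit,
      s"offset-batch-size must be between 1 and $DynamoDbBatchWriteLimit, was [$batchSize]")

    println(s"Using offset batch size [$batchSize]")
  }
}
```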
@@ -63,7 +62,7 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
   val managementStateBySlicePid = AttributeValue.fromS("_mgmt")
 }

-final class BatchWriteFailed(val lastResponse: BatchWriteItemResponse) extends Exception
+final class BatchWriteFailed(val lastResponse: BatchWriteItemResponse) extends RuntimeException
Doesn't matter in Scala (there are no checked exceptions), but just to follow our conventions.
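For illustration only (a hypothetical class, not the actual BatchWriteFailed): either parent behaves the same from Scala, but extending RuntimeException matches the usual JVM convention for unchecked failures.

```scala
// Hypothetical example, not the project's code.
final class BatchFailed(message: String) extends RuntimeException(message)

object BatchFailedDemo {
  def main(args: Array[String]): Unit = {
    // No `throws` declarations or forced handling in Scala either way;
    // the choice of parent class is purely about following JVM conventions.
    try throw new BatchFailed("simulated batch failure")
    catch { case e: BatchFailed => println(s"caught: ${e.getMessage}") }
  }
}
```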
@@ -143,13 +140,14 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
   if (log.isDebugEnabled) {
     val count = unprocessed.asScala.valuesIterator.map(_.size).sum
     log.debug(
       "Not all writes in batch were applied, retrying in [{}]: [{}] unapplied writes, [{}/{}] retries",
       delay.toCoarsest,
doesn't work great for exponential backoff delays, since those have random nanosecond components and toCoarsest will then log them as full nanosecond values
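A small sketch of the issue (illustrative values, not the project's retry code): toCoarsest only moves to a larger time unit when the duration divides evenly, so a jittered backoff delay usually stays in nanoseconds; rounding to millisecond precision before logging is one possible workaround.

```scala
import scala.concurrent.duration._
import scala.util.Random

object BackoffDelayLogging {
  def main(args: Array[String]): Unit = {
    // A jittered exponential backoff delay almost never lands on a whole unit.
    val base = 500.millis
    val jittered = (base.toNanos * (1.0 + 0.2 * Random.nextDouble())).toLong.nanos

    // Stays in nanoseconds, e.g. "562384179 nanoseconds" - noisy in a log line.
    println(jittered.toCoarsest)

    // Rounding to millisecond precision first gives readable output, e.g. "562 milliseconds".
    println(jittered.toMillis.millis)
  }
}
```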
val unprocessedSeqNrs = unprocessedSeqNrItems.map { item =>
  import OffsetStoreDao.OffsetStoreAttributes._
  s"${item.get(NameSlice).s}: ${item.get(Pid).s}"
}
the previous version did kind of double iteration and collection construction over those items
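A hypothetical before/after sketch of that kind of double pass (Item, nameSlice and pid are stand-ins, not the real DynamoDB item attributes):

```scala
object SinglePassLogString {
  final case class Item(nameSlice: String, pid: String)

  def main(args: Array[String]): Unit = {
    val unprocessedItems = List(Item("events-512", "p-1"), Item("events-640", "p-2"))

    // Before: one pass to extract the fields, a second pass (and an extra
    // intermediate collection) to format them.
    val twoPass = unprocessedItems
      .map(item => (item.nameSlice, item.pid))
      .map { case (slice, pid) => s"$slice: $pid" }

    // After: format directly in a single map, as in the diff above.
    val onePass = unprocessedItems.map(item => s"${item.nameSlice}: ${item.pid}")

    println(twoPass.mkString(", "))
    println(onePass.mkString(", "))
  }
}
```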
I looked at the test failure. It's unrelated, and I see what is wrong in the test. I'll follow up on that in a separate PR.