Skip to content

Commit

Permalink
#40 First attempt to batch the target inserts
Browse files Browse the repository at this point in the history
- divides the target rows into batches
- for each batch, searches in a single query for the rows which already exist from the source load
- if they do, updates them one-by-one (haven't done bulk updates with Micronaut Data just yet)
- any remaining which were not found+updated are batch inserted

Still to resolve
- large batch sizes cause things to lock up and halt - some kind of exhaustion, probably of connections
- this leaves the updates to existing rows, which are still done one-by-one, as the slowest bit
  • Loading branch information
chadlwilson committed Nov 17, 2021
1 parent c3b55a3 commit 69f36d9
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/main/kotlin/recce/server/RecConfiguration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ interface PostConstructable {
class RecConfiguration
@ConfigurationInject constructor(
val datasets: Map<String, DatasetConfiguration>,
@Bindable(defaultValue = "1000") val defaultBatchSize: Int = 1000
@Bindable(defaultValue = "100") val defaultBatchSize: Int = 100
) : PostConstructable {

@PostConstruct
Expand Down
46 changes: 35 additions & 11 deletions src/main/kotlin/recce/server/dataset/DatasetRecService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package recce.server.dataset
import jakarta.inject.Inject
import jakarta.inject.Singleton
import mu.KotlinLogging
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.kotlin.core.util.function.component1
import reactor.kotlin.core.util.function.component2
Expand Down Expand Up @@ -64,27 +65,50 @@ open class DatasetRecService(
.map { it.invoke() }
.doOnNext { logger.info { "Load from source completed" } }

private fun loadFromTarget(target: DataLoadDefinition, run: Mono<RecRun>): Mono<DatasetMeta> =
target.runQuery()
private fun loadFromTarget(target: DataLoadDefinition, run: Mono<RecRun>): Mono<DatasetMeta> {
val bufferedRows = target.runQuery()
.doOnNext { logger.info { "Target query completed; streaming to Recce DB" } }
.flatMap { result -> result.map(HashedRow::fromRow) }
.buffer(config.defaultBatchSize)
.zipWith(run.repeat())
.flatMap { (row, run) ->
recordRepository
.existsByRecRunIdAndMigrationKey(run.id!!, row.migrationKey)
.flatMap { exists ->
when (exists) {
true -> recordRepository.updateByRecRunIdAndMigrationKey(run.id, row.migrationKey, targetData = row.hashedValue).thenReturn(row)
false -> recordRepository.save(RecRecord(recRunId = run.id, migrationKey = row.migrationKey, targetData = row.hashedValue))
}
val batches: Flux<() -> DatasetMeta> = bufferedRows
.flatMap { (rows, run) ->
// logger.info { "Processing batch of size [${rows.size}] from target"}
val toPersist = rows.associateByTo(mutableMapOf()) { it.migrationKey }
val updatedRows = recordRepository
.findByRecRunIdAndMigrationKeyIn(run.id!!, rows.map { it.migrationKey })
.flatMap { found ->
recordRepository.updateByRecRunIdAndMigrationKey(
found.recRunId,
found.migrationKey,
targetData = toPersist.remove(found.migrationKey)?.hashedValue
).then(Mono.just(found))
}
.then(Mono.just(row.lazyMeta()))
val newRows = Flux
.defer {
// logger.info { "Checking new rows for batch of size [${toPersist.size}]" }
if (toPersist.isEmpty()) Mono.empty() else Mono.just(toPersist.values)
}
.map { hashedRows ->
hashedRows.map {
RecRecord(
recRunId = run.id,
migrationKey = it.migrationKey,
targetData = it.hashedValue
)
}
}.flatMap { recordRepository.saveAll(it) }

updatedRows.concatWith(newRows).map { rows.first().lazyMeta() }
}
return batches
.doOnNext { logger.debug { "$it emitted" } }
.onErrorMap { DataLoadException("Failed to load data from target [${target.dataSourceRef}]: ${it.message}", it) }
.defaultIfEmpty { DatasetMeta() }
.last()
.map { it.invoke() }
.doOnNext { logger.info { "Load from target completed" } }
}
}

/**
 * Signals that loading data from a data source failed.
 *
 * The dataset reconciliation service maps errors raised while loading target rows to this type,
 * keeping the original failure available as [cause].
 *
 * @param message description of the failure, including which data source was being loaded
 * @param cause the underlying error that aborted the load
 */
class DataLoadException(message: String, cause: Throwable) : Exception(message, cause)
3 changes: 1 addition & 2 deletions src/main/kotlin/recce/server/recrun/RecRecordRepository.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ import javax.persistence.*
abstract class RecRecordRepository(private val operations: R2dbcOperations) :
ReactorCrudRepository<RecRecord, Int> {

abstract fun existsByRecRunIdAndMigrationKey(recRunId: Int, migrationKey: String): Mono<Boolean>

@Suppress("MicronautDataRepositoryMethodParameters") // False positive
abstract fun updateByRecRunIdAndMigrationKey(recRunId: Int, migrationKey: String, targetData: String?): Mono<Void>

abstract fun findByRecRunIdAndMigrationKeyIn(recRunId: Int, migrationKeys: List<String>): Flux<RecRecord>
Expand Down
25 changes: 13 additions & 12 deletions src/test/kotlin/recce/server/dataset/DatasetRecServiceTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ internal class DatasetRecServiceTest {
private val recordRepository = mock<RecRecordRepository> {
on { save(any()) } doReturn Mono.just(testRecord)
on { saveAll(anyList()) } doReturn Flux.just(testRecord)
on { findByRecRunIdAndMigrationKeyIn(eq(testRecordKey.recRunId), anyList()) } doReturn Flux.just(testRecord)
on { updateByRecRunIdAndMigrationKey(eq(testRecordKey.recRunId), eq(testRecordKey.migrationKey), any()) } doReturn Mono.empty()
}

Expand Down Expand Up @@ -93,8 +94,8 @@ internal class DatasetRecServiceTest {

@Test
fun `should reconcile empty source with target`() {
whenever(recordRepository.existsByRecRunIdAndMigrationKey(testRecordKey.recRunId, testRecordKey.migrationKey))
.doReturn(Mono.just(false))
whenever(recordRepository.findByRecRunIdAndMigrationKeyIn(testRecordKey.recRunId, listOf(testRecordKey.migrationKey)))
.doReturn(Flux.empty())

val service = DatasetRecService(
RecConfiguration(mapOf(testDataset to DatasetConfiguration(emptyDataLoad, singleRowDataLoad))),
Expand All @@ -109,18 +110,18 @@ internal class DatasetRecServiceTest {
}
.verifyComplete()

verify(recordRepository).existsByRecRunIdAndMigrationKey(testRecordKey.recRunId, testRecordKey.migrationKey)
verify(recordRepository).save(RecRecord(key = testRecordKey, targetData = "def"))
verify(recordRepository).findByRecRunIdAndMigrationKeyIn(testRecordKey.recRunId, listOf(testRecordKey.migrationKey))
verify(recordRepository).saveAll(listOf(RecRecord(key = testRecordKey, targetData = "def")))
verifyNoMoreInteractions(recordRepository)
verify(runService).complete(recRun)
}

@Test
fun `should reconcile source with target with different rows row`() {
fun `should reconcile source with target with different rows`() {
// yes, we are re-using the same key, but let's pretend they are different by telling
// the code that the row doesn't exist
whenever(recordRepository.existsByRecRunIdAndMigrationKey(testRecordKey.recRunId, testRecordKey.migrationKey))
.doReturn(Mono.just(false))
whenever(recordRepository.findByRecRunIdAndMigrationKeyIn(testRecordKey.recRunId, listOf(testRecordKey.migrationKey)))
.doReturn(Flux.empty())

val service = DatasetRecService(
RecConfiguration(mapOf(testDataset to DatasetConfiguration(singleRowDataLoad, singleRowDataLoad))),
Expand All @@ -136,16 +137,16 @@ internal class DatasetRecServiceTest {
.verifyComplete()

verify(recordRepository).saveAll(listOf(RecRecord(key = testRecordKey, sourceData = "def")))
verify(recordRepository).existsByRecRunIdAndMigrationKey(testRecordKey.recRunId, testRecordKey.migrationKey)
verify(recordRepository).save(RecRecord(key = testRecordKey, targetData = "def"))
verify(recordRepository).findByRecRunIdAndMigrationKeyIn(testRecordKey.recRunId, listOf(testRecordKey.migrationKey))
verify(recordRepository).saveAll(listOf(RecRecord(key = testRecordKey, targetData = "def")))
verifyNoMoreInteractions(recordRepository)
verify(runService).complete(recRun)
}

@Test
fun `should reconcile source with target when rows have matching key`() {
whenever(recordRepository.existsByRecRunIdAndMigrationKey(testRecordKey.recRunId, testRecordKey.migrationKey))
.doReturn(Mono.just(true))
whenever(recordRepository.findByRecRunIdAndMigrationKeyIn(testRecordKey.recRunId, listOf(testRecordKey.migrationKey)))
.doReturn(Flux.just(testRecord))

val service = DatasetRecService(
RecConfiguration(mapOf(testDataset to DatasetConfiguration(singleRowDataLoad, singleRowDataLoad))),
Expand All @@ -161,7 +162,7 @@ internal class DatasetRecServiceTest {
.verifyComplete()

verify(recordRepository).saveAll(listOf(RecRecord(key = testRecordKey, sourceData = "def")))
verify(recordRepository).existsByRecRunIdAndMigrationKey(testRecordKey.recRunId, testRecordKey.migrationKey)
verify(recordRepository).findByRecRunIdAndMigrationKeyIn(testRecordKey.recRunId, listOf(testRecordKey.migrationKey))
verify(recordRepository).updateByRecRunIdAndMigrationKey(testRecordKey.recRunId, testRecordKey.migrationKey, targetData = "def")
verifyNoMoreInteractions(recordRepository)
verify(runService).complete(recRun)
Expand Down

0 comments on commit 69f36d9

Please sign in to comment.