Skip to content

Commit

Permalink
#40 First messy attempt to batch the target inserts
Browse files Browse the repository at this point in the history
- divides the target rows into batches
- for each batch, searches in bulk for which rows already exist from the source load
- if they do, updates them one-by-one (there is no easy way with Micronaut Data to do these updates in bulk just yet)
- any remaining which do not are batch inserted

This still doesn't seem quite right
- large batch sizes cause things to lock up and halt - some kind of exhaustion?
- this leaves the updates to existing rows, which are still done one-by-one, as the slowest part
- if all rows in target are matched to source, the target metadata is missing
  • Loading branch information
chadlwilson committed Nov 16, 2021
1 parent 1f11173 commit de49086
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 12 deletions.
2 changes: 1 addition & 1 deletion src/main/kotlin/recce/server/RecConfiguration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ interface PostConstructable {
class RecConfiguration
@ConfigurationInject constructor(
val datasets: Map<String, DatasetConfiguration>,
@Bindable(defaultValue = "1000") val defaultBatchSize: Int = 1000
@Bindable(defaultValue = "100") val defaultBatchSize: Int = 100
) : PostConstructable {

@PostConstruct
Expand Down
48 changes: 37 additions & 11 deletions src/main/kotlin/recce/server/dataset/DatasetRecService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package recce.server.dataset
import jakarta.inject.Inject
import jakarta.inject.Singleton
import mu.KotlinLogging
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.kotlin.core.util.function.component1
import reactor.kotlin.core.util.function.component2
Expand Down Expand Up @@ -64,27 +65,52 @@ open class DatasetRecService(
.map { it.invoke() }
.doOnNext { logger.info { "Load from source completed" } }

private fun loadFromTarget(target: DataLoadDefinition, run: Mono<RecRun>): Mono<DatasetMeta> =
target.runQuery()
private fun loadFromTarget(target: DataLoadDefinition, run: Mono<RecRun>): Mono<DatasetMeta> {
val bufferedRows = target.runQuery()
.doOnNext { logger.info { "Target query completed; streaming to Recce DB" } }
.flatMap { result -> result.map(HashedRow::fromRow) }
.zipWith(run.repeat())
.flatMap { (row, run) ->
recordRepository
.existsByRecRunIdAndMigrationKey(run.id!!, row.migrationKey)
.flatMap { exists ->
when (exists) {
true -> recordRepository.updateByRecRunIdAndMigrationKey(run.id, row.migrationKey, targetData = row.hashedValue).thenReturn(row)
false -> recordRepository.save(RecRecord(recRunId = run.id, migrationKey = row.migrationKey, targetData = row.hashedValue))
}
.buffer(config.defaultBatchSize)
val batches: Flux<() -> DatasetMeta> = bufferedRows
.flatMap { rows ->
// logger.info { "Processing batch of size [${rows.size}] from target"}
val runId = rows.first().t2.id!!
val toPersist = rows.map { it.t1 }.associateByTo(mutableMapOf()) { it.migrationKey }
val updatedRows = recordRepository
.findByRecRunIdAndMigrationKeyIn(runId, rows.map { it.t1.migrationKey })
.flatMap { found ->
recordRepository.updateByRecRunIdAndMigrationKey(
found.recRunId,
found.migrationKey,
targetData = toPersist.remove(found.migrationKey)?.hashedValue
).map { found }
}
val newRows = Flux
.defer {
// logger.info { "Checking new rows for batch of size [${toPersist.size}]" }
if (toPersist.isEmpty()) Mono.empty() else Mono.just(toPersist.values)
}
.then(Mono.just(row.lazyMeta()))
.map { hashedRows ->
hashedRows.map {
RecRecord(
recRunId = runId,
migrationKey = it.migrationKey,
targetData = it.hashedValue
)
}
}.flatMap { recordRepository.saveAll(it) }

updatedRows.concatWith(newRows)
.index()
.map { (i, _) -> rows[i.toInt()].t1.lazyMeta() }
}
return batches
.onErrorMap { DataLoadException("Failed to load data from target [${target.dataSourceRef}]: ${it.message}", it) }
.defaultIfEmpty { DatasetMeta() }
.last()
.map { it.invoke() }
.doOnNext { logger.info { "Load from target completed" } }
}
}

/**
 * Exception signalling that loading data for a dataset failed.
 *
 * @param message human-readable description of the failure
 * @param cause the underlying error that triggered the failure
 */
class DataLoadException(message: String, cause: Throwable) : Exception(message, cause)

0 comments on commit de49086

Please sign in to comment.