diff --git a/src/main/kotlin/recce/server/RecConfiguration.kt b/src/main/kotlin/recce/server/RecConfiguration.kt
index e3c307f7..fc4d5f73 100644
--- a/src/main/kotlin/recce/server/RecConfiguration.kt
+++ b/src/main/kotlin/recce/server/RecConfiguration.kt
@@ -20,7 +20,7 @@ interface PostConstructable {
 
 class RecConfiguration @ConfigurationInject constructor(
     val datasets: Map<String, DatasetConfiguration>,
-    @Bindable(defaultValue = "1000") val defaultBatchSize: Int = 1000
+    @Bindable(defaultValue = "100") val defaultBatchSize: Int = 100
 ) : PostConstructable {
 
     @PostConstruct
diff --git a/src/main/kotlin/recce/server/dataset/DatasetRecService.kt b/src/main/kotlin/recce/server/dataset/DatasetRecService.kt
index c0ebc50a..3feeb40e 100644
--- a/src/main/kotlin/recce/server/dataset/DatasetRecService.kt
+++ b/src/main/kotlin/recce/server/dataset/DatasetRecService.kt
@@ -3,6 +3,7 @@ package recce.server.dataset
 import jakarta.inject.Inject
 import jakarta.inject.Singleton
 import mu.KotlinLogging
+import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import reactor.kotlin.core.util.function.component1
 import reactor.kotlin.core.util.function.component2
@@ -64,27 +65,52 @@ open class DatasetRecService(
             .map { it.invoke() }
             .doOnNext { logger.info { "Load from source completed" } }
 
-    private fun loadFromTarget(target: DataLoadDefinition, run: Mono<RecRun>): Mono<DatasetMeta> =
-        target.runQuery()
+    private fun loadFromTarget(target: DataLoadDefinition, run: Mono<RecRun>): Mono<DatasetMeta> {
+        val bufferedRows = target.runQuery()
             .doOnNext { logger.info { "Target query completed; streaming to Recce DB" } }
             .flatMap { result -> result.map(HashedRow::fromRow) }
             .zipWith(run.repeat())
-            .flatMap { (row, run) ->
-                recordRepository
-                    .existsByRecRunIdAndMigrationKey(run.id!!, row.migrationKey)
-                    .flatMap { exists ->
-                        when (exists) {
-                            true -> recordRepository.updateByRecRunIdAndMigrationKey(run.id, row.migrationKey, targetData = row.hashedValue).thenReturn(row)
-                            false -> recordRepository.save(RecRecord(recRunId = run.id, migrationKey = row.migrationKey, targetData = row.hashedValue))
-                        }
+            .buffer(config.defaultBatchSize)
+        val batches: Flux<() -> DatasetMeta> = bufferedRows
+            .flatMap { rows ->
+//                logger.info { "Processing batch of size [${rows.size}] from target"}
+                val runId = rows.first().t2.id!!
+                val toPersist = rows.map { it.t1 }.associateByTo(mutableMapOf()) { it.migrationKey }
+                val updatedRows = recordRepository
+                    .findByRecRunIdAndMigrationKeyIn(runId, rows.map { it.t1.migrationKey })
+                    .flatMap { found ->
+                        recordRepository.updateByRecRunIdAndMigrationKey(
+                            found.recRunId,
+                            found.migrationKey,
+                            targetData = toPersist.remove(found.migrationKey)?.hashedValue
+                        ).map { found }
+                    }
+                val newRows = Flux
+                    .defer {
+//                        logger.info { "Checking new rows for batch of size [${toPersist.size}]" }
+                        if (toPersist.isEmpty()) Mono.empty() else Mono.just(toPersist.values)
                     }
-                    .then(Mono.just(row.lazyMeta()))
+                    .map { hashedRows ->
+                        hashedRows.map {
+                            RecRecord(
+                                recRunId = runId,
+                                migrationKey = it.migrationKey,
+                                targetData = it.hashedValue
+                            )
+                        }
+                    }.flatMap { recordRepository.saveAll(it) }
+
+                updatedRows.concatWith(newRows)
+                    .index()
+                    .map { (i, _) -> rows[i.toInt()].t1.lazyMeta() }
             }
+        return batches
             .onErrorMap { DataLoadException("Failed to load data from target [${target.dataSourceRef}]: ${it.message}", it) }
             .defaultIfEmpty { DatasetMeta() }
             .last()
             .map { it.invoke() }
             .doOnNext { logger.info { "Load from target completed" } }
+    }
 }
 
 class DataLoadException(message: String, cause: Throwable) : Exception(message, cause)
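
Taken together, the change swaps the per-row exists/update/save round trip for a batched upsert: target rows are buffered into groups of `defaultBatchSize`, each batch is checked against Recce's DB with a single `findByRecRunIdAndMigrationKeyIn` lookup, the matches are updated, and whatever remains is inserted in one `saveAll`. Below is a minimal, self-contained sketch of that buffer-then-upsert pattern; the `Row` and `Repo` types are hypothetical stand-ins for illustration only, not the project's `HashedRow`/`RecordRepository` API.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical stand-ins for the hashed row and the record repository.
data class Row(val key: String, val hash: String)

interface Repo {
    fun findExistingKeys(keys: List<String>): Flux<String>
    fun update(key: String, hash: String): Mono<Row>
    fun saveAll(rows: List<Row>): Flux<Row>
}

// Buffer the stream into batches; within each batch, update rows that already
// exist and bulk-insert the rest, emitting one element per persisted row.
fun upsertInBatches(rows: Flux<Row>, repo: Repo, batchSize: Int): Flux<Row> =
    rows.buffer(batchSize)
        .flatMap { batch ->
            val pending = batch.associateByTo(mutableMapOf()) { it.key }
            val updated = repo.findExistingKeys(batch.map { it.key })
                .flatMap { key ->
                    // Remove the row from `pending` so it is not inserted again below.
                    Mono.justOrEmpty(pending.remove(key))
                        .flatMap { row -> repo.update(key, row.hash) }
                }
            // defer: only inspect `pending` when concatWith subscribes, i.e. after
            // the update stream has finished draining the already-existing keys.
            val inserted = Flux.defer {
                if (pending.isEmpty()) Flux.empty<Row>() else repo.saveAll(pending.values.toList())
            }
            updated.concatWith(inserted)
        }
```

Compared with the previous row-at-a-time exists-then-write flow, each batch now issues one key lookup and one bulk insert for new rows; only rows that already exist still pay a per-row update.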