From f1d1ad1b23aa8531418b0d3efad65661de138470 Mon Sep 17 00:00:00 2001 From: Chad Wilson Date: Thu, 11 Nov 2021 18:27:13 +0800 Subject: [PATCH] #40 Batch inserts of data from source DB to recce DB --- src/main/kotlin/recce/server/RecConfiguration.kt | 6 +++++- .../kotlin/recce/server/dataset/DatasetRecService.kt | 11 ++++++++--- .../recce/server/dataset/DatasetRecServiceTest.kt | 8 +++++--- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/main/kotlin/recce/server/RecConfiguration.kt b/src/main/kotlin/recce/server/RecConfiguration.kt index c8d7c6e3..e3c307f7 100644 --- a/src/main/kotlin/recce/server/RecConfiguration.kt +++ b/src/main/kotlin/recce/server/RecConfiguration.kt @@ -4,6 +4,7 @@ import io.micronaut.context.BeanLocator import io.micronaut.context.annotation.ConfigurationInject import io.micronaut.context.annotation.ConfigurationProperties import io.micronaut.context.annotation.Context +import io.micronaut.core.bind.annotation.Bindable import mu.KotlinLogging import recce.server.dataset.DatasetConfiguration import javax.annotation.PostConstruct @@ -17,7 +18,10 @@ interface PostConstructable { @Context @ConfigurationProperties("reconciliation") class RecConfiguration -@ConfigurationInject constructor(val datasets: Map) : PostConstructable { +@ConfigurationInject constructor( + val datasets: Map, + @Bindable(defaultValue = "1000") val defaultBatchSize: Int = 1000 +) : PostConstructable { @PostConstruct override fun populate(locator: BeanLocator) { diff --git a/src/main/kotlin/recce/server/dataset/DatasetRecService.kt b/src/main/kotlin/recce/server/dataset/DatasetRecService.kt index cedcaa3d..cda6978c 100644 --- a/src/main/kotlin/recce/server/dataset/DatasetRecService.kt +++ b/src/main/kotlin/recce/server/dataset/DatasetRecService.kt @@ -47,12 +47,16 @@ open class DatasetRecService( private fun loadFromSource(source: DataLoadDefinition, run: Mono): Mono = source.runQuery() + .doOnNext { logger.info { "Source query completed; streaming to Recce DB" } } .flatMap { result -> result.map(HashedRow::fromRow) } + .buffer(config.defaultBatchSize) .zipWith(run.repeat()) - .flatMap { (row, run) -> + .flatMap { (rows, run) -> + val records = rows.map { RecRecord(RecRecordKey(run.id!!, it.migrationKey), sourceData = it.hashedValue) } recordRepository - .save(RecRecord(RecRecordKey(run.id!!, row.migrationKey), sourceData = row.hashedValue)) - .map { row.lazyMeta() } + .saveAll(records) + .index() + .map { (i, _) -> rows[i.toInt()].lazyMeta() } } .onErrorMap { DataLoadException("Failed to load data from source [${source.dataSourceRef}]: ${it.message}", it) } .defaultIfEmpty { DatasetMeta() } @@ -62,6 +66,7 @@ open class DatasetRecService( private fun loadFromTarget(target: DataLoadDefinition, run: Mono): Mono = target.runQuery() + .doOnNext { logger.info { "Target query completed; streaming to Recce DB" } } .flatMap { result -> result.map(HashedRow::fromRow) } .zipWith(run.repeat()) .flatMap { (row, run) -> diff --git a/src/test/kotlin/recce/server/dataset/DatasetRecServiceTest.kt b/src/test/kotlin/recce/server/dataset/DatasetRecServiceTest.kt index 0ac43696..028d22a1 100644 --- a/src/test/kotlin/recce/server/dataset/DatasetRecServiceTest.kt +++ b/src/test/kotlin/recce/server/dataset/DatasetRecServiceTest.kt @@ -5,6 +5,7 @@ import io.r2dbc.spi.RowMetadata import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.jupiter.api.Test +import org.mockito.ArgumentMatchers.anyList import org.mockito.kotlin.* import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -42,6 +43,7 @@ internal class DatasetRecServiceTest { private val testRecord = RecRecord(testRecordKey) private val recordRepository = mock { on { save(any()) } doReturn Mono.just(testRecord) + on { saveAll(anyList()) } doReturn Flux.just(testRecord) on { update(eq(testRecordKey), any()) } doReturn Mono.empty() } @@ -84,7 +86,7 @@ internal class DatasetRecServiceTest { } .verifyComplete() - verify(recordRepository).save(RecRecord(testRecordKey, sourceData = "def")) + verify(recordRepository).saveAll(listOf(RecRecord(testRecordKey, sourceData = "def"))) verifyNoMoreInteractions(recordRepository) verify(runService).complete(recRun) } @@ -131,7 +133,7 @@ internal class DatasetRecServiceTest { } .verifyComplete() - verify(recordRepository).save(RecRecord(testRecordKey, sourceData = "def")) + verify(recordRepository).saveAll(listOf(RecRecord(testRecordKey, sourceData = "def"))) verify(recordRepository).existsById(testRecordKey) verify(recordRepository).save(RecRecord(testRecordKey, targetData = "def")) verifyNoMoreInteractions(recordRepository) @@ -155,7 +157,7 @@ internal class DatasetRecServiceTest { } .verifyComplete() - verify(recordRepository).save(RecRecord(testRecordKey, sourceData = "def")) + verify(recordRepository).saveAll(listOf(RecRecord(testRecordKey, sourceData = "def"))) verify(recordRepository).existsById(testRecordKey) verify(recordRepository).update(testRecordKey, targetData = "def") verifyNoMoreInteractions(recordRepository)