Skip to content

Commit

Permalink
#40 Batch inserts of data from source DB to recce DB
Browse files (browse the repository at this point in the history)
  • Loading branch information
chadlwilson committed Nov 17, 2021
1 parent 5520847 commit f1d1ad1
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 7 deletions.
6 changes: 5 additions & 1 deletion src/main/kotlin/recce/server/RecConfiguration.kt
Original file line number | Diff line number | Diff line change
Expand Up @@ -4,6 +4,7 @@ import io.micronaut.context.BeanLocator
import io.micronaut.context.annotation.ConfigurationInject
import io.micronaut.context.annotation.ConfigurationProperties
import io.micronaut.context.annotation.Context
+import io.micronaut.core.bind.annotation.Bindable
import mu.KotlinLogging
import recce.server.dataset.DatasetConfiguration
import javax.annotation.PostConstruct
Expand All @@ -17,7 +18,10 @@ interface PostConstructable {
@Context
@ConfigurationProperties("reconciliation")
class RecConfiguration
-@ConfigurationInject constructor(val datasets: Map<String, DatasetConfiguration>) : PostConstructable {
+@ConfigurationInject constructor(
+val datasets: Map<String, DatasetConfiguration>,
+@Bindable(defaultValue = "1000") val defaultBatchSize: Int = 1000
+) : PostConstructable {

@PostConstruct
override fun populate(locator: BeanLocator) {
Expand Down
11 changes: 8 additions & 3 deletions src/main/kotlin/recce/server/dataset/DatasetRecService.kt
Original file line number | Diff line number | Diff line change
Expand Up @@ -47,12 +47,16 @@ open class DatasetRecService(

private fun loadFromSource(source: DataLoadDefinition, run: Mono<RecRun>): Mono<DatasetMeta> =
source.runQuery()
+.doOnNext { logger.info { "Source query completed; streaming to Recce DB" } }
.flatMap { result -> result.map(HashedRow::fromRow) }
+.buffer(config.defaultBatchSize)
.zipWith(run.repeat())
-.flatMap { (row, run) ->
+.flatMap { (rows, run) ->
+val records = rows.map { RecRecord(RecRecordKey(run.id!!, it.migrationKey), sourceData = it.hashedValue) }
recordRepository
-.save(RecRecord(RecRecordKey(run.id!!, row.migrationKey), sourceData = row.hashedValue))
-.map { row.lazyMeta() }
+.saveAll(records)
+.index()
+.map { (i, _) -> rows[i.toInt()].lazyMeta() }
}
.onErrorMap { DataLoadException("Failed to load data from source [${source.dataSourceRef}]: ${it.message}", it) }
.defaultIfEmpty { DatasetMeta() }
Expand All @@ -62,6 +66,7 @@ open class DatasetRecService(

private fun loadFromTarget(target: DataLoadDefinition, run: Mono<RecRun>): Mono<DatasetMeta> =
target.runQuery()
+.doOnNext { logger.info { "Target query completed; streaming to Recce DB" } }
.flatMap { result -> result.map(HashedRow::fromRow) }
.zipWith(run.repeat())
.flatMap { (row, run) ->
Expand Down
8 changes: 5 additions & 3 deletions src/test/kotlin/recce/server/dataset/DatasetRecServiceTest.kt
Original file line number | Diff line number | Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.r2dbc.spi.RowMetadata
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.jupiter.api.Test
+import org.mockito.ArgumentMatchers.anyList
import org.mockito.kotlin.*
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
Expand Down Expand Up @@ -42,6 +43,7 @@ internal class DatasetRecServiceTest {
private val testRecord = RecRecord(testRecordKey)
private val recordRepository = mock<RecRecordRepository> {
on { save(any()) } doReturn Mono.just(testRecord)
+on { saveAll(anyList()) } doReturn Flux.just(testRecord)
on { update(eq(testRecordKey), any()) } doReturn Mono.empty()
}

Expand Down Expand Up @@ -84,7 +86,7 @@ internal class DatasetRecServiceTest {
}
.verifyComplete()

-verify(recordRepository).save(RecRecord(testRecordKey, sourceData = "def"))
+verify(recordRepository).saveAll(listOf(RecRecord(testRecordKey, sourceData = "def")))
verifyNoMoreInteractions(recordRepository)
verify(runService).complete(recRun)
}
Expand Down Expand Up @@ -131,7 +133,7 @@ internal class DatasetRecServiceTest {
}
.verifyComplete()

-verify(recordRepository).save(RecRecord(testRecordKey, sourceData = "def"))
+verify(recordRepository).saveAll(listOf(RecRecord(testRecordKey, sourceData = "def")))
verify(recordRepository).existsById(testRecordKey)
verify(recordRepository).save(RecRecord(testRecordKey, targetData = "def"))
verifyNoMoreInteractions(recordRepository)
Expand All @@ -155,7 +157,7 @@ internal class DatasetRecServiceTest {
}
.verifyComplete()

-verify(recordRepository).save(RecRecord(testRecordKey, sourceData = "def"))
+verify(recordRepository).saveAll(listOf(RecRecord(testRecordKey, sourceData = "def")))
verify(recordRepository).existsById(testRecordKey)
verify(recordRepository).update(testRecordKey, targetData = "def")
verifyNoMoreInteractions(recordRepository)
Expand Down

0 comments on commit f1d1ad1

Please sign in to comment.