Skip to content

Commit

Permalink
#40 Removing EmbeddedId for referring to RecRecords
Browse files Browse the repository at this point in the history
Micronaut Data seems to have a lot of limitations trying to deal with `EmbeddedId` and `Embedded` types in general when using JPA style repository methods to be auto-implemented by Micronaut Data. So let's see how far we can get without it. For example, when trying to do bulk finds by Embedded ID, it was not able to correctly understand how to generate the queries correctly. While it's probably a bug, it doesn't seem a major focus right now (micronaut-projects/micronaut-data#594 and micronaut-projects/micronaut-data#768)

The downside of adding a surrogate key is extra data to store+manage, extra index to update when bulk updating etc which is why I'd tried to avoid it originally. Short of this change, the other option would be to switch to Micronaut Data with JPA, with Hibernate underneath rather than using Micronaut Data directly.
  • Loading branch information
chadlwilson committed Nov 17, 2021
1 parent ee48549 commit 7b9c1a7
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 53 deletions.
9 changes: 4 additions & 5 deletions src/main/kotlin/recce/server/dataset/DatasetRecService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ open class DatasetRecService(
.buffer(config.defaultBatchSize)
.zipWith(run.repeat())
.flatMap { (rows, run) ->
val records = rows.map { RecRecord(RecRecordKey(run.id!!, it.migrationKey), sourceData = it.hashedValue) }
val records = rows.map { RecRecord(key = RecRecordKey(run.id!!, it.migrationKey), sourceData = it.hashedValue) }
recordRepository
.saveAll(records)
.index()
Expand All @@ -70,13 +70,12 @@ open class DatasetRecService(
.flatMap { result -> result.map(HashedRow::fromRow) }
.zipWith(run.repeat())
.flatMap { (row, run) ->
val key = RecRecordKey(run.id!!, row.migrationKey)
recordRepository
.existsById(key)
.existsByRecRunIdAndMigrationKey(run.id!!, row.migrationKey)
.flatMap { exists ->
when (exists) {
true -> recordRepository.update(key, targetData = row.hashedValue).thenReturn(row)
false -> recordRepository.save(RecRecord(key, targetData = row.hashedValue))
true -> recordRepository.updateByRecRunIdAndMigrationKey(run.id, row.migrationKey, targetData = row.hashedValue).thenReturn(row)
false -> recordRepository.save(RecRecord(recRunId = run.id, migrationKey = row.migrationKey, targetData = row.hashedValue))
}
}
.then(Mono.just(row.lazyMeta()))
Expand Down
32 changes: 19 additions & 13 deletions src/main/kotlin/recce/server/recrun/RecRecordRepository.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package recce.server.recrun

import io.micronaut.data.annotation.Id
import io.micronaut.data.model.query.builder.sql.Dialect
import io.micronaut.data.r2dbc.annotation.R2dbcRepository
import io.micronaut.data.r2dbc.operations.R2dbcOperations
Expand All @@ -9,18 +8,21 @@ import io.r2dbc.spi.Row
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.toFlux
import java.io.Serializable
import javax.persistence.*

@R2dbcRepository(dialect = Dialect.POSTGRES)
abstract class RecRecordRepository(private val operations: R2dbcOperations) :
ReactorCrudRepository<RecRecord, RecRecordKey> {
ReactorCrudRepository<RecRecord, Int> {

abstract fun update(@Id id: RecRecordKey, targetData: String?): Mono<Void>
abstract fun existsByRecRunIdAndMigrationKey(recRunId: Int, migrationKey: String): Mono<Boolean>

abstract fun findByIdRecRunId(recRunId: Int): Flux<RecRecord>
abstract fun updateByRecRunIdAndMigrationKey(recRunId: Int, migrationKey: String, targetData: String?): Mono<Void>

fun countMatchedByIdRecRunId(recRunId: Int): Mono<MatchStatus> {
abstract fun findByRecRunIdAndMigrationKeyIn(recRunId: Int, migrationKeys: List<String>): Flux<RecRecord>

abstract fun findByRecRunId(recRunId: Int): Flux<RecRecord>

fun countMatchedByKeyRecRunId(recRunId: Int): Mono<MatchStatus> {
return operations.withConnection { it.createStatement(countRecordsByStatus).bind("$1", recRunId).execute() }
.toFlux()
.flatMap { res -> res.map { row, _ -> matchStatusSetterFor(row) } }
Expand Down Expand Up @@ -66,13 +68,17 @@ abstract class RecRecordRepository(private val operations: R2dbcOperations) :
@Entity
@Table(name = "reconciliation_record")
data class RecRecord(
@EmbeddedId val id: RecRecordKey,
@Id @GeneratedValue val id: Int? = null,
@Column(name = "reconciliation_run_id") val recRunId: Int,
@Column(name = "migration_key") val migrationKey: String,
var sourceData: String? = null,
var targetData: String? = null
)
) {
constructor(key: RecRecordKey, sourceData: String? = null, targetData: String? = null) :
this(recRunId = key.recRunId, migrationKey = key.migrationKey, sourceData = sourceData, targetData = targetData)

@Embeddable
data class RecRecordKey(
@Column(name = "reconciliation_run_id") val recRunId: Int,
@Column(name = "migration_key") val migrationKey: String
) : Serializable
@Transient
val key = RecRecordKey(recRunId, migrationKey)
}

data class RecRecordKey(val recRunId: Int, val migrationKey: String)
2 changes: 1 addition & 1 deletion src/main/kotlin/recce/server/recrun/RecRunService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ open class RecRunService(

fun complete(run: RecRun): Mono<RecRun> {
logger.info { "Summarising results for $run" }
return recordRepository.countMatchedByIdRecRunId(run.id!!)
return recordRepository.countMatchedByKeyRecRunId(run.id!!)
.map { run.apply { completedTime = Instant.now(); summary = it } }
.flatMap(runRepository::update)
.doOnNext { logger.info { "Run completed for $it" } }
Expand Down
11 changes: 11 additions & 0 deletions src/main/resources/db/migration/V7__REC_RECORD.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
DROP TABLE IF EXISTS reconciliation_record;
CREATE TABLE reconciliation_record
(
id SERIAL PRIMARY KEY,
reconciliation_run_id INTEGER NOT NULL
CONSTRAINT fk_reconciliation_run_id REFERENCES reconciliation_run ON DELETE CASCADE,
migration_key VARCHAR(255) NOT NULL,
source_data VARCHAR(1024),
target_data VARCHAR(1024),
CONSTRAINT reconciliation_record_uniq UNIQUE (reconciliation_run_id, migration_key)
);
Original file line number Diff line number Diff line change
Expand Up @@ -64,41 +64,41 @@ class DatasetRecServiceIntegrationTest : DataSourceTest() {
StepVerifier.create(
runRepository.findAll()
.flatMap { run ->
recordRepository.findByIdRecRunId(run.id!!).map { Tuples.of(run, it) }
recordRepository.findByRecRunId(run.id!!).map { Tuples.of(run, it) }
}
)
.assertNext { (run, record) ->
checkPersistentFieldsFor(run)
assertThat(record.id.recRunId).isEqualTo(run.id)
assertThat(record.id.migrationKey).isEqualTo("Test0")
assertThat(record.key.recRunId).isEqualTo(run.id)
assertThat(record.key.migrationKey).isEqualTo("Test0")
assertThat(record.sourceData).isEqualTo("4e92a72630647a5bc6fc3909b52387e6dd6e4466fc7bcceb7439fd6df18fe866")
assertThat(record.targetData).isEqualTo("4e92a72630647a5bc6fc3909b52387e6dd6e4466fc7bcceb7439fd6df18fe866")
}
.assertNext { (run, record) ->
assertThat(run.datasetId).isEqualTo("test-dataset")
assertThat(record.id.recRunId).isEqualTo(run.id)
assertThat(record.id.migrationKey).isEqualTo("Test1")
assertThat(record.key.recRunId).isEqualTo(run.id)
assertThat(record.key.migrationKey).isEqualTo("Test1")
assertThat(record.sourceData).isEqualTo("ba4d2f35698204cfda7e42cb31752d878f578822920440b5aa0ed79f1ac79785")
assertThat(record.targetData).isEqualTo("ba4d2f35698204cfda7e42cb31752d878f578822920440b5aa0ed79f1ac79785")
}
.assertNext { (run, record) ->
assertThat(run.datasetId).isEqualTo("test-dataset")
assertThat(record.id.recRunId).isEqualTo(run.id)
assertThat(record.id.migrationKey).isEqualTo("Test2")
assertThat(record.key.recRunId).isEqualTo(run.id)
assertThat(record.key.migrationKey).isEqualTo("Test2")
assertThat(record.sourceData).isEqualTo("eb25fb4ad862a2ba8a753d1d1c42889d18651150070113527bf55d50b663e7ac")
assertThat(record.targetData).isNull()
}
.assertNext { (run, record) ->
assertThat(run.datasetId).isEqualTo("test-dataset")
assertThat(record.id.recRunId).isEqualTo(run.id)
assertThat(record.id.migrationKey).isEqualTo("Test3")
assertThat(record.key.recRunId).isEqualTo(run.id)
assertThat(record.key.migrationKey).isEqualTo("Test3")
assertThat(record.sourceData).isNull()
assertThat(record.targetData).isEqualTo("168c587d9c765ec2cda598750201d15a2e616641455696df176f51d6433dff37")
}
.assertNext { (run, record) ->
assertThat(run.datasetId).isEqualTo("test-dataset")
assertThat(record.id.recRunId).isEqualTo(run.id)
assertThat(record.id.migrationKey).isEqualTo("Test4")
assertThat(record.key.recRunId).isEqualTo(run.id)
assertThat(record.key.migrationKey).isEqualTo("Test4")
assertThat(record.sourceData).isNull()
assertThat(record.targetData).isEqualTo("8b4cde00f0a0d00546a59e74bc9b183a43d69143944101eeae789163b509038d")
}
Expand Down
31 changes: 17 additions & 14 deletions src/test/kotlin/recce/server/dataset/DatasetRecServiceTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ internal class DatasetRecServiceTest {
}

private val testRecordKey = RecRecordKey(1, "abc")
private val testRecord = RecRecord(testRecordKey)
private val testRecord = RecRecord(key = testRecordKey)
private val recordRepository = mock<RecRecordRepository> {
on { save(any()) } doReturn Mono.just(testRecord)
on { saveAll(anyList()) } doReturn Flux.just(testRecord)
on { update(eq(testRecordKey), any()) } doReturn Mono.empty()
on { updateByRecRunIdAndMigrationKey(eq(testRecordKey.recRunId), eq(testRecordKey.migrationKey), any()) } doReturn Mono.empty()
}

@Test
Expand Down Expand Up @@ -86,14 +86,15 @@ internal class DatasetRecServiceTest {
}
.verifyComplete()

verify(recordRepository).saveAll(listOf(RecRecord(testRecordKey, sourceData = "def")))
verify(recordRepository).saveAll(listOf(RecRecord(key = testRecordKey, sourceData = "def")))
verifyNoMoreInteractions(recordRepository)
verify(runService).complete(recRun)
}

@Test
fun `should reconcile empty source with target`() {
whenever(recordRepository.existsById(testRecordKey)).doReturn(Mono.just(false))
whenever(recordRepository.existsByRecRunIdAndMigrationKey(testRecordKey.recRunId, testRecordKey.migrationKey))
.doReturn(Mono.just(false))

val service = DatasetRecService(
RecConfiguration(mapOf(testDataset to DatasetConfiguration(emptyDataLoad, singleRowDataLoad))),
Expand All @@ -108,8 +109,8 @@ internal class DatasetRecServiceTest {
}
.verifyComplete()

verify(recordRepository).existsById(testRecordKey)
verify(recordRepository).save(RecRecord(testRecordKey, targetData = "def"))
verify(recordRepository).existsByRecRunIdAndMigrationKey(testRecordKey.recRunId, testRecordKey.migrationKey)
verify(recordRepository).save(RecRecord(key = testRecordKey, targetData = "def"))
verifyNoMoreInteractions(recordRepository)
verify(runService).complete(recRun)
}
Expand All @@ -118,7 +119,8 @@ internal class DatasetRecServiceTest {
fun `should reconcile source with target with different rows row`() {
// yes, we are re-using the same key, but let's pretend they are different by telling
// the code that the row doesn't exist
whenever(recordRepository.existsById(testRecordKey)).doReturn(Mono.just(false))
whenever(recordRepository.existsByRecRunIdAndMigrationKey(testRecordKey.recRunId, testRecordKey.migrationKey))
.doReturn(Mono.just(false))

val service = DatasetRecService(
RecConfiguration(mapOf(testDataset to DatasetConfiguration(singleRowDataLoad, singleRowDataLoad))),
Expand All @@ -133,16 +135,17 @@ internal class DatasetRecServiceTest {
}
.verifyComplete()

verify(recordRepository).saveAll(listOf(RecRecord(testRecordKey, sourceData = "def")))
verify(recordRepository).existsById(testRecordKey)
verify(recordRepository).save(RecRecord(testRecordKey, targetData = "def"))
verify(recordRepository).saveAll(listOf(RecRecord(key = testRecordKey, sourceData = "def")))
verify(recordRepository).existsByRecRunIdAndMigrationKey(testRecordKey.recRunId, testRecordKey.migrationKey)
verify(recordRepository).save(RecRecord(key = testRecordKey, targetData = "def"))
verifyNoMoreInteractions(recordRepository)
verify(runService).complete(recRun)
}

@Test
fun `should reconcile source with target when rows have matching key`() {
whenever(recordRepository.existsById(testRecordKey)).doReturn(Mono.just(true))
whenever(recordRepository.existsByRecRunIdAndMigrationKey(testRecordKey.recRunId, testRecordKey.migrationKey))
.doReturn(Mono.just(true))

val service = DatasetRecService(
RecConfiguration(mapOf(testDataset to DatasetConfiguration(singleRowDataLoad, singleRowDataLoad))),
Expand All @@ -157,9 +160,9 @@ internal class DatasetRecServiceTest {
}
.verifyComplete()

verify(recordRepository).saveAll(listOf(RecRecord(testRecordKey, sourceData = "def")))
verify(recordRepository).existsById(testRecordKey)
verify(recordRepository).update(testRecordKey, targetData = "def")
verify(recordRepository).saveAll(listOf(RecRecord(key = testRecordKey, sourceData = "def")))
verify(recordRepository).existsByRecRunIdAndMigrationKey(testRecordKey.recRunId, testRecordKey.migrationKey)
verify(recordRepository).updateByRecRunIdAndMigrationKey(testRecordKey.recRunId, testRecordKey.migrationKey, targetData = "def")
verifyNoMoreInteractions(recordRepository)
verify(runService).complete(recRun)
}
Expand Down
22 changes: 14 additions & 8 deletions src/test/kotlin/recce/server/recrun/RecRecordRepositoryTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.kotlin.core.publisher.toFlux
import reactor.kotlin.core.util.function.component1
import reactor.kotlin.core.util.function.component2
import reactor.test.StepVerifier

@MicronautTest
Expand All @@ -24,7 +26,7 @@ class RecRecordRepositoryTest {

@Test
fun `should count matches when empty`() {
StepVerifier.create(recordRepository.countMatchedByIdRecRunId(1))
StepVerifier.create(recordRepository.countMatchedByKeyRecRunId(1))
.expectNext(MatchStatus())
.verifyComplete()
}
Expand All @@ -43,7 +45,7 @@ class RecRecordRepositoryTest {
.expectNextCount(testRecordData.size.toLong())
.verifyComplete()

StepVerifier.create(recordRepository.countMatchedByIdRecRunId(savedRecords.last().id.recRunId))
StepVerifier.create(recordRepository.countMatchedByKeyRecRunId(savedRecords.last().key.recRunId))
.assertNext {
assertThat(it).isEqualTo(MatchStatus(1, 2, 3, 4))
assertThat(it.sourceTotal).isEqualTo(8)
Expand All @@ -55,10 +57,10 @@ class RecRecordRepositoryTest {

private fun saveTestRecords(testRecordData: List<Pair<String?, String?>>): Flux<RecRecord> {
return runRepository.save(RecRun("test-dataset")).toFlux().flatMap { run ->
var key = 0
Flux.fromIterable(testRecordData)
.flatMap { (source, target) ->
recordRepository.save(RecRecord(RecRecordKey(run.id!!, "${++key}"), source, target))
.index()
.flatMap { (i, data) ->
recordRepository.save(RecRecord(key = RecRecordKey(run.id!!, "${i + 1}"), sourceData = data.first, targetData = data.second))
}
}
}
Expand All @@ -72,8 +74,12 @@ class RecRecordRepositoryTest {
.expectNextCount(testRecordData.size.toLong())
.verifyComplete()

StepVerifier.create(recordRepository.findByIdInList(savedRecords.map { it.id }))
.expectNextCount(10)
.verifyComplete();
val foundRecords = mutableListOf<RecRecord>()
StepVerifier.create(recordRepository.findByRecRunIdAndMigrationKeyIn(savedRecords.first().recRunId, savedRecords.map { it.migrationKey }))
.recordWith { foundRecords }
.expectNextCount(testRecordData.size.toLong())
.verifyComplete()

assertThat(savedRecords).usingRecursiveComparison().ignoringCollectionOrder().isEqualTo(foundRecords)
}
}
2 changes: 1 addition & 1 deletion src/test/kotlin/recce/server/recrun/RecRunServiceTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ internal class RecRunServiceTest {
fun `complete should set completed time`() {
val expectedMatchStatus = MatchStatus(1, 1, 1, 1)
val recordRepository = mock<RecRecordRepository> {
on { countMatchedByIdRecRunId(any()) } doReturn Mono.just(expectedMatchStatus)
on { countMatchedByKeyRecRunId(any()) } doReturn Mono.just(expectedMatchStatus)
}

val runRepository = mock<RecRunRepository> {
Expand Down

0 comments on commit 7b9c1a7

Please sign in to comment.