From 91f39ee6b9c6faf2911da05a3fadcad42e1f3c4d Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 9 Oct 2023 10:47:01 +0000 Subject: [PATCH 1/2] Bump com.diffplug.spotless from 6.21.0 to 6.22.0 Bumps com.diffplug.spotless from 6.21.0 to 6.22.0. --- updated-dependencies: - dependency-name: com.diffplug.spotless dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle.kts b/build.gradle.kts index 5ed63091..d8d72d2e 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -7,7 +7,7 @@ plugins { id("org.jetbrains.kotlin.plugin.allopen") version kotlinVersion id("org.jetbrains.kotlin.plugin.jpa") version kotlinVersion id("io.micronaut.application") version "3.7.10" - id("com.diffplug.spotless") version "6.21.0" + id("com.diffplug.spotless") version "6.22.0" id("io.gitlab.arturbosch.detekt") version "1.23.1" id("com.github.spotbugs") version "5.1.4" jacoco From b1fda4b12be5e54e392289c39c6d6b6aaa3b9c5a Mon Sep 17 00:00:00 2001 From: Chad Wilson Date: Wed, 11 Oct 2023 19:22:10 +0800 Subject: [PATCH 2/2] Fix spotless violations --- build.gradle.kts | 60 +++--- .../kotlin/recce/server/R2dbcConfiguration.kt | 25 +-- .../kotlin/recce/server/RecConfiguration.kt | 66 ++++--- src/main/kotlin/recce/server/RecceServer.kt | 12 +- .../recce/server/api/DatasetApiModel.kt | 10 +- .../recce/server/api/DatasetController.kt | 8 +- .../server/api/DatasetRecRunController.kt | 31 +-- .../kotlin/recce/server/api/RunApiModel.kt | 121 ++++++------ .../recce/server/auth/AuthConfiguration.kt | 7 +- .../server/dataset/DataLoadDefinition.kt | 107 +++++----- .../server/dataset/DatasetConfiguration.kt | 1 - .../server/dataset/DatasetRecScheduler.kt | 1 - .../recce/server/dataset/DatasetRecService.kt | 65 +++--- .../recce/server/dataset/HashingStrategy.kt | 36 +++- .../server/recrun/RecRecordRepository.kt | 70 ++++--- .../recce/server/recrun/RecRunRepository.kt | 15 +- .../recce/server/recrun/RecRunService.kt | 17 +- .../recce/server/R2dbcConfigurationTest.kt | 10 +- .../server/RecConfigurationPropertiesTest.kt | 24 +-- .../recce/server/RecConfigurationTest.kt | 1 - .../kotlin/recce/server/RecceServerTest.kt | 1 - src/test/kotlin/recce/server/TestConfig.kt | 5 +- .../recce/server/api/DatasetControllerTest.kt | 38 ++-- .../server/api/DatasetRecRunControllerTest.kt | 73 +++---- .../recce/server/api/RunApiModelTest.kt | 1 - .../server/auth/AuthConfigurationTest.kt | 7 +- .../auth/BasicAuthenticationProviderTest.kt | 1 - .../server/dataset/DataLoadDefinitionTest.kt | 97 +++++---- .../dataset/DatasetConfigurationTest.kt | 1 - .../server/dataset/DatasetRecSchedulerTest.kt | 32 +-- ...tRecServiceCrossDatabaseIntegrationTest.kt | 92 +++++---- .../DatasetRecServiceIntegrationTest.kt | 29 +-- .../server/dataset/DatasetRecServiceTest.kt | 185 ++++++++++-------- .../recce/server/dataset/HashedRowTest.kt | 11 +- .../server/dataset/HashingStrategyTest.kt | 115 ++++++----- .../kotlin/recce/server/dataset/R2dbcFakes.kt | 27 ++- .../dataset/datasource/FlywayMigrator.kt | 23 ++- .../recce/server/recrun/RecRunServiceTest.kt | 41 ++-- .../recce/server/util/ThrowableUtilsTest.kt | 12 +- 39 files changed, 816 insertions(+), 662 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index d8d72d2e..67d03e33 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -21,14 +21,17 @@ group = "recce.server" // Workaround to allow dependabot to update versions of libraries together, since dependabot doesn't understand // the Gradle DSL properly. Here we pick one of the versions where multiple artifacts are released at the same time // and use this to bump the others consistently. -val depDescriptors = mapOf( - "micronaut" to "io.micronaut:micronaut-core:3.10.1", - "restAssured" to "io.rest-assured:rest-assured:4.5.1" -) -val depVersions = depDescriptors.mapValues { (_, v) -> v.split(':').last() } + mapOf( - "javaMajor" to "17", - "reactorToolsVersionExpected" to "3.5.11" -) +val depDescriptors = + mapOf( + "micronaut" to "io.micronaut:micronaut-core:3.10.1", + "restAssured" to "io.rest-assured:rest-assured:4.5.1" + ) +val depVersions = + depDescriptors.mapValues { (_, v) -> v.split(':').last() } + + mapOf( + "javaMajor" to "17", + "reactorToolsVersionExpected" to "3.5.11" + ) repositories { mavenCentral() @@ -56,11 +59,7 @@ micronaut { kapt { arguments { - val props = mapOf( - "rapidoc.enabled" to true, - "rapidoc.theme" to "dark", - "rapidoc.render-style" to "view" - ) + val props = mapOf("rapidoc.enabled" to true, "rapidoc.theme" to "dark", "rapidoc.render-style" to "view") arg("micronaut.openapi.views.spec", props.entries.joinToString(",") { "${it.key}=${it.value}" }) } } @@ -265,10 +264,11 @@ jib { } to { val fullVersion = com.github.zafarkhaja.semver.Version.valueOf(project.version.toString()) - val tagVersion = com.github.zafarkhaja.semver.Version.Builder() - .setNormalVersion(fullVersion.normalVersion) - .setPreReleaseVersion(fullVersion.preReleaseVersion.split('.')[0]) - .build() + val tagVersion = + com.github.zafarkhaja.semver.Version.Builder() + .setNormalVersion(fullVersion.normalVersion) + .setPreReleaseVersion(fullVersion.preReleaseVersion.split('.')[0]) + .build() tags = setOf(tagVersion.toString(), "latest") } container { @@ -280,19 +280,23 @@ jib { } } -val checkJibDependencies = tasks.register("checkJibDependencies") { - doFirst { - val resolvedReactorToolsVersion = - project.configurations.runtimeClasspath.get() - .resolvedConfiguration.resolvedArtifacts.find { it.name == "reactor-tools" }?.moduleVersion?.id?.version - if (depVersions["reactorToolsVersionExpected"] != resolvedReactorToolsVersion) { - throw GradleException( - "Jib docker build expected reactor-tools [${depVersions["reactorToolsVersionExpected"]}] but found " + - "[$resolvedReactorToolsVersion] in dependencies. Update reactorToolsVersionExpected!" - ) +val checkJibDependencies = + tasks.register("checkJibDependencies") { + doFirst { + val resolvedReactorToolsVersion = + project.configurations + .runtimeClasspath + .get() + .resolvedConfiguration + .resolvedArtifacts.find { it.name == "reactor-tools" }?.moduleVersion?.id?.version + if (depVersions["reactorToolsVersionExpected"] != resolvedReactorToolsVersion) { + throw GradleException( + "Jib docker build expected reactor-tools [${depVersions["reactorToolsVersionExpected"]}] but " + + "found [$resolvedReactorToolsVersion] in dependencies. Update reactorToolsVersionExpected!" + ) + } } } -} // Jib task pushes an image. Only do so after running all checks tasks.register("jibGitHubContainerRegistry") { diff --git a/src/main/kotlin/recce/server/R2dbcConfiguration.kt b/src/main/kotlin/recce/server/R2dbcConfiguration.kt index de8901ba..c7040eba 100644 --- a/src/main/kotlin/recce/server/R2dbcConfiguration.kt +++ b/src/main/kotlin/recce/server/R2dbcConfiguration.kt @@ -8,21 +8,24 @@ import jakarta.inject.Inject import jakarta.inject.Singleton @Singleton -class R2dbcConfiguration @Inject constructor(sources: List) { - private var datasources: Map +class R2dbcConfiguration + @Inject + constructor(sources: List) { + private var datasources: Map - init { - this.datasources = sources.associate { it.name to it.url } - } + init { + this.datasources = sources.associate { it.name to it.url } + } - fun getUrl(datasourceRef: String): String = - datasources[datasourceRef] - ?: throw ConfigurationException("Cannot locate datasourceRef [$datasourceRef] in r2dbc configuration!") -} + fun getUrl(datasourceRef: String): String = + datasources[datasourceRef] + ?: throw ConfigurationException("Cannot locate datasourceRef [$datasourceRef] in r2dbc configuration!") + } @Introspected @EachProperty("r2dbc.datasources") -class R2dbcDatasource -constructor(@param:Parameter val name: String) { +class R2dbcDatasource( + @param:Parameter val name: String +) { lateinit var url: String } diff --git a/src/main/kotlin/recce/server/RecConfiguration.kt b/src/main/kotlin/recce/server/RecConfiguration.kt index 89fa8803..4420f5bc 100644 --- a/src/main/kotlin/recce/server/RecConfiguration.kt +++ b/src/main/kotlin/recce/server/RecConfiguration.kt @@ -21,43 +21,45 @@ interface PostConstructable { @Context @ConfigurationProperties("reconciliation") class RecConfiguration -@ConfigurationInject constructor( - val datasets: Map, - val defaults: DefaultsProvider = DefaultsProvider() -) : PostConstructable { - - @PostConstruct - override fun populate(locator: BeanLocator) { - for ((id, config) in datasets) { - config.id = id - config.populate(locator) - } - logger.info { - "Loaded ${datasets.size} datasets available for triggering: " + - "${datasets.values.groupBy { it.datasourceDescriptor }.toSortedMap()} " + @ConfigurationInject + constructor( + val datasets: Map, + val defaults: DefaultsProvider = DefaultsProvider() + ) : PostConstructable { + @PostConstruct + override fun populate(locator: BeanLocator) { + for ((id, config) in datasets) { + config.id = id + config.populate(locator) + } + logger.info { + "Loaded ${datasets.size} datasets available for triggering: " + + "${datasets.values.groupBy { it.datasourceDescriptor }.toSortedMap()} " + } } } -} @Context @ConfigurationProperties("reconciliation.defaults") -class DefaultsProvider @ConfigurationInject constructor( - @Bindable(defaultValue = "1000") val batchSize: Int, - @Bindable(defaultValue = "5") val batchConcurrency: Int, - @Bindable(defaultValue = "TypeLenient") val hashingStrategy: HashingStrategy, - @Bindable(defaultValue = "queries") val queryFileBaseDir: Path -) { - constructor() : this( - batchSize = 1000, - batchConcurrency = 5, - hashingStrategy = HashingStrategy.TypeLenient, - queryFileBaseDir = Path("queries") - ) +class DefaultsProvider + @ConfigurationInject + constructor( + @Bindable(defaultValue = "1000") val batchSize: Int, + @Bindable(defaultValue = "5") val batchConcurrency: Int, + @Bindable(defaultValue = "TypeLenient") val hashingStrategy: HashingStrategy, + @Bindable(defaultValue = "queries") val queryFileBaseDir: Path + ) { + constructor() : this( + batchSize = 1000, + batchConcurrency = 5, + hashingStrategy = HashingStrategy.TypeLenient, + queryFileBaseDir = Path("queries") + ) - @PostConstruct - fun log() { - logger.info { - "Reconciliation batch size is $batchSize and concurrency is $batchConcurrency" + @PostConstruct + fun log() { + logger.info { + "Reconciliation batch size is $batchSize and concurrency is $batchConcurrency" + } } } -} diff --git a/src/main/kotlin/recce/server/RecceServer.kt b/src/main/kotlin/recce/server/RecceServer.kt index 88fd2598..e8aa3c69 100644 --- a/src/main/kotlin/recce/server/RecceServer.kt +++ b/src/main/kotlin/recce/server/RecceServer.kt @@ -16,11 +16,12 @@ private const val GITHUB_PROJECT = "https://github.com/thoughtworks-sea/recce" private val logger = KotlinLogging.logger {} @OpenAPIDefinition( - info = Info( - title = "Recce Server", - description = "Server-based database reconciliation tool for developers", - contact = Contact(name = "Recce Community", url = "$GITHUB_PROJECT/issues") - ), + info = + Info( + title = "Recce Server", + description = "Server-based database reconciliation tool for developers", + contact = Contact(name = "Recce Community", url = "$GITHUB_PROJECT/issues") + ), externalDocs = ExternalDocumentation(description = "Server Docs", url = "$GITHUB_PROJECT/README.md"), security = [SecurityRequirement(name = "basicAuth")] ) @@ -33,7 +34,6 @@ private val logger = KotlinLogging.logger {} ) ) object RecceServer { - @JvmStatic @Suppress("SpreadOperator") fun main(args: Array) { diff --git a/src/main/kotlin/recce/server/api/DatasetApiModel.kt b/src/main/kotlin/recce/server/api/DatasetApiModel.kt index cdf28bf5..7387b6b1 100644 --- a/src/main/kotlin/recce/server/api/DatasetApiModel.kt +++ b/src/main/kotlin/recce/server/api/DatasetApiModel.kt @@ -9,13 +9,10 @@ import java.time.ZonedDateTime data class DatasetApiModel( @field:Schema(description = "Identifier for this dataset as specified in server configuration") val id: String, - @field:Schema(description = "Datasource considered the `source` of this reconciliation dataset") val source: DatasourceApiModel, - @field:Schema(description = "Datasource considered the `target` of this reconciliation dataset") val target: DatasourceApiModel, - @field:Schema(description = "Information about the scheduling of this dataset, if configured") val schedule: ScheduleApiModel? = null ) { @@ -29,7 +26,6 @@ data class DatasetApiModel( @Schema(name = "Datasource") data class DatasourceApiModel( - @field:Schema(description = "The logical name of the datasource this refers to") val ref: String ) @@ -37,11 +33,11 @@ data class DatasourceApiModel( @Schema(name = "Schedule", description = "Scheduling information") data class ScheduleApiModel( @field:Schema( - description = "Cron Expression valid per " + - "https://docs.micronaut.io/latest/api/io/micronaut/scheduling/cron/CronExpression.html" + description = + "Cron Expression valid per " + + "https://docs.micronaut.io/latest/api/io/micronaut/scheduling/cron/CronExpression.html" ) val cronExpression: String, - @field:Schema(description = "Time this dataset is scheduled to next be triggered") val nextTriggerTime: ZonedDateTime ) { diff --git a/src/main/kotlin/recce/server/api/DatasetController.kt b/src/main/kotlin/recce/server/api/DatasetController.kt index e066d135..1bf0824d 100644 --- a/src/main/kotlin/recce/server/api/DatasetController.kt +++ b/src/main/kotlin/recce/server/api/DatasetController.kt @@ -16,7 +16,6 @@ import recce.server.dataset.DatasetConfigProvider class DatasetController( @Inject private val configProvider: DatasetConfigProvider ) { - @Get @Operation( summary = "Retrieve pre-configured datasets", @@ -24,7 +23,8 @@ class DatasetController( tags = ["Datasets"], responses = [ApiResponse(responseCode = "200", description = "Dataset configurations found")] ) - fun getDatasets() = configProvider.availableDataSets - .map { DatasetApiModel(it) } - .sortedBy { it.id } + fun getDatasets() = + configProvider.availableDataSets + .map { DatasetApiModel(it) } + .sortedBy { it.id } } diff --git a/src/main/kotlin/recce/server/api/DatasetRecRunController.kt b/src/main/kotlin/recce/server/api/DatasetRecRunController.kt index 0dece8cf..dd1adedf 100644 --- a/src/main/kotlin/recce/server/api/DatasetRecRunController.kt +++ b/src/main/kotlin/recce/server/api/DatasetRecRunController.kt @@ -40,10 +40,10 @@ class DatasetRecRunController( @field:Schema(description = "The identifier of the reconciliation run to retrieve") @field:PathVariable val runId: Int, - @field:Schema( - description = "How many sample mismatched migration keys of each type (only in source, only in " + - "target, mismatched data) in the results" + description = + "How many sample mismatched migration keys of each type (only in source, only in target, " + + "mismatched data) in the results" ) @field:QueryValue(defaultValue = "0") @field:Nullable @@ -65,18 +65,19 @@ class DatasetRecRunController( ): Mono { logger.info { "Finding $params" } - val findSampleRows = if (params.includeSampleKeys == 0) { - Mono.just(emptyMap()) - } else { - recordRepository.findFirstByRecRunIdSplitByMatchStatus(params.runId, params.includeSampleKeys) - .map { it.matchStatus to it.migrationKey } - .collectList() - .defaultIfEmpty(emptyList()) - .map { records -> - records.groupBy { it.first } - .mapValues { entry -> entry.value.map { pair -> pair.second } } - } - } + val findSampleRows = + if (params.includeSampleKeys == 0) { + Mono.just(emptyMap()) + } else { + recordRepository.findFirstByRecRunIdSplitByMatchStatus(params.runId, params.includeSampleKeys) + .map { it.matchStatus to it.migrationKey } + .collectList() + .defaultIfEmpty(emptyList()) + .map { records -> + records.groupBy { it.first } + .mapValues { entry -> entry.value.map { pair -> pair.second } } + } + } return runRepository .findById(params.runId) diff --git a/src/main/kotlin/recce/server/api/RunApiModel.kt b/src/main/kotlin/recce/server/api/RunApiModel.kt index a9fadba7..b3c17d24 100644 --- a/src/main/kotlin/recce/server/api/RunApiModel.kt +++ b/src/main/kotlin/recce/server/api/RunApiModel.kt @@ -16,26 +16,19 @@ import java.time.Instant data class RunApiModel( @field:Schema(description = "Identifier of the individual reconciliation run") val id: Int, - @field:Schema(description = "Identifier of the dataset that this reconciliation run is for") val datasetId: String, - @field:Schema(description = "Time when the run started") val createdTime: Instant, - @field:Schema(description = "Time when the run completed") val completedTime: Instant?, - @field:Schema(description = "Status of this run") val status: RunStatus, - @field:Schema(description = "Summary results from the run") val summary: RunSummary?, - @field:Schema(description = "Reason for failure of the run") val failureCause: String? = null, - - @field: Schema(description = "Metadata about the run") + @field:Schema(description = "Metadata about the run") val metadata: Map ) { @get:Schema(type = "number", format = "double", description = "How long the run took, in seconds") @@ -50,19 +43,21 @@ data class RunApiModel( fun migrationKeySamples(migrationKeySamples: Map>) = apply { summaryBuilder.migrationKeySamples(migrationKeySamples) } - fun build() = RunApiModel( - id = run.id!!, - datasetId = run.datasetId, - createdTime = run.createdTime!!, - completedTime = run.completedTime, - status = run.status, - summary = summaryBuilder - .sourceMeta(run.sourceMeta) - .targetMeta(run.targetMeta) - .build(), - failureCause = run.failureCause?.run { extractFailureCause(this) }, - metadata = run.metadata - ) + fun build() = + RunApiModel( + id = run.id!!, + datasetId = run.datasetId, + createdTime = run.createdTime!!, + completedTime = run.completedTime, + status = run.status, + summary = + summaryBuilder + .sourceMeta(run.sourceMeta) + .targetMeta(run.targetMeta) + .build(), + failureCause = run.failureCause?.run { extractFailureCause(this) }, + metadata = run.metadata + ) } } @@ -72,28 +67,23 @@ data class RunApiModel( ) data class RunSummary( @field:Schema( - description = "Total number of rows in the dataset across both datasources, determined by distinct " + - "MigrationKey values" + description = + "Total number of rows in the dataset across both datasources, determined by distinct MigrationKey values" ) val totalCount: Int, - @field:Schema(description = "Number of rows considered matched (identical values) across both source and target") val bothMatchedCount: Int, - @field:Schema( - description = "Number of rows considered mis-matched (one or more column value differed) across both source " + - "and target" + description = + "Number of rows considered mis-matched (one or more column value differed) across both source and target" ) val bothMismatchedCount: Int, - @field:Schema( description = "Sample MigrationKeys for rows considered mis-matched that may need deeper investigation" ) var bothMismatchedSampleKeys: List? = null, - @field:Schema(description = "Summary results relating only to the dataset from the source datasource") val source: IndividualDbRunSummary, - @field:Schema(description = "Summary results relating only to the dataset from the target datasource") val target: IndividualDbRunSummary ) { @@ -101,38 +91,43 @@ data class RunSummary( private var matchStatus: MatchStatus? = null, private var sourceMeta: DatasetMeta? = null, private var targetMeta: DatasetMeta? = null, - private var migrationKeySamples: Map>? = null ) { fun matchStatus(matchStatus: MatchStatus?) = apply { this.matchStatus = matchStatus } + fun sourceMeta(sourceMeta: DatasetMeta) = apply { this.sourceMeta = sourceMeta } + fun targetMeta(targetMeta: DatasetMeta) = apply { this.targetMeta = targetMeta } + fun migrationKeySamples(migrationKeySamples: Map>) = apply { this.migrationKeySamples = migrationKeySamples } - fun build() = matchStatus?.let { - RunSummary( - matchStatus!!.total, - matchStatus!!.bothMatched, - matchStatus!!.bothMismatched, - source = IndividualDbRunSummary( - IndividualDbMeta(sourceMeta), - matchStatus!!.sourceTotal, - matchStatus!!.sourceOnly - ), - target = IndividualDbRunSummary( - IndividualDbMeta(targetMeta), - matchStatus!!.targetTotal, - matchStatus!!.targetOnly - ) - ).apply { - migrationKeySamples?.let { - source.onlyHereSampleKeys = migrationKeySamples!![RecordMatchStatus.SourceOnly] - target.onlyHereSampleKeys = migrationKeySamples!![RecordMatchStatus.TargetOnly] - bothMismatchedSampleKeys = migrationKeySamples!![RecordMatchStatus.BothMismatched] + fun build() = + matchStatus?.let { + RunSummary( + matchStatus!!.total, + matchStatus!!.bothMatched, + matchStatus!!.bothMismatched, + source = + IndividualDbRunSummary( + IndividualDbMeta(sourceMeta), + matchStatus!!.sourceTotal, + matchStatus!!.sourceOnly + ), + target = + IndividualDbRunSummary( + IndividualDbMeta(targetMeta), + matchStatus!!.targetTotal, + matchStatus!!.targetOnly + ) + ).apply { + migrationKeySamples?.let { + source.onlyHereSampleKeys = migrationKeySamples!![RecordMatchStatus.SourceOnly] + target.onlyHereSampleKeys = migrationKeySamples!![RecordMatchStatus.TargetOnly] + bothMismatchedSampleKeys = migrationKeySamples!![RecordMatchStatus.BothMismatched] + } } } - } } } @@ -143,13 +138,10 @@ data class RunSummary( data class IndividualDbRunSummary( @field:Schema(description = "Metadata relating to the query executed on the underlying datasource") val meta: IndividualDbMeta?, - @field:Schema(description = "Total number of rows in the dataset for an individual datasource") val totalCount: Int, - @field:Schema(description = "Number of rows only found in this datasource") val onlyHereCount: Int, - @field:Schema(description = "Sample MigrationKeys for rows only found within this datasource") var onlyHereSampleKeys: List? = null ) @@ -160,8 +152,8 @@ data class IndividualDbRunSummary( ) data class IndividualDbMeta( @field:Schema( - description = "Metadata describing the individual columns within the dataset query, ordered as in " + - "the query expression" + description = + "Metadata describing the individual columns within the dataset query, ordered as in the query expression" ) var cols: List = emptyList() ) { @@ -174,16 +166,17 @@ data class IndividualDbMeta( ) data class ColMeta( @field:Schema( - description = "Name of the column as retrieved by the dataset query. Name columns in your dataset " + - "SQL expressions to alter these." + description = + "Name of the column as retrieved by the dataset query. Name columns in your dataset SQL expressions " + + "to alter these." ) val name: String, - @field:Schema( - description = "The deserialized Java type representing the column after retrieval from the data " + - "DataLoadRole.source. This can help you understand why two rows may not have a matching hash. If the " + - "columns have incompatible types which will not be hashed consistently, you may need to coerce the " + - "types in your dataset query. " + description = + "The deserialized Java type representing the column after retrieval from the data " + + "DataLoadRole.source. This can help you understand why two rows may not have a matching hash. If the " + + "columns have incompatible types which will not be hashed consistently, you may need to coerce the " + + "types in your dataset query. " ) val javaType: String ) { diff --git a/src/main/kotlin/recce/server/auth/AuthConfiguration.kt b/src/main/kotlin/recce/server/auth/AuthConfiguration.kt index 74e73b2f..8bb14f85 100644 --- a/src/main/kotlin/recce/server/auth/AuthConfiguration.kt +++ b/src/main/kotlin/recce/server/auth/AuthConfiguration.kt @@ -7,4 +7,9 @@ import javax.validation.constraints.NotBlank @Singleton @ConfigurationProperties("auth") -class AuthConfiguration @ConfigurationInject constructor(@NotBlank val username: String, @NotBlank val password: String) +class AuthConfiguration + @ConfigurationInject + constructor( + @NotBlank val username: String, + @NotBlank val password: String + ) diff --git a/src/main/kotlin/recce/server/dataset/DataLoadDefinition.kt b/src/main/kotlin/recce/server/dataset/DataLoadDefinition.kt index 20c9b4bb..a36e3554 100644 --- a/src/main/kotlin/recce/server/dataset/DataLoadDefinition.kt +++ b/src/main/kotlin/recce/server/dataset/DataLoadDefinition.kt @@ -18,62 +18,69 @@ import javax.validation.constraints.NotBlank import kotlin.io.path.readText class DataLoadDefinition -@ConfigurationInject constructor( - @NotBlank val datasourceRef: String, - var query: Optional = Optional.empty(), - var queryFile: Optional = Optional.empty() -) : PostConstructable { - lateinit var dbOperations: R2dbcOperations - lateinit var role: DataLoadRole - lateinit var queryStatement: String - lateinit var queryFileBaseDir: Path - lateinit var datasetId: String + @ConfigurationInject + constructor( + @NotBlank val datasourceRef: String, + var query: Optional = Optional.empty(), + var queryFile: Optional = Optional.empty() + ) : PostConstructable { + lateinit var dbOperations: R2dbcOperations + lateinit var role: DataLoadRole + lateinit var queryStatement: String + lateinit var queryFileBaseDir: Path + lateinit var datasetId: String - override fun populate(locator: BeanLocator) { - dbOperations = locator.findBean(R2dbcOperations::class.java, Qualifiers.byName(datasourceRef)) - .orElseThrow { - ConfigurationException( - "Cannot locate ${R2dbcOperations::class.java.simpleName} named [$datasourceRef] in configuration!" - ) - } - - queryFileBaseDir = locator.findBean(DefaultsProvider::class.java) - .orElseThrow { - ConfigurationException("Cannot locate ${DefaultsProvider::class.java.simpleName} in configuration!") - } - .queryFileBaseDir + override fun populate(locator: BeanLocator) { + val ops = R2dbcOperations::class.java + dbOperations = + locator.findBean(ops, Qualifiers.byName(datasourceRef)) + .orElseThrow { + ConfigurationException( + "Cannot locate ${ops.simpleName} named [$datasourceRef] in configuration!" + ) + } - queryStatement = this.resolveQueryStatement() - } - - companion object { - const val migrationKeyColumnName: String = "MigrationKey" - } + val defaults = DefaultsProvider::class.java + queryFileBaseDir = + locator.findBean(defaults) + .orElseThrow { + ConfigurationException("Cannot locate ${defaults.simpleName} in configuration!") + } + .queryFileBaseDir - fun runQuery(): Flux = Flux.usingWhen( - dbOperations.connectionFactory().create(), - { it.createStatement(queryStatement).execute() }, - { it.close() } - ) - .index() - .map { (i, r) -> - require(i == 0L) { "More than one query found." } - r + queryStatement = this.resolveQueryStatement() } - @TestOnly - fun resolveQueryStatement(): String = kotlin.runCatching { - this.query.orElseGet { - this.queryFile.orElseGet { - this.queryFileBaseDir.resolve("${this.datasetId}-${this.role.name.lowercase()}.sql") - }.readText(Charsets.UTF_8) + companion object { + const val MIGRATION_KEY_COLUMN_NAME = "MigrationKey" } - } - .onFailure { e -> throw ConfigurationException("Cannot load query: ${e.message}") } - .getOrThrow() - val datasourceDescriptor: String - get() = "$role(ref=$datasourceRef)" -} + fun runQuery(): Flux = + Flux.usingWhen( + dbOperations.connectionFactory().create(), + { it.createStatement(queryStatement).execute() }, + { it.close() } + ) + .index() + .map { (i, r) -> + require(i == 0L) { "More than one query found." } + r + } + + @TestOnly + fun resolveQueryStatement(): String = + kotlin.runCatching { + this.query.orElseGet { + this.queryFile.orElseGet { + this.queryFileBaseDir.resolve("${this.datasetId}-${this.role.name.lowercase()}.sql") + }.readText(Charsets.UTF_8) + } + } + .onFailure { e -> throw ConfigurationException("Cannot load query: ${e.message}") } + .getOrThrow() + + val datasourceDescriptor: String + get() = "$role(ref=$datasourceRef)" + } enum class DataLoadRole { Source, Target } diff --git a/src/main/kotlin/recce/server/dataset/DatasetConfiguration.kt b/src/main/kotlin/recce/server/dataset/DatasetConfiguration.kt index c35fddc9..12391c08 100644 --- a/src/main/kotlin/recce/server/dataset/DatasetConfiguration.kt +++ b/src/main/kotlin/recce/server/dataset/DatasetConfiguration.kt @@ -17,7 +17,6 @@ class DatasetConfiguration( @Bindable(defaultValue = "") val schedule: Schedule = Schedule(), @Nullable val hashingStrategy: Optional = Optional.empty() ) : PostConstructable { - lateinit var id: String lateinit var defaults: DefaultsProvider diff --git a/src/main/kotlin/recce/server/dataset/DatasetRecScheduler.kt b/src/main/kotlin/recce/server/dataset/DatasetRecScheduler.kt index 5bf0d93b..434aff49 100644 --- a/src/main/kotlin/recce/server/dataset/DatasetRecScheduler.kt +++ b/src/main/kotlin/recce/server/dataset/DatasetRecScheduler.kt @@ -19,7 +19,6 @@ class DatasetRecScheduler( private val runner: DatasetRecRunner, @param:Named(TaskExecutors.SCHEDULED) private val scheduler: TaskScheduler ) : ApplicationEventListener { - override fun onApplicationEvent(event: ServerStartupEvent?) { logger.info { "Scheduling regular recs..." } config.datasets.values.forEach(::schedule) diff --git a/src/main/kotlin/recce/server/dataset/DatasetRecService.kt b/src/main/kotlin/recce/server/dataset/DatasetRecService.kt index 4d1166a4..290cfa28 100644 --- a/src/main/kotlin/recce/server/dataset/DatasetRecService.kt +++ b/src/main/kotlin/recce/server/dataset/DatasetRecService.kt @@ -34,8 +34,9 @@ open class DatasetRecService( override val availableDataSets = recConfig.datasets.values override fun runFor(datasetId: String): Mono { - val datasetConfig = recConfig.datasets[datasetId] - ?: throw IllegalArgumentException("Dataset definition [$datasetId] not found!") + val datasetConfig = + recConfig.datasets[datasetId] + ?: throw IllegalArgumentException("Dataset definition [$datasetId] not found!") logger.info { "Starting reconciliation run for [$datasetId]..." } @@ -69,7 +70,10 @@ open class DatasetRecService( .map { meta -> meta() } .doOnNext { logger.info { "Load from ${def.datasourceDescriptor} completed" } } - private fun saveSourceBatch(rows: List, run: RecRun): Flux { + private fun saveSourceBatch( + rows: List, + run: RecRun + ): Flux { val records = rows.map { RecRecord(key = RecRecordKey(run.id!!, it.migrationKey), sourceData = it.hashedValue) } return recordRepository .saveAll(records) @@ -77,36 +81,41 @@ open class DatasetRecService( .map { (i, _) -> rows[i.toInt()].lazyMeta() } } - private fun saveTargetBatch(rows: List, run: RecRun): Flux { + private fun saveTargetBatch( + rows: List, + run: RecRun + ): Flux { val toPersist = rows.associateByTo(mutableMapOf()) { it.migrationKey } - val updateExistingRecords = recordRepository - .findByRecRunIdAndMigrationKeyIn(run.id!!, rows.map { it.migrationKey }) - .map { it.apply { targetData = toPersist.remove(it.migrationKey)!!.hashedValue } } - .collectList() - .toFlux() - .flatMap { if (it.isEmpty()) Flux.empty() else recordRepository.updateAll(it) } - val saveNewRecords = Flux - .defer { Mono.just(toPersist.values) } - .map { hashedRows -> - hashedRows.map { row -> - RecRecord( - recRunId = run.id, - migrationKey = row.migrationKey, - targetData = row.hashedValue - ) - } - }.flatMap { if (it.isEmpty()) Flux.empty() else recordRepository.saveAll(it) } + val updateExistingRecords = + recordRepository + .findByRecRunIdAndMigrationKeyIn(run.id!!, rows.map { it.migrationKey }) + .map { it.apply { targetData = toPersist.remove(it.migrationKey)!!.hashedValue } } + .collectList() + .toFlux() + .flatMap { if (it.isEmpty()) Flux.empty() else recordRepository.updateAll(it) } + val saveNewRecords = + Flux.defer { Mono.just(toPersist.values) } + .map { hashedRows -> + hashedRows.map { row -> + RecRecord( + recRunId = run.id, + migrationKey = row.migrationKey, + targetData = row.hashedValue + ) + } + }.flatMap { if (it.isEmpty()) Flux.empty() else recordRepository.saveAll(it) } return updateExistingRecords.concatWith(saveNewRecords).map { rows.first().lazyMeta() } } - private fun generateMetadata(datasetConfig: DatasetConfiguration): Map = mapOf( - "sourceQuery" to datasetConfig.source.queryStatement, - "targetQuery" to datasetConfig.target.queryStatement, - "sourceUrl" to r2dbcConfig.getUrl(datasetConfig.source.datasourceRef), - "targetUrl" to r2dbcConfig.getUrl(datasetConfig.target.datasourceRef), - "version" to buildConfig.version - ) + private fun generateMetadata(datasetConfig: DatasetConfiguration): Map = + mapOf( + "sourceQuery" to datasetConfig.source.queryStatement, + "targetQuery" to datasetConfig.target.queryStatement, + "sourceUrl" to r2dbcConfig.getUrl(datasetConfig.source.datasourceRef), + "targetUrl" to r2dbcConfig.getUrl(datasetConfig.target.datasourceRef), + "version" to buildConfig.version + ) } class DataLoadException(message: String, cause: Throwable) : Exception(message, cause) diff --git a/src/main/kotlin/recce/server/dataset/HashingStrategy.kt b/src/main/kotlin/recce/server/dataset/HashingStrategy.kt index afc4501b..4ae686a1 100644 --- a/src/main/kotlin/recce/server/dataset/HashingStrategy.kt +++ b/src/main/kotlin/recce/server/dataset/HashingStrategy.kt @@ -5,14 +5,19 @@ import com.google.common.hash.Hashing import io.r2dbc.spi.ColumnMetadata import io.r2dbc.spi.Row import io.r2dbc.spi.RowMetadata -import recce.server.dataset.DataLoadDefinition.Companion.migrationKeyColumnName +import recce.server.dataset.DataLoadDefinition.Companion.MIGRATION_KEY_COLUMN_NAME import java.math.BigDecimal import java.nio.ByteBuffer @Suppress("UnstableApiUsage") enum class HashingStrategy { TypeLenient { - override fun hashCol(hasher: Hasher, index: Int, colMeta: ColumnMetadata, col: Any?) { + override fun hashCol( + hasher: Hasher, + index: Int, + colMeta: ColumnMetadata, + col: Any? + ) { when (col) { null -> {} is Boolean -> hasher.putLong(if (col) 1 else 0) @@ -36,7 +41,12 @@ enum class HashingStrategy { } }, TypeStrict { - override fun hashCol(hasher: Hasher, index: Int, colMeta: ColumnMetadata, col: Any?) { + override fun hashCol( + hasher: Hasher, + index: Int, + colMeta: ColumnMetadata, + col: Any? + ) { when (col) { null -> hasher.putString("${colMeta.javaType.simpleName}(NULL)", Charsets.UTF_8) is Boolean -> hasher.putBoolean(col) @@ -60,19 +70,27 @@ enum class HashingStrategy { } }; - protected abstract fun hashCol(hasher: Hasher, index: Int, colMeta: ColumnMetadata, col: Any?) + protected abstract fun hashCol( + hasher: Hasher, + index: Int, + colMeta: ColumnMetadata, + col: Any? + ) - fun hash(row: Row, meta: RowMetadata): HashedRow { + fun hash( + row: Row, + meta: RowMetadata + ): HashedRow { var migrationKey: String? = null val hasher = Hashing.sha256().newHasher() fun trySetMigrationKey(col: Any?) { when { col == null -> throw IllegalArgumentException( - "$migrationKeyColumnName has null value somewhere in dataset" + "$MIGRATION_KEY_COLUMN_NAME has null value somewhere in dataset" ) migrationKey != null -> throw IllegalArgumentException( - "More than one column named $migrationKeyColumnName found in dataset" + "More than one column named $MIGRATION_KEY_COLUMN_NAME found in dataset" ) else -> migrationKey = col.toString() } @@ -80,7 +98,7 @@ enum class HashingStrategy { meta.columnMetadatas .forEachIndexed { i, colMeta -> - if (colMeta.name.equals(migrationKeyColumnName, ignoreCase = true)) { + if (colMeta.name.equals(MIGRATION_KEY_COLUMN_NAME, ignoreCase = true)) { trySetMigrationKey(row[i]) } else { hashCol(hasher, i, colMeta, row[i]) @@ -91,7 +109,7 @@ enum class HashingStrategy { return HashedRow( migrationKey ?: throw IllegalArgumentException( - "No column named $migrationKeyColumnName found in dataset" + "No column named $MIGRATION_KEY_COLUMN_NAME found in dataset" ), hasher.hash().toString(), meta.columnMetadatas diff --git a/src/main/kotlin/recce/server/recrun/RecRecordRepository.kt b/src/main/kotlin/recce/server/recrun/RecRecordRepository.kt index be8df342..90452929 100644 --- a/src/main/kotlin/recce/server/recrun/RecRecordRepository.kt +++ b/src/main/kotlin/recce/server/recrun/RecRecordRepository.kt @@ -15,7 +15,11 @@ import kotlin.reflect.KMutableProperty1 // Declared as an interface to make it possible to replace the bean with a mock in tests // The replacement doesn't seem to work with Micronaut Test with an abstract class interface RecRecordRepository : ReactorCrudRepository { - fun findByRecRunIdAndMigrationKeyIn(recRunId: Int, migrationKeys: List): Flux + fun findByRecRunIdAndMigrationKeyIn( + recRunId: Int, + migrationKeys: List + ): Flux + fun findByRecRunId(recRunId: Int): Flux @Query( @@ -27,13 +31,16 @@ interface RecRecordRepository : ReactorCrudRepository { (SELECT * FROM reconciliation_record r WHERE r.reconciliation_run_id = :recRunId AND r.source_data <> r.target_data LIMIT :limit) """ ) - fun findFirstByRecRunIdSplitByMatchStatus(recRunId: Int, limit: Int = 10): Flux + fun findFirstByRecRunIdSplitByMatchStatus( + recRunId: Int, + limit: Int = 10 + ): Flux + fun countMatchedByKeyRecRunId(recRunId: Int): Mono } @R2dbcRepository(dialect = Dialect.POSTGRES) internal abstract class AbstractRecRecordRepository(private val operations: R2dbcOperations) : RecRecordRepository { - override fun countMatchedByKeyRecRunId(recRunId: Int): Mono { return operations.withConnection { it.createStatement(countRecordsByStatus).bind("$1", recRunId).execute() @@ -47,37 +54,39 @@ internal abstract class AbstractRecRecordRepository(private val operations: R2db } private fun matchStatusSetterFor(row: Row): (MatchStatus) -> Unit { - val count = row.get(countColumnName, Number::class.java)?.toInt() - ?: throw IllegalArgumentException("Missing [$countColumnName] column!") + val count = + row.get(COUNT_COLUMN_NAME, Number::class.java)?.toInt() + ?: throw IllegalArgumentException("Missing [$COUNT_COLUMN_NAME] column!") - val recordMatchStatus = RecordMatchStatus.valueOf( - row.get(statusColumnName, String::class.java) - ?: throw IllegalArgumentException("Missing [$statusColumnName] column!") - ) + val recordMatchStatus = + RecordMatchStatus.valueOf( + row.get(STATUS_COLUMN_NAME, String::class.java) + ?: throw IllegalArgumentException("Missing [$STATUS_COLUMN_NAME] column!") + ) return { st -> recordMatchStatus.setter(st, count) } } companion object { - private const val statusColumnName = "match_status" - private const val countColumnName = "count" + private const val STATUS_COLUMN_NAME = "match_status" + private const val COUNT_COLUMN_NAME = "count" private val countRecordsByStatus = """ - WITH matching_data AS - (SELECT migration_key, - CASE - WHEN target_data IS NULL THEN '${RecordMatchStatus.SourceOnly}' - WHEN source_data IS NULL THEN '${RecordMatchStatus.TargetOnly}' - WHEN source_data = target_data THEN '${RecordMatchStatus.BothMatched}' - ELSE '${RecordMatchStatus.BothMismatched}' - END AS $statusColumnName - FROM reconciliation_record - WHERE reconciliation_run_id = $1) - SELECT $statusColumnName, count(*) AS "$countColumnName" - FROM matching_data - GROUP BY $statusColumnName; + WITH matching_data AS + (SELECT migration_key, + CASE + WHEN target_data IS NULL THEN '${RecordMatchStatus.SourceOnly}' + WHEN source_data IS NULL THEN '${RecordMatchStatus.TargetOnly}' + WHEN source_data = target_data THEN '${RecordMatchStatus.BothMatched}' + ELSE '${RecordMatchStatus.BothMismatched}' + END AS $STATUS_COLUMN_NAME + FROM reconciliation_record + WHERE reconciliation_run_id = $1) + SELECT $STATUS_COLUMN_NAME, count(*) AS "$COUNT_COLUMN_NAME" + FROM matching_data + GROUP BY $STATUS_COLUMN_NAME; """.trimIndent() } } @@ -110,12 +119,13 @@ enum class RecordMatchStatus(val setter: KMutableProperty1.Setter SourceOnly - record.sourceData == null -> TargetOnly - record.sourceData == record.targetData -> BothMatched - else -> BothMismatched - } + fun from(record: RecRecord) = + when { + record.targetData == null -> SourceOnly + record.sourceData == null -> TargetOnly + record.sourceData == record.targetData -> BothMatched + else -> BothMismatched + } } } diff --git a/src/main/kotlin/recce/server/recrun/RecRunRepository.kt b/src/main/kotlin/recce/server/recrun/RecRunRepository.kt index b695c94f..fe3dd130 100644 --- a/src/main/kotlin/recce/server/recrun/RecRunRepository.kt +++ b/src/main/kotlin/recce/server/recrun/RecRunRepository.kt @@ -46,7 +46,10 @@ data class RecRun( @field:TypeDef(type = DataType.JSON) var metadata: Map = emptyMap() - fun withMetaData(source: DatasetMeta, target: DatasetMeta): RecRun { + fun withMetaData( + source: DatasetMeta, + target: DatasetMeta + ): RecRun { sourceMeta = source targetMeta = target return this @@ -115,11 +118,17 @@ data class ColMeta(val name: String, val javaType: String) class ColsConverter : AttributeConverter, String?> { private val objectMapper = jacksonObjectMapper() - override fun convertToPersistedValue(entityValue: List?, context: ConversionContext): String? { + override fun convertToPersistedValue( + entityValue: List?, + context: ConversionContext + ): String? { return objectMapper.writeValueAsString(entityValue ?: emptyList()) } - override fun convertToEntityValue(persistedValue: String?, context: ConversionContext): List? { + override fun convertToEntityValue( + persistedValue: String?, + context: ConversionContext + ): List? { return if (persistedValue == null) emptyList() else objectMapper.readValue(persistedValue) } } diff --git a/src/main/kotlin/recce/server/recrun/RecRunService.kt b/src/main/kotlin/recce/server/recrun/RecRunService.kt index f4d3b3d2..dad23f40 100644 --- a/src/main/kotlin/recce/server/recrun/RecRunService.kt +++ b/src/main/kotlin/recce/server/recrun/RecRunService.kt @@ -11,10 +11,14 @@ open class RecRunService( private val runRepository: RecRunRepository, private val recordRepository: RecRecordRepository ) { - fun start(datasetId: String, metadata: Map): Mono = runRepository - .save(RecRun(datasetId).apply { this.metadata = metadata }) - .doOnNext { logger.info { "Starting reconciliation run for $it}..." } } - .cache() + fun start( + datasetId: String, + metadata: Map + ): Mono = + runRepository + .save(RecRun(datasetId).apply { this.metadata = metadata }) + .doOnNext { logger.info { "Starting reconciliation run for $it}..." } } + .cache() fun successful(run: RecRun): Mono { logger.info { "Summarising results for $run" } @@ -24,7 +28,10 @@ open class RecRunService( .doOnNext { logger.info { "Run completed for $it" } } } - fun failed(run: RecRun, cause: Throwable): Mono { + fun failed( + run: RecRun, + cause: Throwable + ): Mono { logger.info(cause) { "Recording failure for $run" } return runRepository.update(run.asFailed(cause)) } diff --git a/src/test/kotlin/recce/server/R2dbcConfigurationTest.kt b/src/test/kotlin/recce/server/R2dbcConfigurationTest.kt index 241f7d35..37eea81c 100644 --- a/src/test/kotlin/recce/server/R2dbcConfigurationTest.kt +++ b/src/test/kotlin/recce/server/R2dbcConfigurationTest.kt @@ -7,11 +7,11 @@ import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.jupiter.api.Test internal class R2dbcConfigurationTest { - - private val properties = mapOf( - "r2dbc.datasources.source.url" to "r2dbc:h2:mem:///sourceDb;LOCK_TIMEOUT=10000;DB_CLOSE_ON_EXIT=FALSE", - "r2dbc.datasources.target.url" to "r2dbc:h2:mem:///targetDb;LOCK_TIMEOUT=10000;DB_CLOSE_ON_EXIT=FALSE" - ) + private val properties = + mapOf( + "r2dbc.datasources.source.url" to "r2dbc:h2:mem:///sourceDb;LOCK_TIMEOUT=10000;DB_CLOSE_ON_EXIT=FALSE", + "r2dbc.datasources.target.url" to "r2dbc:h2:mem:///targetDb;LOCK_TIMEOUT=10000;DB_CLOSE_ON_EXIT=FALSE" + ) @Test fun `retrieve url for valid data sources`() { diff --git a/src/test/kotlin/recce/server/RecConfigurationPropertiesTest.kt b/src/test/kotlin/recce/server/RecConfigurationPropertiesTest.kt index c73d5da3..e5f112ab 100644 --- a/src/test/kotlin/recce/server/RecConfigurationPropertiesTest.kt +++ b/src/test/kotlin/recce/server/RecConfigurationPropertiesTest.kt @@ -14,18 +14,18 @@ import kotlin.io.path.Path // Faster tests that do not load the full configuration and are quicker to iterate on when testing // configuration binding internal class RecConfigurationPropertiesTest { - - private val properties = mutableMapOf( - "flyway.datasources.default.enabled" to "false", - "r2dbc.datasources.source.url" to "r2dbc:h2:mem:///sourceDb;LOCK_TIMEOUT=10000;DB_CLOSE_ON_EXIT=FALSE", - "r2dbc.datasources.target.url" to "r2dbc:h2:mem:///targetDb;LOCK_TIMEOUT=10000;DB_CLOSE_ON_EXIT=FALSE", - "reconciliation.datasets.test-dataset.hashingStrategy" to "TypeStrict", - "reconciliation.datasets.test-dataset.schedule.cronExpression" to "0 0 0 ? * *", - "reconciliation.datasets.test-dataset.source.datasourceRef" to "source", - "reconciliation.datasets.test-dataset.source.query" to "SELECT count(*) AS sourcedatacount FROM testdata", - "reconciliation.datasets.test-dataset.target.datasourceRef" to "target", - "reconciliation.datasets.test-dataset.target.query" to "SELECT count(*) AS targetdatacount FROM testdata" - ) + private val properties = + mutableMapOf( + "flyway.datasources.default.enabled" to "false", + "r2dbc.datasources.source.url" to "r2dbc:h2:mem:///sourceDb;LOCK_TIMEOUT=10000;DB_CLOSE_ON_EXIT=FALSE", + "r2dbc.datasources.target.url" to "r2dbc:h2:mem:///targetDb;LOCK_TIMEOUT=10000;DB_CLOSE_ON_EXIT=FALSE", + "reconciliation.datasets.test-dataset.hashingStrategy" to "TypeStrict", + "reconciliation.datasets.test-dataset.schedule.cronExpression" to "0 0 0 ? * *", + "reconciliation.datasets.test-dataset.source.datasourceRef" to "source", + "reconciliation.datasets.test-dataset.source.query" to "SELECT count(*) AS sourcedatacount FROM testdata", + "reconciliation.datasets.test-dataset.target.datasourceRef" to "target", + "reconciliation.datasets.test-dataset.target.query" to "SELECT count(*) AS targetdatacount FROM testdata" + ) @Test fun `can override defaults from config`() { diff --git a/src/test/kotlin/recce/server/RecConfigurationTest.kt b/src/test/kotlin/recce/server/RecConfigurationTest.kt index 103ea594..c2e78f71 100644 --- a/src/test/kotlin/recce/server/RecConfigurationTest.kt +++ b/src/test/kotlin/recce/server/RecConfigurationTest.kt @@ -11,7 +11,6 @@ import org.junit.jupiter.api.Test transactional = false ) internal class RecConfigurationTest { - @Inject lateinit var config: RecConfiguration diff --git a/src/test/kotlin/recce/server/RecceServerTest.kt b/src/test/kotlin/recce/server/RecceServerTest.kt index 033ab805..f8a45e36 100644 --- a/src/test/kotlin/recce/server/RecceServerTest.kt +++ b/src/test/kotlin/recce/server/RecceServerTest.kt @@ -8,7 +8,6 @@ import org.junit.jupiter.api.Test @MicronautTest(transactional = false) class RecceServerTest { - @Inject lateinit var application: EmbeddedApplication<*> diff --git a/src/test/kotlin/recce/server/TestConfig.kt b/src/test/kotlin/recce/server/TestConfig.kt index f1004234..c51fdb55 100644 --- a/src/test/kotlin/recce/server/TestConfig.kt +++ b/src/test/kotlin/recce/server/TestConfig.kt @@ -16,8 +16,9 @@ class TestConfig { @Singleton fun restAssuredSpec(server: EmbeddedServer): RequestSpecification { // Yes, this is static configuration; seems no other way to do it with RestAssured - RestAssured.config = RestAssured.config() - .jsonConfig(JsonConfig.jsonConfig().numberReturnType(JsonPathConfig.NumberReturnType.DOUBLE)) + RestAssured.config = + RestAssured.config() + .jsonConfig(JsonConfig.jsonConfig().numberReturnType(JsonPathConfig.NumberReturnType.DOUBLE)) return RequestSpecBuilder() .setContentType(ContentType.JSON) diff --git a/src/test/kotlin/recce/server/api/DatasetControllerTest.kt b/src/test/kotlin/recce/server/api/DatasetControllerTest.kt index b36fbbc1..3a16baa6 100644 --- a/src/test/kotlin/recce/server/api/DatasetControllerTest.kt +++ b/src/test/kotlin/recce/server/api/DatasetControllerTest.kt @@ -26,22 +26,31 @@ import java.time.format.DateTimeFormatter import java.time.temporal.ChronoUnit import java.util.* -val configProvider = mock { - on { availableDataSets } doReturn setOf( - DatasetConfiguration( - DataLoadDefinition("source1", Optional.of("SELECT name AS MigrationKey, name, value FROM testdata")), - DataLoadDefinition("target1", Optional.of("SELECT name AS MigrationKey, name, value FROM testdata")) - ).apply { id = "two" }, - DatasetConfiguration( - DataLoadDefinition("source2", Optional.of("SELECT name AS MigrationKey, name, value FROM testdata")), - DataLoadDefinition("target2", Optional.of("SELECT name AS MigrationKey, name, value FROM testdata")), - Schedule("0 0 0 ? * *") - ).apply { id = "datasets" } - ) -} +val configProvider = + mock { + on { availableDataSets } doReturn + setOf( + DatasetConfiguration( + DataLoadDefinition( + "source1", Optional.of("SELECT name AS MigrationKey, name, value FROM testdata") + ), + DataLoadDefinition( + "target1", Optional.of("SELECT name AS MigrationKey, name, value FROM testdata") + ) + ).apply { id = "two" }, + DatasetConfiguration( + DataLoadDefinition( + "source2", Optional.of("SELECT name AS MigrationKey, name, value FROM testdata") + ), + DataLoadDefinition( + "target2", Optional.of("SELECT name AS MigrationKey, name, value FROM testdata") + ), + Schedule("0 0 0 ? * *") + ).apply { id = "datasets" } + ) + } internal class DatasetControllerTest { - @Test fun `should retrieve empty dataset Ids`() { assertThat(DatasetController(mock()).getDatasets()) @@ -80,7 +89,6 @@ internal class DatasetControllerTest { @MicronautTest(transactional = false) internal class DatasetControllerApiTest { - @Inject lateinit var spec: RequestSpecification diff --git a/src/test/kotlin/recce/server/api/DatasetRecRunControllerTest.kt b/src/test/kotlin/recce/server/api/DatasetRecRunControllerTest.kt index 7552d70c..6d8aaa75 100644 --- a/src/test/kotlin/recce/server/api/DatasetRecRunControllerTest.kt +++ b/src/test/kotlin/recce/server/api/DatasetRecRunControllerTest.kt @@ -40,38 +40,41 @@ private const val SAMPLE_KEYS_LIMIT = 3 private const val NOT_FOUND_ID = 0 private const val TEST_DATASET_ID = "testDataset" private val TEST_COMPLETED_DURATION = Duration.ofMinutes(3).plusNanos(234) -private val TEST_RESULTS = RecRun( - id = 12, - datasetId = TEST_DATASET_ID, - createdTime = LocalDateTime.of(2021, 10, 25, 16, 16, 16).toInstant(ZoneOffset.UTC) -).apply { - completedTime = createdTime?.plusNanos(TEST_COMPLETED_DURATION.toNanos()) - status = RunStatus.Successful - updatedTime = completedTime?.plusSeconds(10) - sourceMeta = DatasetMeta(listOf(recce.server.recrun.ColMeta("test1", "String"))) - targetMeta = DatasetMeta(listOf(recce.server.recrun.ColMeta("test1", "String"))) - summary = MatchStatus(1, 2, 3, 4) - metadata = mapOf("sourceQuery" to "mockQuery", "targetQuery" to "mockQuery") -} +private val TEST_RESULTS = + RecRun( + id = 12, + datasetId = TEST_DATASET_ID, + createdTime = LocalDateTime.of(2021, 10, 25, 16, 16, 16).toInstant(ZoneOffset.UTC) + ).apply { + completedTime = createdTime?.plusNanos(TEST_COMPLETED_DURATION.toNanos()) + status = RunStatus.Successful + updatedTime = completedTime?.plusSeconds(10) + sourceMeta = DatasetMeta(listOf(recce.server.recrun.ColMeta("test1", "String"))) + targetMeta = DatasetMeta(listOf(recce.server.recrun.ColMeta("test1", "String"))) + summary = MatchStatus(1, 2, 3, 4) + metadata = mapOf("sourceQuery" to "mockQuery", "targetQuery" to "mockQuery") + } -private fun mockService() = mock { - on { runFor(eq(TEST_DATASET_ID)) } doReturn Mono.just(TEST_RESULTS) -} +private fun mockService() = + mock { on { runFor(eq(TEST_DATASET_ID)) } doReturn Mono.just(TEST_RESULTS) } -private fun mockRunRepository() = mock { - on { findById(TEST_RESULTS.id!!) } doReturn Mono.just(TEST_RESULTS) - on { existsById(TEST_RESULTS.id!!) } doReturn Mono.just(true) - on { findById(NOT_FOUND_ID) } doReturn Mono.empty() - on { existsById(NOT_FOUND_ID) } doReturn Mono.just(false) - on { findTop10ByDatasetIdOrderByCompletedTimeDesc(TEST_DATASET_ID) } doReturn Flux.just(TEST_RESULTS, TEST_RESULTS) -} +private fun mockRunRepository() = + mock { + on { findById(TEST_RESULTS.id!!) } doReturn Mono.just(TEST_RESULTS) + on { existsById(TEST_RESULTS.id!!) } doReturn Mono.just(true) + on { findById(NOT_FOUND_ID) } doReturn Mono.empty() + on { existsById(NOT_FOUND_ID) } doReturn Mono.just(false) + on { findTop10ByDatasetIdOrderByCompletedTimeDesc(TEST_DATASET_ID) } doReturn + Flux.just(TEST_RESULTS, TEST_RESULTS) + } -private fun mockRecordRepository(sampleRecords: List) = mock { - on { findFirstByRecRunIdSplitByMatchStatus(TEST_RESULTS.id!!, SAMPLE_KEYS_LIMIT) } doReturn Flux.fromIterable( - sampleRecords - ) - on { findFirstByRecRunIdSplitByMatchStatus(NOT_FOUND_ID, SAMPLE_KEYS_LIMIT) } doReturn Flux.empty() -} +private fun mockRecordRepository(sampleRecords: List) = + mock { + on { findFirstByRecRunIdSplitByMatchStatus(TEST_RESULTS.id!!, SAMPLE_KEYS_LIMIT) } doReturn + Flux.fromIterable(sampleRecords) + on { findFirstByRecRunIdSplitByMatchStatus(NOT_FOUND_ID, SAMPLE_KEYS_LIMIT) } doReturn + Flux.empty() + } internal class DatasetRecRunControllerTest { private val sampleRows = @@ -157,11 +160,12 @@ internal class DatasetRecRunControllerTest { @Test fun `failed run should return with cause`() { val failureCause = DataLoadException("Could not load data", IllegalArgumentException("Root Cause")) - val failedRun = RecRun( - id = TEST_RESULTS.id, - datasetId = TEST_DATASET_ID, - createdTime = TEST_RESULTS.createdTime - ).asFailed(failureCause) + val failedRun = + RecRun( + id = TEST_RESULTS.id, + datasetId = TEST_DATASET_ID, + createdTime = TEST_RESULTS.createdTime + ).asFailed(failureCause) whenever(service.runFor(TEST_DATASET_ID)).doReturn(Mono.just(failedRun)) @@ -182,7 +186,6 @@ internal class DatasetRecRunControllerTest { @MicronautTest(transactional = false) internal class DatasetRecRunControllerApiTest { - private val sampleRows = List(1) { RecRecord(RecRecordKey(TEST_RESULTS.id!!, "source-$it"), sourceData = "set") } + List(1) { RecRecord(RecRecordKey(TEST_RESULTS.id!!, "target-$it"), targetData = "set") } + diff --git a/src/test/kotlin/recce/server/api/RunApiModelTest.kt b/src/test/kotlin/recce/server/api/RunApiModelTest.kt index bffaae00..0bec0373 100644 --- a/src/test/kotlin/recce/server/api/RunApiModelTest.kt +++ b/src/test/kotlin/recce/server/api/RunApiModelTest.kt @@ -6,7 +6,6 @@ import recce.server.recrun.RecRun import java.time.Instant internal class RunApiModelTest { - @Test fun `incomplete runs don't have duration`() { Assertions.assertThat(RunApiModel.Builder(RecRun(1, "empty", Instant.now())).build().completedDuration) diff --git a/src/test/kotlin/recce/server/auth/AuthConfigurationTest.kt b/src/test/kotlin/recce/server/auth/AuthConfigurationTest.kt index 1e099f60..486f9ebd 100644 --- a/src/test/kotlin/recce/server/auth/AuthConfigurationTest.kt +++ b/src/test/kotlin/recce/server/auth/AuthConfigurationTest.kt @@ -6,11 +6,8 @@ import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.jupiter.api.Test internal class AuthConfigurationTest { - - private val properties = mutableMapOf( - "auth.username" to "test-user", - "auth.password" to "test-password" - ) + private val properties = + mutableMapOf("auth.username" to "test-user", "auth.password" to "test-password") @Test fun `can override defaults from config`() { diff --git a/src/test/kotlin/recce/server/auth/BasicAuthenticationProviderTest.kt b/src/test/kotlin/recce/server/auth/BasicAuthenticationProviderTest.kt index 899c90d3..82979322 100644 --- a/src/test/kotlin/recce/server/auth/BasicAuthenticationProviderTest.kt +++ b/src/test/kotlin/recce/server/auth/BasicAuthenticationProviderTest.kt @@ -11,7 +11,6 @@ import org.junit.jupiter.api.Test @MicronautTest(transactional = false) class BasicAuthenticationProviderTest { - @Inject lateinit var spec: RequestSpecification diff --git a/src/test/kotlin/recce/server/dataset/DataLoadDefinitionTest.kt b/src/test/kotlin/recce/server/dataset/DataLoadDefinitionTest.kt index 7e5eb7b2..87b0419a 100644 --- a/src/test/kotlin/recce/server/dataset/DataLoadDefinitionTest.kt +++ b/src/test/kotlin/recce/server/dataset/DataLoadDefinitionTest.kt @@ -32,9 +32,10 @@ internal class DataLoadDefinitionTest { private lateinit var definitionQuery: DataLoadDefinition - private val mockConnection: Connection = mock { - on { close() } doReturn Mono.empty() - } + private val mockConnection: Connection = + mock { + on { close() } doReturn Mono.empty() + } @BeforeEach fun setUp() { @@ -51,10 +52,13 @@ internal class DataLoadDefinitionTest { val defaultsProvider = mock() whenever(defaultsProvider.queryFileBaseDir).thenReturn(Path(testQueryFileBaseDir)) - val beanLocator = mock { - on { findBean(any>(), eq(Qualifiers.byName(testSourceName))) } doReturn Optional.of(operations) - on { findBean(DefaultsProvider::class.java) } doReturn Optional.of(defaultsProvider) - } + val beanLocator = + mock { + on { findBean(any>(), eq(Qualifiers.byName(testSourceName))) } doReturn + Optional.of(operations) + on { findBean(DefaultsProvider::class.java) } doReturn + Optional.of(defaultsProvider) + } definitionQuery.populate(beanLocator) @@ -71,10 +75,13 @@ internal class DataLoadDefinitionTest { val defaultsProvider = mock() whenever(defaultsProvider.queryFileBaseDir).thenReturn(Path(testQueryFileBaseDir)) - val beanLocator = mock { - on { findBean(any>(), eq(Qualifiers.byName(testSourceName))) } doReturn Optional.of(operations) - on { findBean(DefaultsProvider::class.java) } doReturn Optional.of(defaultsProvider) - } + val beanLocator = + mock { + on { findBean(any>(), eq(Qualifiers.byName(testSourceName))) } doReturn + Optional.of(operations) + on { findBean(DefaultsProvider::class.java) } doReturn + Optional.of(defaultsProvider) + } definitionQuery.populate(beanLocator) @@ -92,10 +99,13 @@ internal class DataLoadDefinitionTest { val defaultsProvider = mock() whenever(defaultsProvider.queryFileBaseDir).thenReturn(Path(testQueryFileBaseDir)) - val beanLocator = mock { - on { findBean(any>(), eq(Qualifiers.byName(testSourceName))) } doReturn Optional.of(operations) - on { findBean(DefaultsProvider::class.java) } doReturn Optional.of(defaultsProvider) - } + val beanLocator = + mock { + on { findBean(any>(), eq(Qualifiers.byName(testSourceName))) } doReturn + Optional.of(operations) + on { findBean(DefaultsProvider::class.java) } doReturn + Optional.of(defaultsProvider) + } definitionQuery.populate(beanLocator) @@ -124,10 +134,13 @@ internal class DataLoadDefinitionTest { val defaultsProvider = mock() whenever(defaultsProvider.queryFileBaseDir).thenReturn(Path("blah")) - val beanLocator = mock { - on { findBean(any>(), eq(Qualifiers.byName(testSourceName))) } doReturn Optional.of(operations) - on { findBean(DefaultsProvider::class.java) } doReturn Optional.of(defaultsProvider) - } + val beanLocator = + mock { + on { findBean(any>(), eq(Qualifiers.byName(testSourceName))) } doReturn + Optional.of(operations) + on { findBean(DefaultsProvider::class.java) } doReturn + Optional.of(defaultsProvider) + } assertThatThrownBy { definitionQuery.populate(beanLocator) } .isExactlyInstanceOf(ConfigurationException::class.java) @@ -140,10 +153,13 @@ internal class DataLoadDefinitionTest { val defaultsProvider = mock() whenever(defaultsProvider.queryFileBaseDir).thenReturn(Path(testQueryFileBaseDir)) - val beanLocator = mock { - on { findBean(any>(), eq(Qualifiers.byName(testSourceName))) } doReturn Optional.of(operations) - on { findBean(DefaultsProvider::class.java) } doReturn Optional.of(defaultsProvider) - } + val beanLocator = + mock { + on { findBean(any>(), eq(Qualifiers.byName(testSourceName))) } doReturn + Optional.of(operations) + on { findBean(DefaultsProvider::class.java) } doReturn + Optional.of(defaultsProvider) + } definitionQuery.populate(beanLocator) @@ -167,13 +183,15 @@ internal class DataLoadDefinitionTest { fun `should stream rows from query`() { val result = mock() - val statement: Statement = mock { - on { execute() } doReturn Mono.just(result) - } + val statement: Statement = + mock { + on { execute() } doReturn Mono.just(result) + } - definitionQuery.dbOperations = mock(defaultAnswer = Answers.RETURNS_DEEP_STUBS) { - on { connectionFactory().create() } doReturn Mono.just(mockConnection) - } + definitionQuery.dbOperations = + mock(defaultAnswer = Answers.RETURNS_DEEP_STUBS) { + on { connectionFactory().create() } doReturn Mono.just(mockConnection) + } definitionQuery.queryStatement = testQuery @@ -193,9 +211,10 @@ internal class DataLoadDefinitionTest { @Test fun `should close connection after failed query`() { - definitionQuery.dbOperations = mock(defaultAnswer = Answers.RETURNS_DEEP_STUBS) { - on { connectionFactory().create() } doReturn Mono.just(mockConnection) - } + definitionQuery.dbOperations = + mock(defaultAnswer = Answers.RETURNS_DEEP_STUBS) { + on { connectionFactory().create() } doReturn Mono.just(mockConnection) + } definitionQuery.queryStatement = testQuery @@ -218,13 +237,15 @@ internal class DataLoadDefinitionTest { fun `should fail on query with more than one statement`() { val result = mock() - val statement: Statement = mock { - on { execute() } doReturn Flux.just(result, result) - } + val statement: Statement = + mock { + on { execute() } doReturn Flux.just(result, result) + } - definitionQuery.dbOperations = mock(defaultAnswer = Answers.RETURNS_DEEP_STUBS) { - on { connectionFactory().create() } doReturn Mono.just(mockConnection) - } + definitionQuery.dbOperations = + mock(defaultAnswer = Answers.RETURNS_DEEP_STUBS) { + on { connectionFactory().create() } doReturn Mono.just(mockConnection) + } definitionQuery.queryStatement = testQuery diff --git a/src/test/kotlin/recce/server/dataset/DatasetConfigurationTest.kt b/src/test/kotlin/recce/server/dataset/DatasetConfigurationTest.kt index 805b7b0d..f5d74c63 100644 --- a/src/test/kotlin/recce/server/dataset/DatasetConfigurationTest.kt +++ b/src/test/kotlin/recce/server/dataset/DatasetConfigurationTest.kt @@ -5,7 +5,6 @@ import org.junit.jupiter.api.Test import java.util.* internal class DatasetConfigurationTest { - private val conf = DatasetConfiguration( DataLoadDefinition("source", Optional.of("blah")), diff --git a/src/test/kotlin/recce/server/dataset/DatasetRecSchedulerTest.kt b/src/test/kotlin/recce/server/dataset/DatasetRecSchedulerTest.kt index e7ad8269..f65897cd 100644 --- a/src/test/kotlin/recce/server/dataset/DatasetRecSchedulerTest.kt +++ b/src/test/kotlin/recce/server/dataset/DatasetRecSchedulerTest.kt @@ -19,9 +19,10 @@ import java.util.concurrent.TimeUnit internal class DatasetRecSchedulerTest { private val testDataset = "test-dataset" - private val runner = mock { - on { runFor(testDataset) } doReturn Mono.just(RecRun(testDataset)) - } + private val runner = + mock { + on { runFor(testDataset) } doReturn Mono.just(RecRun(testDataset)) + } private val scheduler = mock() @@ -35,10 +36,11 @@ internal class DatasetRecSchedulerTest { @Test fun `schedules a single run`() { val cronExpression = "0 0 * * *" - val datasetConfig = mock { - on { id } doReturn testDataset - on { schedule } doReturn Schedule(cronExpression) - } + val datasetConfig = + mock { + on { id } doReturn testDataset + on { schedule } doReturn Schedule(cronExpression) + } val config = RecConfiguration(mapOf(testDataset to datasetConfig)) @@ -54,15 +56,15 @@ internal class DatasetRecSchedulerTest { } internal class DatasetRecSchedulerIntegrationTest { - private val everySecond = "* * * * * *" - private val items = mutableMapOf( - "reconciliation.datasets.test-dataset.schedule.cronExpression" to everySecond, - "reconciliation.datasets.test-dataset.source.datasourceRef" to "default", - "reconciliation.datasets.test-dataset.source.query" to "SELECT id AS MigrationKey FROM reconciliation_run", - "reconciliation.datasets.test-dataset.target.datasourceRef" to "default", - "reconciliation.datasets.test-dataset.target.query" to "SELECT id AS MigrationKey FROM reconciliation_run" - ) + private val items = + mutableMapOf( + "reconciliation.datasets.test-dataset.schedule.cronExpression" to everySecond, + "reconciliation.datasets.test-dataset.source.datasourceRef" to "default", + "reconciliation.datasets.test-dataset.source.query" to "SELECT id AS MigrationKey FROM reconciliation_run", + "reconciliation.datasets.test-dataset.target.datasourceRef" to "default", + "reconciliation.datasets.test-dataset.target.query" to "SELECT id AS MigrationKey FROM reconciliation_run" + ) private lateinit var ctx: ApplicationContext diff --git a/src/test/kotlin/recce/server/dataset/DatasetRecServiceCrossDatabaseIntegrationTest.kt b/src/test/kotlin/recce/server/dataset/DatasetRecServiceCrossDatabaseIntegrationTest.kt index 35188b61..a62fd89d 100644 --- a/src/test/kotlin/recce/server/dataset/DatasetRecServiceCrossDatabaseIntegrationTest.kt +++ b/src/test/kotlin/recce/server/dataset/DatasetRecServiceCrossDatabaseIntegrationTest.kt @@ -30,37 +30,40 @@ internal open class DatasetRecServiceCrossDatabaseIntegrationTest { /** * Databases which we expect to produce matching hashes for values of similar types. */ - private val databases: Map> = buildMap { - put("mysql", MySQLContainer("mysql:8")) - put("mariadb", MariaDBContainer("mariadb:10")) - put("postgres", PostgreSQLContainer("postgres:14-alpine")) - if (!System.getProperty("os.arch").contains(Regex("arm64|aarch64"))) { - put( - "mssql", - MSSQLServerContainer("mcr.microsoft.com/mssql/server:2019-latest").acceptLicense() - ) + private val databases: Map> = + buildMap { + put("mysql", MySQLContainer("mysql:8")) + put("mariadb", MariaDBContainer("mariadb:10")) + put("postgres", PostgreSQLContainer("postgres:14-alpine")) + if (!System.getProperty("os.arch").contains(Regex("arm64|aarch64"))) { + put( + "mssql", + MSSQLServerContainer("mcr.microsoft.com/mssql/server:2019-latest").acceptLicense() + ) + } } - } /** * Types we want to test for each database combination */ - private val sqlTypesToValues = listOf( - "SMALLINT" to "1", - "INTEGER" to "1", - "BIGINT" to "1", - "VARCHAR(4)" to "'text'", - "TEXT" to "'text'" - ) + private val sqlTypesToValues = + listOf( + "SMALLINT" to "1", + "INTEGER" to "1", + "BIGINT" to "1", + "VARCHAR(4)" to "'text'", + "TEXT" to "'text'" + ) /** * Create pairs of databases to test against. We don't have to test against all possibly combinations * because we can reasonably conclude that the matching is transitive, i.e if A==B and B==C then A==C. */ - private val databaseCombinations = IntStream - .range(1, databases.keys.size) - .mapToObj { databases.keys.toList()[it - 1] to databases.keys.toList()[it] } - .toList() + private val databaseCombinations = + IntStream + .range(1, databases.keys.size) + .mapToObj { databases.keys.toList()[it - 1] to databases.keys.toList()[it] } + .toList() private fun containerFor(name: String) = databases[name] ?: throw IllegalArgumentException("Cannot find db type [$name].") @@ -76,26 +79,28 @@ internal open class DatasetRecServiceCrossDatabaseIntegrationTest { @JvmStatic @BeforeAll fun startApplication() { - val datasources = databases.flatMap { (name, container) -> - listOf( - "r2dbc.datasources.$name.url" to r2dbcUrl(container.jdbcUrl), - "r2dbc.datasources.$name.username" to container.username, - "r2dbc.datasources.$name.password" to container.password - ) - }.toMap() - - val datasets = databaseCombinations.flatMap { (source, target) -> - listOf( - "reconciliation.datasets.$source-to-$target.source.datasourceRef" - to source, - "reconciliation.datasets.$source-to-$target.source.query" - to "SELECT id AS MigrationKey, value FROM TestData", - "reconciliation.datasets.$source-to-$target.target.datasourceRef" - to target, - "reconciliation.datasets.$source-to-$target.target.query" - to "SELECT id AS MigrationKey, value FROM TestData" - ) - }.toMap() + val datasources = + databases.flatMap { (name, container) -> + listOf( + "r2dbc.datasources.$name.url" to r2dbcUrl(container.jdbcUrl), + "r2dbc.datasources.$name.username" to container.username, + "r2dbc.datasources.$name.password" to container.password + ) + }.toMap() + + val datasets = + databaseCombinations.flatMap { (source, target) -> + listOf( + "reconciliation.datasets.$source-to-$target.source.datasourceRef" + to source, + "reconciliation.datasets.$source-to-$target.source.query" + to "SELECT id AS MigrationKey, value FROM TestData", + "reconciliation.datasets.$source-to-$target.target.datasourceRef" + to target, + "reconciliation.datasets.$source-to-$target.target.query" + to "SELECT id AS MigrationKey, value FROM TestData" + ) + }.toMap() ctx = ApplicationContext.run(datasources + datasets) } @@ -156,7 +161,10 @@ internal open class DatasetRecServiceCrossDatabaseIntegrationTest { @ParameterizedTest @ArgumentsSource(TestScenarios::class) - fun `rows match between source and target`(source: ScenarioConfig, target: ScenarioConfig) { + fun `rows match between source and target`( + source: ScenarioConfig, + target: ScenarioConfig + ) { allOf( runAsync { createTestData(source) }, runAsync { createTestData(target) } diff --git a/src/test/kotlin/recce/server/dataset/DatasetRecServiceIntegrationTest.kt b/src/test/kotlin/recce/server/dataset/DatasetRecServiceIntegrationTest.kt index e72b708c..7da0ce59 100644 --- a/src/test/kotlin/recce/server/dataset/DatasetRecServiceIntegrationTest.kt +++ b/src/test/kotlin/recce/server/dataset/DatasetRecServiceIntegrationTest.kt @@ -28,7 +28,6 @@ import java.nio.file.Path transactional = false ) open class DatasetRecServiceIntegrationTest { - @TempDir lateinit var tempDir: Path @Inject lateinit var migrator: FlywayMigrator @@ -111,13 +110,14 @@ open class DatasetRecServiceIntegrationTest { val sourceR2dbcConfig = ctx.getBean(R2dbcDatasource::class.java, Qualifiers.byName("source-h2")) val targetR2dbcConfig = ctx.getBean(R2dbcDatasource::class.java, Qualifiers.byName("target-h2")) - val expectedMeta = mapOf( - "sourceQuery" to datasetConfig?.source?.queryStatement, - "targetQuery" to datasetConfig?.target?.queryStatement, - "sourceUrl" to sourceR2dbcConfig.url, - "targetUrl" to targetR2dbcConfig.url, - "version" to buildConfig.version - ) + val expectedMeta = + mapOf( + "sourceQuery" to datasetConfig?.source?.queryStatement, + "targetQuery" to datasetConfig?.target?.queryStatement, + "sourceUrl" to sourceR2dbcConfig.url, + "targetUrl" to targetR2dbcConfig.url, + "version" to buildConfig.version + ) SoftAssertions.assertSoftly { softly -> softly.assertThat(run.id).isNotNull @@ -134,13 +134,14 @@ open class DatasetRecServiceIntegrationTest { softly.assertThat(run.status).isEqualTo(RunStatus.Successful) softly.assertThat(run.failureCause).isNull() softly.assertThat(run.summary).isEqualTo(MatchStatus(1, 2, 2, 0)) - val expectedMeta = DatasetMeta( - listOf( - ColMeta("MIGRATIONKEY", "String"), - ColMeta("NAME", "String"), - ColMeta("VAL", "String") + val expectedMeta = + DatasetMeta( + listOf( + ColMeta("MIGRATIONKEY", "String"), + ColMeta("NAME", "String"), + ColMeta("VAL", "String") + ) ) - ) softly.assertThat(run.sourceMeta).usingRecursiveComparison().isEqualTo(expectedMeta) softly.assertThat(run.targetMeta).usingRecursiveComparison().isEqualTo(expectedMeta) } diff --git a/src/test/kotlin/recce/server/dataset/DatasetRecServiceTest.kt b/src/test/kotlin/recce/server/dataset/DatasetRecServiceTest.kt index c6629298..33b18ded 100644 --- a/src/test/kotlin/recce/server/dataset/DatasetRecServiceTest.kt +++ b/src/test/kotlin/recce/server/dataset/DatasetRecServiceTest.kt @@ -16,40 +16,47 @@ import java.util.function.BiFunction internal class DatasetRecServiceTest { private val testDataset = "test-dataset" - private val recRun = RecRun(1, testDataset) + private val run = RecRun(1, testDataset) - private val runService = mock { - on { start(eq(testDataset), any()) } doReturn Mono.just(recRun) - on { successful(recRun) } doReturn Mono.just(recRun).map { recRun.asSuccessful(MatchStatus()) } - on { failed(eq(recRun), any()) } doReturn Mono.just(recRun).map { recRun.asFailed(IllegalArgumentException()) } - } + private val runService = + mock { + on { start(eq(testDataset), any()) } doReturn Mono.just(run) + on { successful(run) } doReturn Mono.just(run).map { run.asSuccessful(MatchStatus()) } + on { failed(eq(run), any()) } doReturn Mono.just(run).map { run.asFailed(IllegalArgumentException()) } + } - private val emptyRowResult = mock { - on { map(any>()) } doReturn Flux.empty() - } + private val emptyRowResult = + mock { + on { map(any>()) } doReturn Flux.empty() + } - private val emptyDataLoad = mock { - on { runQuery() } doReturn Flux.just(emptyRowResult) - } + private val emptyDataLoad = + mock { + on { runQuery() } doReturn Flux.just(emptyRowResult) + } private val testMeta = listOf(FakeColumnMetadata("col1", String::class.java)) - private val singleRowResult = mock { - on { map(any>()) } doReturn Flux.just(HashedRow("abc", "def", testMeta)) - } + private val singleRowResult = + mock { + on { map(any>()) } doReturn + Flux.just(HashedRow("abc", "def", testMeta)) + } - private val singleRowDataLoad = mock { - on { runQuery() } doReturn Flux.just(singleRowResult) - } + private val singleRowDataLoad = + mock { + on { runQuery() } doReturn Flux.just(singleRowResult) + } private val testRecordKey = RecRecordKey(1, "abc") private val testRecord = RecRecord(key = testRecordKey) - private val recordRepository = mock { - on { save(any()) } doReturn Mono.just(testRecord) - on { saveAll(anyList()) } doReturn Flux.just(testRecord) - on { findByRecRunIdAndMigrationKeyIn(eq(testRecordKey.recRunId), anyList()) } doReturn Flux.just(testRecord) - on { updateAll(anyList()) } doReturn Flux.just(testRecord) - } + private val recordRepository = + mock { + on { save(any()) } doReturn Mono.just(testRecord) + on { saveAll(anyList()) } doReturn Flux.just(testRecord) + on { findByRecRunIdAndMigrationKeyIn(eq(testRecordKey.recRunId), anyList()) } doReturn Flux.just(testRecord) + on { updateAll(anyList()) } doReturn Flux.just(testRecord) + } @Test fun `should throw on missing dataset`() { @@ -60,13 +67,14 @@ internal class DatasetRecServiceTest { @Test fun `mono should return failed run on failed data load`() { - val service = DatasetRecService( - RecConfiguration(mapOf(testDataset to DatasetConfiguration(emptyDataLoad, emptyDataLoad))), - mock(), - mock(), - runService, - mock() - ) + val service = + DatasetRecService( + RecConfiguration(mapOf(testDataset to DatasetConfiguration(emptyDataLoad, emptyDataLoad))), + mock(), + mock(), + runService, + mock() + ) val rootCause = IllegalArgumentException("Could not connect to database") whenever(emptyDataLoad.runQuery()).thenReturn(Flux.error(rootCause)) @@ -79,7 +87,7 @@ internal class DatasetRecServiceTest { .verifyComplete() val errorCaptor = argumentCaptor() - verify(runService).failed(eq(recRun), errorCaptor.capture()) + verify(runService).failed(eq(run), errorCaptor.capture()) assertThat(errorCaptor.firstValue) .isInstanceOf(DataLoadException::class.java) @@ -90,13 +98,14 @@ internal class DatasetRecServiceTest { @Test fun `mono should error on failed initial save`() { - val service = DatasetRecService( - RecConfiguration(mapOf(testDataset to DatasetConfiguration(emptyDataLoad, emptyDataLoad))), - mock(), - mock(), - runService, - mock() - ) + val service = + DatasetRecService( + RecConfiguration(mapOf(testDataset to DatasetConfiguration(emptyDataLoad, emptyDataLoad))), + mock(), + mock(), + runService, + mock() + ) whenever(runService.start(any(), any())).thenReturn(Mono.error(IllegalArgumentException("failed!"))) @@ -112,13 +121,14 @@ internal class DatasetRecServiceTest { @Test fun `mono should error on failed save of failed run`() { - val service = DatasetRecService( - RecConfiguration(mapOf(testDataset to DatasetConfiguration(emptyDataLoad, emptyDataLoad))), - mock(), - mock(), - runService, - mock() - ) + val service = + DatasetRecService( + RecConfiguration(mapOf(testDataset to DatasetConfiguration(emptyDataLoad, emptyDataLoad))), + mock(), + mock(), + runService, + mock() + ) val rootCause = IllegalArgumentException("Could not connect to database") whenever(emptyDataLoad.runQuery()).thenReturn(Flux.error(rootCause)) @@ -137,13 +147,14 @@ internal class DatasetRecServiceTest { @Test fun `should reconcile empty datasets without error`() { - val service = DatasetRecService( - RecConfiguration(mapOf(testDataset to DatasetConfiguration(emptyDataLoad, emptyDataLoad))), - mock(), - mock(), - runService, - mock() - ) + val service = + DatasetRecService( + RecConfiguration(mapOf(testDataset to DatasetConfiguration(emptyDataLoad, emptyDataLoad))), + mock(), + mock(), + runService, + mock() + ) service.runFor(testDataset) .test() .assertNext { @@ -158,13 +169,14 @@ internal class DatasetRecServiceTest { @Test fun `should reconcile source with empty target`() { - val service = DatasetRecService( - RecConfiguration(mapOf(testDataset to DatasetConfiguration(singleRowDataLoad, emptyDataLoad))), - mock(), - mock(), - runService, - recordRepository - ) + val service = + DatasetRecService( + RecConfiguration(mapOf(testDataset to DatasetConfiguration(singleRowDataLoad, emptyDataLoad))), + mock(), + mock(), + runService, + recordRepository + ) service.runFor(testDataset) .test() @@ -177,7 +189,7 @@ internal class DatasetRecServiceTest { verify(recordRepository).saveAll(listOf(RecRecord(key = testRecordKey, sourceData = "def"))) verifyNoMoreInteractions(recordRepository) - verify(runService).successful(recRun) + verify(runService).successful(run) } @Test @@ -189,13 +201,14 @@ internal class DatasetRecServiceTest { ) ).doReturn(Flux.empty()) - val service = DatasetRecService( - RecConfiguration(mapOf(testDataset to DatasetConfiguration(emptyDataLoad, singleRowDataLoad))), - mock(), - mock(), - runService, - recordRepository - ) + val service = + DatasetRecService( + RecConfiguration(mapOf(testDataset to DatasetConfiguration(emptyDataLoad, singleRowDataLoad))), + mock(), + mock(), + runService, + recordRepository + ) service.runFor(testDataset) .test() @@ -212,7 +225,7 @@ internal class DatasetRecServiceTest { ) verify(recordRepository).saveAll(listOf(RecRecord(key = testRecordKey, targetData = "def"))) verifyNoMoreInteractions(recordRepository) - verify(runService).successful(recRun) + verify(runService).successful(run) } @Test @@ -226,13 +239,14 @@ internal class DatasetRecServiceTest { ) ).doReturn(Flux.empty()) - val service = DatasetRecService( - RecConfiguration(mapOf(testDataset to DatasetConfiguration(singleRowDataLoad, singleRowDataLoad))), - mock(), - mock(), - runService, - recordRepository - ) + val service = + DatasetRecService( + RecConfiguration(mapOf(testDataset to DatasetConfiguration(singleRowDataLoad, singleRowDataLoad))), + mock(), + mock(), + runService, + recordRepository + ) service.runFor(testDataset) .test() @@ -250,7 +264,7 @@ internal class DatasetRecServiceTest { ) verify(recordRepository).saveAll(listOf(RecRecord(key = testRecordKey, targetData = "def"))) verifyNoMoreInteractions(recordRepository) - verify(runService).successful(recRun) + verify(runService).successful(run) } @Test @@ -262,13 +276,14 @@ internal class DatasetRecServiceTest { ) ).doReturn(Flux.just(testRecord)) - val service = DatasetRecService( - RecConfiguration(mapOf(testDataset to DatasetConfiguration(singleRowDataLoad, singleRowDataLoad))), - mock(), - mock(), - runService, - recordRepository - ) + val service = + DatasetRecService( + RecConfiguration(mapOf(testDataset to DatasetConfiguration(singleRowDataLoad, singleRowDataLoad))), + mock(), + mock(), + runService, + recordRepository + ) service.runFor(testDataset) .test() @@ -286,7 +301,7 @@ internal class DatasetRecServiceTest { ) verify(recordRepository).updateAll(listOf(testRecord)) verifyNoMoreInteractions(recordRepository) - verify(runService).successful(recRun) + verify(runService).successful(run) } @Test diff --git a/src/test/kotlin/recce/server/dataset/HashedRowTest.kt b/src/test/kotlin/recce/server/dataset/HashedRowTest.kt index 8e5d763a..f4d945e6 100644 --- a/src/test/kotlin/recce/server/dataset/HashedRowTest.kt +++ b/src/test/kotlin/recce/server/dataset/HashedRowTest.kt @@ -11,12 +11,13 @@ internal class HashedRowTest { val columnMetas = R2dbcFakeBuilder().withCol("test", String::class.java).buildColMetas() val row = HashedRow("test", "test", columnMetas) - val expectedMeta = DatasetMeta( - listOf( - ColMeta(DataLoadDefinition.migrationKeyColumnName, "String"), - ColMeta("test", "String") + val expectedMeta = + DatasetMeta( + listOf( + ColMeta(DataLoadDefinition.MIGRATION_KEY_COLUMN_NAME, "String"), + ColMeta("test", "String") + ) ) - ) Assertions.assertThat(row.lazyMeta()()).usingRecursiveComparison().isEqualTo(expectedMeta) } } diff --git a/src/test/kotlin/recce/server/dataset/HashingStrategyTest.kt b/src/test/kotlin/recce/server/dataset/HashingStrategyTest.kt index b6eb1dd8..1ab48f97 100644 --- a/src/test/kotlin/recce/server/dataset/HashingStrategyTest.kt +++ b/src/test/kotlin/recce/server/dataset/HashingStrategyTest.kt @@ -10,7 +10,7 @@ import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.ArgumentsProvider import org.junit.jupiter.params.provider.ArgumentsSource import org.junit.jupiter.params.provider.EnumSource -import recce.server.dataset.DataLoadDefinition.Companion.migrationKeyColumnName +import recce.server.dataset.DataLoadDefinition.Companion.MIGRATION_KEY_COLUMN_NAME import java.math.BigDecimal import java.nio.ByteBuffer import java.time.Instant @@ -30,15 +30,17 @@ private fun isHexSha256Hash(value: String): Boolean { } internal class HashingStrategyTest { - private val rowMetaWithTestCol = R2dbcFakeBuilder() - .withCol("test", String::class.java) + private val rowMetaWithTestCol = + R2dbcFakeBuilder() + .withCol("test", String::class.java) @ParameterizedTest @EnumSource(HashingStrategy::class) fun `should throw on null migration key`(strat: HashingStrategy) { - val row = rowMetaWithTestCol - .withRowValues(null, "test-val") - .build() + val row = + rowMetaWithTestCol + .withRowValues(null, "test-val") + .build() assertThatThrownBy { strat.hash(row, row.metadata) } .isExactlyInstanceOf(IllegalArgumentException::class.java) @@ -48,11 +50,12 @@ internal class HashingStrategyTest { @ParameterizedTest @EnumSource(HashingStrategy::class) fun `should throw on missing migration key column`(strat: HashingStrategy) { - val row = R2dbcFakeBuilder() - .noMigrationKey() - .withCol("test", String::class.java) - .withRowValues("test-val") - .build() + val row = + R2dbcFakeBuilder() + .noMigrationKey() + .withCol("test", String::class.java) + .withRowValues("test-val") + .build() assertThatThrownBy { strat.hash(row, row.metadata) } .isExactlyInstanceOf(IllegalArgumentException::class.java) @@ -62,10 +65,11 @@ internal class HashingStrategyTest { @ParameterizedTest @EnumSource(HashingStrategy::class) fun `should throw on duplicate migration key column`(strat: HashingStrategy) { - val row = rowMetaWithTestCol - .withCol(migrationKeyColumnName, String::class.java) - .withRowValues("key", "test-val", "key") - .build() + val row = + rowMetaWithTestCol + .withCol(MIGRATION_KEY_COLUMN_NAME, String::class.java) + .withRowValues("key", "test-val", "key") + .build() assertThatThrownBy { strat.hash(row, row.metadata) } .isExactlyInstanceOf(IllegalArgumentException::class.java) @@ -85,17 +89,19 @@ internal class HashingStrategyTest { @ParameterizedTest @EnumSource(HashingStrategy::class) fun `consecutive fields should always lead to different hashes`(strat: HashingStrategy) { - val row = R2dbcFakeBuilder() - .withCol("first", String::class.java) - .withCol("second", String::class.java) - .withRowValues(1, "abc", "def") - .build() + val row = + R2dbcFakeBuilder() + .withCol("first", String::class.java) + .withCol("second", String::class.java) + .withRowValues(1, "abc", "def") + .build() - val row2 = R2dbcFakeBuilder() - .withCol("first", String::class.java) - .withCol("second", String::class.java) - .withRowValues(1, "ab", "cdef") - .build() + val row2 = + R2dbcFakeBuilder() + .withCol("first", String::class.java) + .withCol("second", String::class.java) + .withRowValues(1, "ab", "cdef") + .build() assertThat(strat.hash(row, row.metadata).hashedValue) .isNotEqualTo(strat.hash(row2, row2.metadata).hashedValue) @@ -106,15 +112,17 @@ internal class HashingStrategyTest { fun `strategy should dictate whether nulls of different defined column java types should be considered unequal `( strat: HashingStrategy ) { - val stringRow = R2dbcFakeBuilder() - .withCol("test", String::class.java) - .withRowValues("key", null) - .build() + val stringRow = + R2dbcFakeBuilder() + .withCol("test", String::class.java) + .withRowValues("key", null) + .build() - val intRow = R2dbcFakeBuilder() - .withCol("test", Integer::class.java) - .withRowValues("key", null) - .build() + val intRow = + R2dbcFakeBuilder() + .withCol("test", Integer::class.java) + .withRowValues("key", null) + .build() val stringTypeRow = strat.hash(stringRow, stringRow.metadata) val intTypeRow = strat.hash(intRow, intRow.metadata) @@ -127,16 +135,21 @@ internal class HashingStrategyTest { @ParameterizedTest @ArgumentsSource(EquivalentTypeExamples::class) - fun `lenient type equivalence strategy should consider similar types equal`(first: Any, second: Any) { - val row = R2dbcFakeBuilder() - .withCol("test", first.javaClass) - .withRowValues("key", first) - .build() + fun `lenient type equivalence strategy should consider similar types equal`( + first: Any, + second: Any + ) { + val row = + R2dbcFakeBuilder() + .withCol("test", first.javaClass) + .withRowValues("key", first) + .build() - val row2 = R2dbcFakeBuilder() - .withCol("test", second.javaClass) - .withRowValues("key", second) - .build() + val row2 = + R2dbcFakeBuilder() + .withCol("test", second.javaClass) + .withRowValues("key", second) + .build() assertThat(HashingStrategy.TypeLenient.hash(row, row.metadata).hashedValue) .describedAs( @@ -165,18 +178,20 @@ internal class HashingStrategyTest { expectedStrictHash: String, expectedLenientHash: Optional ) { - val row = R2dbcFakeBuilder() - .withCol("test", type) - .withRowValues("key", inputSupplier()) - .build() + val row = + R2dbcFakeBuilder() + .withCol("test", type) + .withRowValues("key", inputSupplier()) + .build() assertThat(HashingStrategy.TypeStrict.hash(row, row.metadata)) .describedAs("strict hash not as expected") .isEqualTo(HashedRow("key", expectedStrictHash, row.metadata.columnMetadatas)) - val row2 = R2dbcFakeBuilder() - .withCol("test", type) - .withRowValues("key", inputSupplier()) - .build() + val row2 = + R2dbcFakeBuilder() + .withCol("test", type) + .withRowValues("key", inputSupplier()) + .build() assertThat(HashingStrategy.TypeLenient.hash(row2, row2.metadata)) .describedAs("lenient hash not as expected") .isEqualTo(HashedRow("key", expectedLenientHash.orElse(expectedStrictHash), row2.metadata.columnMetadatas)) diff --git a/src/test/kotlin/recce/server/dataset/R2dbcFakes.kt b/src/test/kotlin/recce/server/dataset/R2dbcFakes.kt index 44d61d23..789eee4c 100644 --- a/src/test/kotlin/recce/server/dataset/R2dbcFakes.kt +++ b/src/test/kotlin/recce/server/dataset/R2dbcFakes.kt @@ -7,18 +7,23 @@ import io.r2dbc.spi.Type data class FakeColumnMetadata(private val name: String, private val javaType: Class<*>) : ColumnMetadata { override fun getName() = name + override fun getJavaType() = type.javaType + override fun getType() = FakeType(javaType) } data class FakeType(private val javaType: Class<*>) : Type { override fun getJavaType() = javaType + override fun getName(): String = javaType.name } class FakeRowMetadata(private val cols: List) : RowMetadata { override fun getColumnMetadata(index: Int) = cols[index] + override fun getColumnMetadata(name: String) = cols.first { it.name == name } + override fun getColumnMetadatas() = cols } @@ -26,13 +31,19 @@ class FakeRow(private val meta: RowMetadata, private val values: List get(index: Int, type: Class): T? { + override fun get( + index: Int, + type: Class + ): T? { require(type == Object::class.java) { "Only support generic Object type returns" } return values[index].second as T } @Suppress("UNCHECKED_CAST") - override fun get(name: String, type: Class): T? { + override fun get( + name: String, + type: Class + ): T? { require(type == Object::class.java) { "Only support generic Object type returns" } return valuesByColName[name] as T } @@ -41,9 +52,10 @@ class FakeRow(private val meta: RowMetadata, private val values: List( - FakeColumnMetadata(DataLoadDefinition.migrationKeyColumnName, String::class.java) - ) + private val cols = + mutableListOf( + FakeColumnMetadata(DataLoadDefinition.MIGRATION_KEY_COLUMN_NAME, String::class.java) + ) private var hasMigrationKey: Boolean = true private lateinit var rowValues: List> @@ -59,7 +71,10 @@ class R2dbcFakeBuilder { return this } - fun withCol(name: String, javaType: Class<*>): R2dbcFakeBuilder { + fun withCol( + name: String, + javaType: Class<*> + ): R2dbcFakeBuilder { return withCol(FakeColumnMetadata(name, javaType)) } diff --git a/src/test/kotlin/recce/server/dataset/datasource/FlywayMigrator.kt b/src/test/kotlin/recce/server/dataset/datasource/FlywayMigrator.kt index d3944431..e307f14e 100644 --- a/src/test/kotlin/recce/server/dataset/datasource/FlywayMigrator.kt +++ b/src/test/kotlin/recce/server/dataset/datasource/FlywayMigrator.kt @@ -13,18 +13,18 @@ import javax.transaction.Transactional @Singleton open class FlywayMigrator { - - private val createTable = """ + private val createTable = + """ CREATE TABLE TestData ( name VARCHAR(255) PRIMARY KEY NOT NULL, val VARCHAR(255) NOT NULL ); - """.trimMargin() + """.trimMargin() private val insertUser: (Int) -> String = { i -> """ - INSERT INTO TestData (name, val) - VALUES ('Test$i', 'User$i'); + INSERT INTO TestData (name, val) + VALUES ('Test$i', 'User$i'); """.trimIndent() } @@ -55,10 +55,17 @@ open class FlywayMigrator { } } -fun flywayCleanMigrate(temporaryDir: Path, sql: String, db: DbDescriptor) = - flywayCleanMigrate(temporaryDir, sql) { it.dataSource(db.jdbcUrl, db.username, db.password) } +fun flywayCleanMigrate( + temporaryDir: Path, + sql: String, + db: DbDescriptor +) = flywayCleanMigrate(temporaryDir, sql) { it.dataSource(db.jdbcUrl, db.username, db.password) } -private fun flywayCleanMigrate(temporaryDir: Path, sql: String, configureHook: (FluentConfiguration) -> Unit) { +private fun flywayCleanMigrate( + temporaryDir: Path, + sql: String, + configureHook: (FluentConfiguration) -> Unit +) { val migrationsLoc = Files.createTempDirectory(temporaryDir, "scenario-") Files.writeString(migrationsLoc.resolve("V1__SETUP_TEST.sql"), sql) Flyway.configure() diff --git a/src/test/kotlin/recce/server/recrun/RecRunServiceTest.kt b/src/test/kotlin/recce/server/recrun/RecRunServiceTest.kt index 0ce761b7..9fb0d497 100644 --- a/src/test/kotlin/recce/server/recrun/RecRunServiceTest.kt +++ b/src/test/kotlin/recce/server/recrun/RecRunServiceTest.kt @@ -7,19 +7,18 @@ import org.mockito.kotlin.doReturn import org.mockito.kotlin.mock import reactor.core.publisher.Mono import reactor.kotlin.test.test -import java.lang.IllegalArgumentException import java.time.Instant internal class RecRunServiceTest { - private val datasetId = "my-dataset" private val startedRun = RecRun(1, datasetId, Instant.now()) @Test fun `should return start results`() { - val runRepository = mock { - on { save(any()) } doReturn Mono.just(startedRun) - } + val runRepository = + mock { + on { save(any()) } doReturn Mono.just(startedRun) + } val eventualRun = RecRunService(runRepository, mock()).start(datasetId, emptyMap()) @@ -32,18 +31,21 @@ internal class RecRunServiceTest { @Test fun `should set successful run with match status`() { val expectedMatchStatus = MatchStatus(1, 1, 1, 1) - val recordRepository = mock { - on { countMatchedByKeyRecRunId(any()) } doReturn Mono.just(expectedMatchStatus) - } + val recordRepository = + mock { + on { countMatchedByKeyRecRunId(any()) } doReturn Mono.just(expectedMatchStatus) + } - val runRepository = mock { - on { update(any()) } doReturn Mono.just( - startedRun.apply { - sourceMeta = DatasetMeta() - targetMeta = DatasetMeta() - } - ) - } + val runRepository = + mock { + on { update(any()) } doReturn + Mono.just( + startedRun.apply { + sourceMeta = DatasetMeta() + targetMeta = DatasetMeta() + } + ) + } RecRunService(runRepository, recordRepository).successful(startedRun) .test() @@ -57,9 +59,10 @@ internal class RecRunServiceTest { @Test fun `should set failed run`() { - val runRepository = mock { - on { update(any()) } doReturn Mono.just(startedRun) - } + val runRepository = + mock { + on { update(any()) } doReturn Mono.just(startedRun) + } RecRunService(runRepository, mock()).failed(startedRun, IllegalArgumentException("failed run!")) .test() diff --git a/src/test/kotlin/recce/server/util/ThrowableUtilsTest.kt b/src/test/kotlin/recce/server/util/ThrowableUtilsTest.kt index defd3fcf..136b026f 100644 --- a/src/test/kotlin/recce/server/util/ThrowableUtilsTest.kt +++ b/src/test/kotlin/recce/server/util/ThrowableUtilsTest.kt @@ -4,14 +4,14 @@ import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test internal class ThrowableUtilsTest { - private val rootCause = IllegalArgumentException("root") private val causedByRoot = IllegalArgumentException("causedByRoot", rootCause) - private val causedByCausedBy = IllegalArgumentException( - "causedByCausedByRoot" + - "", - causedByRoot - ) + private val causedByCausedBy = + IllegalArgumentException( + "causedByCausedByRoot" + + "", + causedByRoot + ) @Test fun `should return just message for throwable with no cause`() {