diff --git a/src/main/kotlin/recce/server/dataset/DatasetRecService.kt b/src/main/kotlin/recce/server/dataset/DatasetRecService.kt index f5bf5189..b4955079 100644 --- a/src/main/kotlin/recce/server/dataset/DatasetRecService.kt +++ b/src/main/kotlin/recce/server/dataset/DatasetRecService.kt @@ -56,9 +56,8 @@ open class DatasetRecService( batchSaver: (List, RecRun) -> Flux ): Mono = def.runQuery() - .doFirst { logger.info { "${def.datasourceDescriptor} query completed; streaming to Recce DB" } } - .single() - .flatMapMany { it.map(hashingStrategy::hash) } + .doOnNext { logger.info { "${def.datasourceDescriptor} query completed; streaming to Recce DB" } } + .flatMap { result -> result.map(hashingStrategy::hash) } .buffer(recConfig.defaults.batchSize) .zipWith(run.repeat()) .flatMap({ (rows, run) -> batchSaver(rows, run) }, recConfig.defaults.batchConcurrency) @@ -108,4 +107,4 @@ open class DatasetRecService( ) } -class DataLoadException(message: String, cause: Throwable? = null) : Exception(message, cause) +class DataLoadException(message: String, cause: Throwable) : Exception(message, cause) diff --git a/src/test/kotlin/recce/server/dataset/DatasetRecServiceTest.kt b/src/test/kotlin/recce/server/dataset/DatasetRecServiceTest.kt index 0fd07997..3234120d 100644 --- a/src/test/kotlin/recce/server/dataset/DatasetRecServiceTest.kt +++ b/src/test/kotlin/recce/server/dataset/DatasetRecServiceTest.kt @@ -275,60 +275,4 @@ internal class DatasetRecServiceTest { assertThat(DatasetRecService(RecConfiguration(datasetConfig), mock(), mock(), mock(), mock()).availableDataSets) .hasSameElementsAs(listOf(first, second)) } - - @Test - fun `should throw error if there is more than one result from query`() { - val multipleResults = mock { - on { runQuery() } doReturn Flux.just(singleRowResult, singleRowResult) - } - - val service = DatasetRecService( - RecConfiguration(mapOf(testDataset to DatasetConfiguration(multipleResults, multipleResults))), - mock(), - mock(), - runService, - mock() - ) - - service.runFor(testDataset) - .test() - .assertNext { assertThat(it.status).isEqualTo(RunStatus.Failed) } - .verifyComplete() - - val errorCaptor = argumentCaptor() - verify(runService).failed(eq(recRun), errorCaptor.capture()) - - assertThat(errorCaptor.firstValue) - .isInstanceOf(DataLoadException::class.java) - .hasMessageContaining("Source emitted more than one item") - .hasCauseExactlyInstanceOf(IndexOutOfBoundsException::class.java) - } - - @Test - fun `should throw error if there are no results from query`() { - val zeroResult = mock { - on { runQuery() } doReturn Flux.empty() - } - - val service = DatasetRecService( - RecConfiguration(mapOf(testDataset to DatasetConfiguration(zeroResult, zeroResult))), - mock(), - mock(), - runService, - mock() - ) - - service.runFor(testDataset) - .test() - .assertNext { assertThat(it.status).isEqualTo(RunStatus.Failed) } - .verifyComplete() - - val errorCaptor = argumentCaptor() - verify(runService).failed(eq(recRun), errorCaptor.capture()) - - assertThat(errorCaptor.firstValue) - .isInstanceOf(DataLoadException::class.java) - .hasMessageContaining("Source was empty") - .hasCauseExactlyInstanceOf(NoSuchElementException::class.java) - } }