Skip to content

Commit

Permalink
Revert "#57 Fail fast if query has zero or multiple statements"
Browse files Browse the repository at this point in the history
This reverts commit d3b7913 - reverting for now, as this change appears to be breaking the batching.
  • Loading branch information
chadlwilson committed May 25, 2022
1 parent 7ae43b3 commit 901b71b
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 60 deletions.
7 changes: 3 additions & 4 deletions src/main/kotlin/recce/server/dataset/DatasetRecService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,8 @@ open class DatasetRecService(
batchSaver: (List<HashedRow>, RecRun) -> Flux<LazyDatasetMeta>
): Mono<DatasetMeta> =
def.runQuery()
.doFirst { logger.info { "${def.datasourceDescriptor} query completed; streaming to Recce DB" } }
.single()
.flatMapMany { it.map(hashingStrategy::hash) }
.doOnNext { logger.info { "${def.datasourceDescriptor} query completed; streaming to Recce DB" } }
.flatMap { result -> result.map(hashingStrategy::hash) }
.buffer(recConfig.defaults.batchSize)
.zipWith(run.repeat())
.flatMap({ (rows, run) -> batchSaver(rows, run) }, recConfig.defaults.batchConcurrency)
Expand Down Expand Up @@ -108,4 +107,4 @@ open class DatasetRecService(
)
}

class DataLoadException(message: String, cause: Throwable? = null) : Exception(message, cause)
class DataLoadException(message: String, cause: Throwable) : Exception(message, cause)
56 changes: 0 additions & 56 deletions src/test/kotlin/recce/server/dataset/DatasetRecServiceTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -275,60 +275,4 @@ internal class DatasetRecServiceTest {
assertThat(DatasetRecService(RecConfiguration(datasetConfig), mock(), mock(), mock(), mock()).availableDataSets)
.hasSameElementsAs(listOf(first, second))
}

@Test
fun `should throw error if there is more than one result from query`() {
val multipleResults = mock<DataLoadDefinition> {
on { runQuery() } doReturn Flux.just(singleRowResult, singleRowResult)
}

val service = DatasetRecService(
RecConfiguration(mapOf(testDataset to DatasetConfiguration(multipleResults, multipleResults))),
mock(),
mock(),
runService,
mock()
)

service.runFor(testDataset)
.test()
.assertNext { assertThat(it.status).isEqualTo(RunStatus.Failed) }
.verifyComplete()

val errorCaptor = argumentCaptor<Throwable>()
verify(runService).failed(eq(recRun), errorCaptor.capture())

assertThat(errorCaptor.firstValue)
.isInstanceOf(DataLoadException::class.java)
.hasMessageContaining("Source emitted more than one item")
.hasCauseExactlyInstanceOf(IndexOutOfBoundsException::class.java)
}

@Test
fun `should throw error if there are no results from query`() {
val zeroResult = mock<DataLoadDefinition> {
on { runQuery() } doReturn Flux.empty()
}

val service = DatasetRecService(
RecConfiguration(mapOf(testDataset to DatasetConfiguration(zeroResult, zeroResult))),
mock(),
mock(),
runService,
mock()
)

service.runFor(testDataset)
.test()
.assertNext { assertThat(it.status).isEqualTo(RunStatus.Failed) }
.verifyComplete()

val errorCaptor = argumentCaptor<Throwable>()
verify(runService).failed(eq(recRun), errorCaptor.capture())

assertThat(errorCaptor.firstValue)
.isInstanceOf(DataLoadException::class.java)
.hasMessageContaining("Source was empty")
.hasCauseExactlyInstanceOf(NoSuchElementException::class.java)
}
}

0 comments on commit 901b71b

Please sign in to comment.