Skip to content

Commit

Permalink
#57 Fail fast if query has zero or multiple statements
Browse files Browse the repository at this point in the history
  • Loading branch information
jiawen-tw committed Mar 3, 2022
1 parent 3e9744f commit d3b7913
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 3 deletions.
10 changes: 7 additions & 3 deletions src/main/kotlin/recce/server/dataset/DatasetRecService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,12 @@ open class DatasetRecService(
batchSaver: (List<HashedRow>, RecRun) -> Flux<LazyDatasetMeta>
): Mono<DatasetMeta> =
def.runQuery()
.doOnNext { logger.info { "${def.datasourceDescriptor} query completed; streaming to Recce DB" } }
.flatMap { result -> result.map(hashingStrategy::hash) }
.doFirst { logger.info { "${def.datasourceDescriptor} query completed; streaming to Recce DB" } }
.collectList()
.flatMapMany {
if (it.size != 1) throw DataLoadException("SQL query must be a single statement")
else it.first().map(hashingStrategy::hash)
}
.buffer(recConfig.defaults.batchSize)
.zipWith(run.repeat())
.flatMap({ (rows, run) -> batchSaver(rows, run) }, recConfig.defaults.batchConcurrency)
Expand Down Expand Up @@ -107,4 +111,4 @@ open class DatasetRecService(
)
}

class DataLoadException(message: String, cause: Throwable) : Exception(message, cause)
class DataLoadException(message: String, cause: Throwable? = null) : Exception(message, cause)
52 changes: 52 additions & 0 deletions src/test/kotlin/recce/server/dataset/DatasetRecServiceTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -267,4 +267,56 @@ internal class DatasetRecServiceTest {
assertThat(DatasetRecService(RecConfiguration(datasetConfig), mock(), mock(), mock(), mock()).availableDataSets)
.hasSameElementsAs(listOf(first, second))
}

@Test
fun `should throw error if there is more than one result from query`() {
val multipleResults = mock<DataLoadDefinition> {
on { runQuery() } doReturn Flux.just(singleRowResult, singleRowResult)
}

val service = DatasetRecService(
RecConfiguration(mapOf(testDataset to DatasetConfiguration(multipleResults, multipleResults))),
mock(),
mock(),
runService,
mock()
)

StepVerifier.create(service.runFor(testDataset))
.assertNext { assertThat(it.status).isEqualTo(RunStatus.Failed) }
.verifyComplete()

val errorCaptor = argumentCaptor<Throwable>()
verify(runService).failed(eq(recRun), errorCaptor.capture())

assertThat(errorCaptor.firstValue)
.isInstanceOf(DataLoadException::class.java)
.hasMessageContaining("SQL query must be a single statement")
}

@Test
fun `should throw error if there are no results from query`() {
val zeroResult = mock<DataLoadDefinition> {
on { runQuery() } doReturn Flux.empty()
}

val service = DatasetRecService(
RecConfiguration(mapOf(testDataset to DatasetConfiguration(zeroResult, zeroResult))),
mock(),
mock(),
runService,
mock()
)

StepVerifier.create(service.runFor(testDataset))
.assertNext { assertThat(it.status).isEqualTo(RunStatus.Failed) }
.verifyComplete()

val errorCaptor = argumentCaptor<Throwable>()
verify(runService).failed(eq(recRun), errorCaptor.capture())

assertThat(errorCaptor.firstValue)
.isInstanceOf(DataLoadException::class.java)
.hasMessageContaining("SQL query must be a single statement")
}
}

0 comments on commit d3b7913

Please sign in to comment.