diff --git a/ksql-engine/src/main/java/io/confluent/ksql/materialization/ks/KsStateStore.java b/ksql-engine/src/main/java/io/confluent/ksql/materialization/ks/KsStateStore.java index 5b08d9e73a37..836047c6449f 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/materialization/ks/KsStateStore.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/materialization/ks/KsStateStore.java @@ -74,6 +74,11 @@ T store(final QueryableStoreType queryableStoreType) { try { return kafkaStreams.store(stateStoreName, queryableStoreType); } catch (final Exception e) { + final State state = kafkaStreams.state(); + if (state != State.RUNNING) { + throw new NotRunningException("The query was not in a running state. state: " + state); + } + throw new MaterializationException("State store currently unavailable: " + stateStoreName, e); } } @@ -88,10 +93,5 @@ private void awaitRunning() { Thread.yield(); } - - final State state = kafkaStreams.state(); - if (state != State.RUNNING) { - throw new NotRunningException("The query was not in a running state. state: " + state); - } } } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/materialization/ks/KsStateStoreTest.java b/ksql-engine/src/test/java/io/confluent/ksql/materialization/ks/KsStateStoreTest.java index 0ca04c513970..7674d1e02d62 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/materialization/ks/KsStateStoreTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/materialization/ks/KsStateStoreTest.java @@ -128,32 +128,17 @@ public void shouldThrowIfDoesNotFinishRebalanceBeforeTimeout() { } @Test - public void shouldThrowIfNotRunningAfterRebalanced() { + public void shouldThrowIfNotRunningAfterFailedToGetStore() { // Given: when(kafkaStreams.state()) - .thenReturn(State.REBALANCING) - .thenReturn(State.REBALANCING) + .thenReturn(State.RUNNING) .thenReturn(State.NOT_RUNNING); - // When: - expectedException.expect(NotRunningException.class); - expectedException.expectMessage("The query was not in a running state. state: NOT_RUNNING"); - - // When: - store.store(QueryableStoreTypes.sessionStore()); - } - - @Test - public void shouldThrowIfPendingShutdown() { - // Given: - when(kafkaStreams.state()) - .thenReturn(State.REBALANCING) - .thenReturn(State.REBALANCING) - .thenReturn(State.PENDING_SHUTDOWN); + when(kafkaStreams.store(any(), any())).thenThrow(new IllegalStateException()); // When: expectedException.expect(NotRunningException.class); - expectedException.expectMessage("The query was not in a running state. state: PENDING_SHUTDOWN"); + expectedException.expectMessage("The query was not in a running state. state: NOT_RUNNING"); // When: store.store(QueryableStoreTypes.sessionStore());