Skip to content

Commit

Permalink
feat: automatic deprovisioning for provider transfers
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Oct 4, 2023
1 parent ffcdfb6 commit 93a2f88
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,12 @@ private boolean processCompleting(TransferProcess process) {
var description = format("Send %s to %s", message.getClass().getSimpleName(), process.getConnectorAddress());
return entityRetryProcessFactory.doAsyncStatusResultProcess(process, () -> dispatcherRegistry.dispatch(Object.class, message))
.entityRetrieve(id -> store.findById(id))
.onSuccess((t, content) -> transitionToCompleted(t))
.onSuccess((t, content) -> {
transitionToCompleted(t);
if (t.getType() == PROVIDER) {
transitionToDeprovisioning(t);
}
})
.onFailure((t, throwable) -> transitionToCompleting(t))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable))
Expand Down Expand Up @@ -389,7 +394,12 @@ private boolean processTerminating(TransferProcess process) {
var description = format("Send %s to %s", message.getClass().getSimpleName(), process.getConnectorAddress());
return entityRetryProcessFactory.doAsyncStatusResultProcess(process, () -> dispatcherRegistry.dispatch(Object.class, message))
.entityRetrieve(id -> store.findById(id))
.onSuccess((t, content) -> transitionToTerminated(t))
.onSuccess((t, content) -> {
transitionToTerminated(t);
if (t.getType() == PROVIDER) {
transitionToDeprovisioning(t);
}
})
.onFailure((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted(this::transitionToTerminated)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,8 @@ void starting_whenShouldWait_updatesStateCount() {
}

@Test
void completing_shouldTransitionToCompleted_whenSendingMessageSucceed() {
var process = createTransferProcessBuilder(COMPLETING).dataRequest(createDataRequestBuilder().id("correlationId").build()).build();
void completing_provider_shouldTransitionToDeprovisioning_whenSendingMessageSucceed() {
var process = createTransferProcessBuilder(COMPLETING).type(PROVIDER).dataRequest(createDataRequestBuilder().id("correlationId").build()).build();
when(transferProcessStore.nextNotLeased(anyInt(), stateIs(COMPLETING.code()))).thenReturn(List.of(process)).thenReturn(emptyList());
when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(COMPLETING.code()).build());
when(dispatcherRegistry.dispatch(any(), isA(TransferCompletionMessage.class))).thenReturn(completedFuture(StatusResult.success("any")));
Expand All @@ -571,13 +571,32 @@ void completing_shouldTransitionToCompleted_whenSendingMessageSucceed() {
verify(dispatcherRegistry).dispatch(eq(Object.class), captor.capture());
var message = captor.getValue();
assertThat(message.getProcessId()).isEqualTo("correlationId");
verify(transferProcessStore, times(RETRY_LIMIT)).save(argThat(p -> p.getState() == COMPLETED.code()));
verify(transferProcessStore, atLeastOnce()).save(argThat(p -> p.getState() == DEPROVISIONING.code()));
verify(listener).completed(process);
});
}

@Test
void terminating_shouldTransitionToTerminated_whenMessageSentCorrectly() {
void completing_consumer_shouldTransitionToCompleted_whenSendingMessageSucceed() {
var process = createTransferProcessBuilder(COMPLETING).type(CONSUMER).dataRequest(createDataRequestBuilder().id("correlationId").build()).build();
when(transferProcessStore.nextNotLeased(anyInt(), stateIs(COMPLETING.code()))).thenReturn(List.of(process)).thenReturn(emptyList());
when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(COMPLETING.code()).build());
when(dispatcherRegistry.dispatch(any(), isA(TransferCompletionMessage.class))).thenReturn(completedFuture(StatusResult.success("any")));

manager.start();

await().untilAsserted(() -> {
var captor = ArgumentCaptor.forClass(TransferCompletionMessage.class);
verify(dispatcherRegistry).dispatch(eq(Object.class), captor.capture());
var message = captor.getValue();
assertThat(message.getProcessId()).isEqualTo("correlationId");
verify(transferProcessStore).save(argThat(p -> p.getState() == COMPLETED.code()));
verify(listener).completed(process);
});
}

@Test
void terminating_provider_shouldTransitionToDeprovisioning_whenMessageSentCorrectly() {
var process = createTransferProcessBuilder(TERMINATING).type(PROVIDER)
.dataRequest(createDataRequestBuilder().id("correlationId").build()).build();
when(transferProcessStore.nextNotLeased(anyInt(), stateIs(TERMINATING.code()))).thenReturn(List.of(process)).thenReturn(emptyList());
Expand All @@ -591,7 +610,27 @@ void terminating_shouldTransitionToTerminated_whenMessageSentCorrectly() {
verify(dispatcherRegistry).dispatch(eq(Object.class), captor.capture());
var message = captor.getValue();
assertThat(message.getProcessId()).isEqualTo("correlationId");
verify(transferProcessStore, times(RETRY_LIMIT)).save(argThat(p -> p.getState() == TERMINATED.code()));
verify(transferProcessStore, atLeastOnce()).save(argThat(p -> p.getState() == DEPROVISIONING.code()));
verify(listener).terminated(process);
});
}

@Test
void terminating_consumer_shouldTransitionToTerminated_whenMessageSentCorrectly() {
var process = createTransferProcessBuilder(TERMINATING).type(CONSUMER)
.dataRequest(createDataRequestBuilder().id("correlationId").build()).build();
when(transferProcessStore.nextNotLeased(anyInt(), stateIs(TERMINATING.code()))).thenReturn(List.of(process)).thenReturn(emptyList());
when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(TERMINATING.code()).build());
when(dispatcherRegistry.dispatch(any(), isA(TransferTerminationMessage.class))).thenReturn(completedFuture(StatusResult.success("any")));

manager.start();

await().untilAsserted(() -> {
var captor = ArgumentCaptor.forClass(TransferTerminationMessage.class);
verify(dispatcherRegistry).dispatch(eq(Object.class), captor.capture());
var message = captor.getValue();
assertThat(message.getProcessId()).isEqualTo("correlationId");
verify(transferProcessStore).save(argThat(p -> p.getState() == TERMINATED.code()));
verify(listener).terminated(process);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public void transitionCompleted() {
}

public boolean canBeDeprovisioned() {
return currentStateIsOneOf(COMPLETED, TERMINATED, DEPROVISIONING);
return currentStateIsOneOf(COMPLETING, TERMINATING, COMPLETED, TERMINATED, DEPROVISIONING);
}

public void transitionDeprovisioning() {
Expand Down

0 comments on commit 93a2f88

Please sign in to comment.