diff --git a/sdk/communication/azure-communication-chat/src/test/java/com/azure/communication/chat/ChatAsyncClientTest.java b/sdk/communication/azure-communication-chat/src/test/java/com/azure/communication/chat/ChatAsyncClientTest.java index 74728f93c35c6..06dbf5c05310b 100644 --- a/sdk/communication/azure-communication-chat/src/test/java/com/azure/communication/chat/ChatAsyncClientTest.java +++ b/sdk/communication/azure-communication-chat/src/test/java/com/azure/communication/chat/ChatAsyncClientTest.java @@ -8,17 +8,21 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.Test; +import reactor.test.StepVerifier; + import com.azure.communication.administration.CommunicationIdentityClient; import com.azure.communication.administration.CommunicationUserToken; import com.azure.communication.common.CommunicationUser; import com.azure.communication.chat.implementation.ChatOptionsProvider; import com.azure.communication.chat.models.*; +import com.azure.core.exception.HttpResponseException; import com.azure.core.http.rest.PagedIterable; import com.azure.core.util.logging.ClientLogger; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; /** * Set the AZURE_TEST_MODE environment variable to either PLAYBACK or RECORD to determine if tests are playback or @@ -57,125 +61,196 @@ protected void afterTest() { @Test public void canCreateThread() { + // Arrange CreateChatThreadOptions threadRequest = ChatOptionsProvider.createThreadOptions( firstThreadMember.getId(), secondThreadMember.getId()); - ChatThreadAsyncClient chatThreadClient = client.createChatThread(threadRequest).block(); - assertNotNull(chatThreadClient); - assertNotNull(chatThreadClient.getChatThreadId()); + // Act & Assert + StepVerifier.create(client.createChatThread(threadRequest)) + .assertNext(chatThreadClient -> { + assertNotNull(chatThreadClient); + assertNotNull(chatThreadClient.getChatThreadId()); + }) + .verifyComplete(); } @Test public void canCreateThreadWithResponse() { + // Arrange CreateChatThreadOptions threadRequest = ChatOptionsProvider.createThreadOptions( firstThreadMember.getId(), secondThreadMember.getId()); - ChatThreadAsyncClient chatThreadClient = client.createChatThreadWithResponse(threadRequest).block().getValue(); - assertNotNull(chatThreadClient); - assertNotNull(chatThreadClient.getChatThreadId()); + // Act & Assert + StepVerifier.create(client.createChatThreadWithResponse(threadRequest)) + .assertNext(chatThreadClientResponse -> { + ChatThreadAsyncClient chatThreadClient = chatThreadClientResponse.getValue(); + assertNotNull(chatThreadClient); + assertNotNull(chatThreadClient.getChatThreadId()); + }) + .verifyComplete(); } @Test public void canGetChatThreadClient() { + // Arrange String threadId = "19:fe0a2f65a7834185b29164a7de57699c@thread.v2"; + // Act ChatThreadAsyncClient chatThreadClient = client.getChatThreadClient(threadId); + + // Assert assertNotNull(chatThreadClient); assertEquals(chatThreadClient.getChatThreadId(), threadId); } @Test public void canGetExistingChatThread() { + // Arrange CreateChatThreadOptions threadRequest = ChatOptionsProvider.createThreadOptions( firstThreadMember.getId(), secondThreadMember.getId()); - ChatThreadAsyncClient chatThreadClient = client.createChatThread(threadRequest).block(); - ChatThread chatThread = client.getChatThread(chatThreadClient.getChatThreadId()).block(); - assertEquals(chatThreadClient.getChatThreadId(), chatThread.getId()); + // Act & Assert + AtomicReference chatThreadClientRef = new AtomicReference<>(); + StepVerifier.create( + client.createChatThread(threadRequest) + .flatMap(chatThreadClient -> { + chatThreadClientRef.set(chatThreadClient); + return client.getChatThread(chatThreadClient.getChatThreadId()); + })) + .assertNext(chatThread -> { + assertEquals(chatThreadClientRef.get().getChatThreadId(), chatThread.getId()); + }) + .verifyComplete(); } @Test public void canGetExistingChatThreadWithResponse() { + // Arrange CreateChatThreadOptions threadRequest = ChatOptionsProvider.createThreadOptions( firstThreadMember.getId(), secondThreadMember.getId()); - ChatThreadAsyncClient chatThreadClient = client.createChatThread(threadRequest).block(); - ChatThread chatThread = client.getChatThreadWithResponse(chatThreadClient.getChatThreadId()).block().getValue(); - assertEquals(chatThreadClient.getChatThreadId(), chatThread.getId()); + // Act & Assert + AtomicReference chatThreadClientRef = new AtomicReference<>(); + StepVerifier.create( + client.createChatThread(threadRequest) + .flatMap(chatThreadClient -> { + chatThreadClientRef.set(chatThreadClient); + return client.getChatThreadWithResponse(chatThreadClient.getChatThreadId()); + })) + .assertNext(chatThreadResponse -> { + ChatThread chatThread = chatThreadResponse.getValue(); + assertEquals(chatThreadClientRef.get().getChatThreadId(), chatThread.getId()); + }) + .verifyComplete(); } @Test public void getNotFoundOnNonExistingChatThread() { - assertRestException( - () -> client.getChatThread("19:020082a8df7b44dd8c722bea8fe7167f@thread.v2").block(), 404); + // Act & Assert + StepVerifier.create(client.getChatThread("19:020082a8df7b44dd8c722bea8fe7167f@thread.v2")) + .expectErrorMatches(exception -> + ((HttpResponseException) exception).getResponse().getStatusCode() == 404 + ) + .verify(); } @Test public void getNotFoundOnNonExistingChatThreadWithResponse() { - assertRestException( - () -> client.getChatThreadWithResponse("19:020082a8df7b44dd8c722bea8fe7167f@thread.v2").block(), 404); + // Act & Assert + StepVerifier.create(client.getChatThreadWithResponse("19:020082a8df7b44dd8c722bea8fe7167f@thread.v2")) + .expectErrorMatches(exception -> + ((HttpResponseException) exception).getResponse().getStatusCode() == 404 + ) + .verify(); } @Test public void canDeleteChatThread() { + // Arrange CreateChatThreadOptions threadRequest = ChatOptionsProvider.createThreadOptions( firstThreadMember.getId(), secondThreadMember.getId()); - ChatThreadAsyncClient chatThreadClient = client.createChatThread(threadRequest).block(); - client.deleteChatThread(chatThreadClient.getChatThreadId()).block(); + // Act & Assert + AtomicReference chatThreadClientRef = new AtomicReference<>(); + StepVerifier.create( + client.createChatThread(threadRequest) + .flatMap(chatThreadClient -> { + chatThreadClientRef.set(chatThreadClient); + return client.deleteChatThread(chatThreadClient.getChatThreadId()); + }) + ) + .verifyComplete(); } @Test public void canDeleteChatThreadWithResponse() { + // Arrange CreateChatThreadOptions threadRequest = ChatOptionsProvider.createThreadOptions( firstThreadMember.getId(), secondThreadMember.getId()); - ChatThreadAsyncClient chatThreadClient = client.createChatThread(threadRequest).block(); - - client.deleteChatThreadWithResponse(chatThreadClient.getChatThreadId()).block(); + + // Act & Assert + AtomicReference chatThreadClientRef = new AtomicReference<>(); + StepVerifier.create( + client.createChatThread(threadRequest) + .flatMap(chatThreadClient -> { + chatThreadClientRef.set(chatThreadClient); + return client.deleteChatThreadWithResponse(chatThreadClient.getChatThreadId()); + }) + ) + .assertNext(deleteResponse -> { + assertEquals(deleteResponse.getStatusCode(), 204); + }) + .verifyComplete(); } @Test public void canListChatThreads() throws InterruptedException { + // Arrange CreateChatThreadOptions threadRequest = ChatOptionsProvider.createThreadOptions( firstThreadMember.getId(), secondThreadMember.getId()); - client.createChatThread(threadRequest).block(); - client.createChatThread(threadRequest).block(); - - Thread.sleep(500); - - PagedIterable threadsResponse = new PagedIterable<>(client.listChatThreads()); - - // process the iterableByPage - List returnedThreads = new ArrayList(); - threadsResponse.iterableByPage().forEach(resp -> { - assertEquals(resp.getStatusCode(), 200); - resp.getItems().forEach(item -> returnedThreads.add(item)); - }); - - assertTrue(returnedThreads.size() == 2); + + StepVerifier.create( + client.createChatThread(threadRequest) + .concatWith(client.createChatThread(threadRequest))) + .assertNext(chatThreadClient -> { + // Act & Assert + PagedIterable threadsResponse = new PagedIterable<>(client.listChatThreads()); + + // process the iterableByPage + List returnedThreads = new ArrayList(); + threadsResponse.iterableByPage().forEach(resp -> { + assertEquals(resp.getStatusCode(), 200); + resp.getItems().forEach(item -> returnedThreads.add(item)); + }); + + assertTrue(returnedThreads.size() == 2); + }); } @Test public void canListChatThreadsWithMaxPageSize() throws InterruptedException { + // Arrange CreateChatThreadOptions threadRequest = ChatOptionsProvider.createThreadOptions( firstThreadMember.getId(), secondThreadMember.getId()); - client.createChatThread(threadRequest).block(); - client.createChatThread(threadRequest).block(); - - Thread.sleep(500); - + ListChatThreadsOptions options = new ListChatThreadsOptions(); options.setMaxPageSize(10); - PagedIterable threadsResponse = new PagedIterable<>(client.listChatThreads(options)); - - // process the iterableByPage - List returnedThreads = new ArrayList(); - threadsResponse.iterableByPage().forEach(resp -> { - assertEquals(resp.getStatusCode(), 200); - resp.getItems().forEach(item -> returnedThreads.add(item)); - }); - - assertTrue(returnedThreads.size() == 2); + StepVerifier.create( + client.createChatThread(threadRequest) + .concatWith(client.createChatThread(threadRequest))) + .assertNext(chatThreadClient -> { + // Act & Assert + PagedIterable threadsResponse = new PagedIterable<>(client.listChatThreads(options)); + + // process the iterableByPage + List returnedThreads = new ArrayList(); + threadsResponse.iterableByPage().forEach(resp -> { + assertEquals(resp.getStatusCode(), 200); + resp.getItems().forEach(item -> returnedThreads.add(item)); + }); + + assertTrue(returnedThreads.size() == 2); + }); } } diff --git a/sdk/communication/azure-communication-chat/src/test/java/com/azure/communication/chat/ChatThreadAsyncClientTest.java b/sdk/communication/azure-communication-chat/src/test/java/com/azure/communication/chat/ChatThreadAsyncClientTest.java index 3b9facd4f8f49..543975945dc32 100644 --- a/sdk/communication/azure-communication-chat/src/test/java/com/azure/communication/chat/ChatThreadAsyncClientTest.java +++ b/sdk/communication/azure-communication-chat/src/test/java/com/azure/communication/chat/ChatThreadAsyncClientTest.java @@ -7,22 +7,27 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.Test; +import reactor.test.StepVerifier; + import com.azure.communication.administration.CommunicationIdentityClient; import com.azure.communication.administration.CommunicationUserToken; import com.azure.communication.common.CommunicationUser; import com.azure.communication.chat.implementation.ChatOptionsProvider; import com.azure.communication.chat.models.*; import com.azure.core.http.rest.PagedIterable; +import com.azure.core.http.rest.Response; import com.azure.core.util.logging.ClientLogger; import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; /** - * Set the AZURE_TEST_MODE environment variable to either PLAYBACK or RECORD to determine if tests are playback or - * live. By default, tests are run in playback mode. + * Set the AZURE_TEST_MODE environment variable to either PLAYBACK or RECORD to + * determine if tests are playback or live. By default, tests are run in + * playback mode. */ public class ChatThreadAsyncClientTest extends ChatClientTestBase { @@ -51,8 +56,8 @@ protected void beforeTest() { client = getChatClientBuilder(response.getToken()).buildAsyncClient(); - CreateChatThreadOptions threadRequest = ChatOptionsProvider.createThreadOptions( - firstThreadMember.getId(), secondThreadMember.getId()); + CreateChatThreadOptions threadRequest = ChatOptionsProvider.createThreadOptions(firstThreadMember.getId(), + secondThreadMember.getId()); chatThreadClient = client.createChatThread(threadRequest).block(); threadId = chatThreadClient.getChatThreadId(); } @@ -64,253 +69,373 @@ protected void afterTest() { @Test public void canUpdateThread() { + // Arrange UpdateChatThreadOptions threadRequest = ChatOptionsProvider.updateThreadOptions(); - chatThreadClient.updateChatThread(threadRequest).block(); - - ChatThread chatThread = client.getChatThread(threadId).block(); - assertEquals(chatThread.getTopic(), threadRequest.getTopic()); + // Act & Assert + StepVerifier.create( + chatThreadClient.updateChatThread(threadRequest) + .flatMap(noResp -> { + return client.getChatThread(threadId); + }) + ) + .assertNext(chatThread -> { + assertEquals(chatThread.getTopic(), threadRequest.getTopic()); + }); } @Test public void canUpdateThreadWithResponse() { + // Arrange UpdateChatThreadOptions threadRequest = ChatOptionsProvider.updateThreadOptions(); - chatThreadClient.updateChatThreadWithResponse(threadRequest).block().getValue(); - - ChatThread chatThread = client.getChatThread(threadId).block(); - assertEquals(chatThread.getTopic(), threadRequest.getTopic()); + // Act & Assert + StepVerifier.create( + chatThreadClient.updateChatThreadWithResponse(threadRequest) + .flatMap(updateThreadResponse -> { + assertEquals(updateThreadResponse.getStatusCode(), 200); + return client.getChatThread(threadId); + }) + + ) + .assertNext(chatThread -> { + assertEquals(chatThread.getTopic(), threadRequest.getTopic()); + }) + .verifyComplete(); } @Test public void canAddListAndRemoveMembersAsync() throws InterruptedException { + // Arrange firstAddedThreadMember = communicationClient.createUser(); secondAddedThreadMember = communicationClient.createUser(); AddChatThreadMembersOptions options = ChatOptionsProvider.addThreadMembersOptions( firstAddedThreadMember.getId(), secondAddedThreadMember.getId()); - chatThreadClient.addMembers(options).block(); - - PagedIterable membersResponse = new PagedIterable<>(chatThreadClient.listMembers()); - - // process the iterableByPage - List returnedMembers = new ArrayList(); - membersResponse.iterableByPage().forEach(resp -> { - assertEquals(resp.getStatusCode(), 200); - resp.getItems().forEach(item -> returnedMembers.add(item)); - }); - - for (ChatThreadMember member: options.getMembers()) { - assertTrue(checkMembersListContainsMemberId(returnedMembers, member.getUser().getId())); - } - - assertTrue(returnedMembers.size() == 4); + // Act & Assert + StepVerifier.create(chatThreadClient.addMembers(options)) + .assertNext(noResp -> { + PagedIterable membersResponse = new PagedIterable<>(chatThreadClient.listMembers()); + + // process the iterableByPage + List returnedMembers = new ArrayList(); + membersResponse.iterableByPage().forEach(resp -> { + assertEquals(resp.getStatusCode(), 200); + resp.getItems().forEach(item -> returnedMembers.add(item)); + }); + + for (ChatThreadMember member: options.getMembers()) { + assertTrue(checkMembersListContainsMemberId(returnedMembers, member.getUser().getId())); + } + assertTrue(returnedMembers.size() == 4); + }); for (ChatThreadMember member: options.getMembers()) { - chatThreadClient.removeMember(member.getUser()).block(); + StepVerifier.create(chatThreadClient.removeMember(member.getUser())) + .verifyComplete(); } } @Test public void canAddListAndRemoveMembersWithResponseAsync() throws InterruptedException { + // Arrange firstAddedThreadMember = communicationClient.createUser(); secondAddedThreadMember = communicationClient.createUser(); AddChatThreadMembersOptions options = ChatOptionsProvider.addThreadMembersOptions( firstAddedThreadMember.getId(), secondAddedThreadMember.getId()); - chatThreadClient.addMembersWithResponse(options).block().getValue(); - - PagedIterable membersResponse = new PagedIterable<>(chatThreadClient.listMembers()); - - // process the iterableByPage - List returnedMembers = new ArrayList(); - membersResponse.iterableByPage().forEach(resp -> { - assertEquals(resp.getStatusCode(), 200); - resp.getItems().forEach(item -> returnedMembers.add(item)); - }); - - for (ChatThreadMember member: options.getMembers()) { - assertTrue(checkMembersListContainsMemberId(returnedMembers, member.getUser().getId())); - } - - assertTrue(returnedMembers.size() == 4); + // Action & Assert + StepVerifier.create(chatThreadClient.addMembersWithResponse(options)) + .assertNext(addMembersResponse -> { + assertEquals(addMembersResponse.getStatusCode(), 207); + PagedIterable membersResponse = new PagedIterable<>(chatThreadClient.listMembers()); + + // process the iterableByPage + List returnedMembers = new ArrayList(); + membersResponse.iterableByPage().forEach(resp -> { + assertEquals(resp.getStatusCode(), 200); + resp.getItems().forEach(item -> returnedMembers.add(item)); + }); + + for (ChatThreadMember member: options.getMembers()) { + assertTrue(checkMembersListContainsMemberId(returnedMembers, member.getUser().getId())); + } + + assertTrue(returnedMembers.size() == 4); + }) + .verifyComplete(); for (ChatThreadMember member: options.getMembers()) { - chatThreadClient.removeMemberWithResponse(member.getUser()).block().getValue(); + StepVerifier.create(chatThreadClient.removeMemberWithResponse(member.getUser())) + .assertNext(resp -> { + assertEquals(resp.getStatusCode(), 204); + }) + .verifyComplete(); } } @Test public void canSendThenGetMessage() { + // Arrange SendChatMessageOptions messageRequest = ChatOptionsProvider.sendMessageOptions(); - - SendChatMessageResult response = chatThreadClient.sendMessage(messageRequest).block(); - - ChatMessage message = chatThreadClient.getMessage(response.getId()).block(); - assertEquals(message.getContent(), messageRequest.getContent()); - assertEquals(message.getPriority(), messageRequest.getPriority()); - assertEquals(message.getSenderDisplayName(), messageRequest.getSenderDisplayName()); + + // Action & Assert + StepVerifier + .create(chatThreadClient.sendMessage(messageRequest) + .flatMap(response -> { + return chatThreadClient.getMessage(response.getId()); + }) + ) + .assertNext(message -> { + assertEquals(message.getContent(), messageRequest.getContent()); + assertEquals(message.getPriority(), messageRequest.getPriority()); + assertEquals(message.getSenderDisplayName(), messageRequest.getSenderDisplayName()); + }) + .verifyComplete(); } @Test public void canSendThenGetMessageWithResponse() { + // Arrange SendChatMessageOptions messageRequest = ChatOptionsProvider.sendMessageOptions(); - SendChatMessageResult response = chatThreadClient.sendMessageWithResponse(messageRequest).block().getValue(); - - ChatMessage message = chatThreadClient.getMessageWithResponse(response.getId()).block().getValue(); - assertEquals(message.getContent(), messageRequest.getContent()); - assertEquals(message.getPriority(), messageRequest.getPriority()); - assertEquals(message.getSenderDisplayName(), messageRequest.getSenderDisplayName()); + // Action & Assert + StepVerifier + .create(chatThreadClient.sendMessageWithResponse(messageRequest) + .flatMap(sendResponse -> { + assertEquals(sendResponse.getStatusCode(), 201); + return chatThreadClient.getMessageWithResponse(sendResponse.getValue().getId()); + }) + ) + .assertNext(getResponse -> { + ChatMessage message = getResponse.getValue(); + assertEquals(message.getContent(), messageRequest.getContent()); + assertEquals(message.getPriority(), messageRequest.getPriority()); + assertEquals(message.getSenderDisplayName(), messageRequest.getSenderDisplayName()); + }) + .verifyComplete(); } @Test public void canDeleteExistingMessage() { + // Arrange SendChatMessageOptions messageRequest = ChatOptionsProvider.sendMessageOptions(); - SendChatMessageResult response = chatThreadClient.sendMessage(messageRequest).block(); - - chatThreadClient.deleteMessage(response.getId()); + // Action & Assert + StepVerifier.create( + chatThreadClient.sendMessage(messageRequest) + .flatMap(response -> { + return chatThreadClient.deleteMessage(response.getId()); + }) + ) + .verifyComplete(); } @Test public void canDeleteExistingMessageWithResponse() { + // Arrange SendChatMessageOptions messageRequest = ChatOptionsProvider.sendMessageOptions(); - SendChatMessageResult response = chatThreadClient.sendMessage(messageRequest).block(); - - chatThreadClient.deleteMessageWithResponse(response.getId()).block(); + // Action & Assert + StepVerifier.create( + chatThreadClient.sendMessage(messageRequest) + .flatMap(response -> { + return chatThreadClient.deleteMessageWithResponse(response.getId()); + }) + ) + .assertNext(deleteResponse -> { + assertEquals(deleteResponse.getStatusCode(), 204); + }) + .verifyComplete(); } @Test public void canUpdateExistingMessage() { + // Arrange SendChatMessageOptions messageRequest = ChatOptionsProvider.sendMessageOptions(); UpdateChatMessageOptions updateMessageRequest = ChatOptionsProvider.updateMessageOptions(); - SendChatMessageResult response = chatThreadClient.sendMessage(messageRequest).block(); - - chatThreadClient.updateMessage(response.getId(), updateMessageRequest).block(); - - ChatMessage message = chatThreadClient.getMessage(response.getId()).block(); - assertEquals(message.getContent(), updateMessageRequest.getContent()); + // Action & Assert + AtomicReference messageResponseRef = new AtomicReference<>(); + StepVerifier.create( + chatThreadClient.sendMessage(messageRequest) + .flatMap(response -> { + messageResponseRef.set(response); + return chatThreadClient.updateMessage(response.getId(), updateMessageRequest); + }) + .flatMap((Void resp) -> { + return chatThreadClient.getMessage(messageResponseRef.get().getId()); + }) + ) + .assertNext(message -> { + assertEquals(message.getContent(), updateMessageRequest.getContent()); + }); } @Test public void canUpdateExistingMessageWithResponse() { + // Arrange SendChatMessageOptions messageRequest = ChatOptionsProvider.sendMessageOptions(); UpdateChatMessageOptions updateMessageRequest = ChatOptionsProvider.updateMessageOptions(); - SendChatMessageResult response = chatThreadClient.sendMessage(messageRequest).block(); - - chatThreadClient.updateMessageWithResponse(response.getId(), updateMessageRequest).block(); - - ChatMessage message = chatThreadClient.getMessage(response.getId()).block(); - assertEquals(message.getContent(), updateMessageRequest.getContent()); + // Action & Assert + AtomicReference messageResponseRef = new AtomicReference<>(); + StepVerifier.create( + chatThreadClient.sendMessage(messageRequest) + .flatMap((SendChatMessageResult response) -> { + messageResponseRef.set(response); + return chatThreadClient.updateMessageWithResponse(response.getId(), updateMessageRequest); + }) + .flatMap((Response updateResponse) -> { + assertEquals(updateResponse.getStatusCode(), 200); + return chatThreadClient.getMessage(messageResponseRef.get().getId()); + }) + ) + .assertNext(message -> { + assertEquals(message.getContent(), updateMessageRequest.getContent()); + }) + .verifyComplete(); } @Test public void canListMessages() { + // Arrange SendChatMessageOptions messageRequest = ChatOptionsProvider.sendMessageOptions(); - chatThreadClient.sendMessage(messageRequest).block(); - chatThreadClient.sendMessage(messageRequest).block(); - - PagedIterable messagesResponse = new PagedIterable(chatThreadClient.listMessages()); - - // process the iterableByPage - List returnedMessages = new ArrayList(); - messagesResponse.iterableByPage().forEach(resp -> { - assertEquals(resp.getStatusCode(), 200); - resp.getItems().forEach(item -> { - if (item.getType().equals("Text")) { - returnedMessages.add(item); - } - }); - }); - assertTrue(returnedMessages.size() == 2); + StepVerifier.create( + chatThreadClient.sendMessage(messageRequest) + .concatWith(chatThreadClient.sendMessage(messageRequest)) + ) + .assertNext((message) -> { + // Action & Assert + PagedIterable messagesResponse = new PagedIterable(chatThreadClient.listMessages()); + + // process the iterableByPage + List returnedMessages = new ArrayList(); + messagesResponse.iterableByPage().forEach(resp -> { + assertEquals(resp.getStatusCode(), 200); + resp.getItems().forEach(item -> { + if (item.getType().equals("Text")) { + returnedMessages.add(item); + } + }); + }); + + assertTrue(returnedMessages.size() == 2); + }); } @Test public void canListMessagesWithOptions() { + // Arrange SendChatMessageOptions messageRequest = ChatOptionsProvider.sendMessageOptions(); - chatThreadClient.sendMessage(messageRequest).block(); - chatThreadClient.sendMessage(messageRequest).block(); - ListChatMessagesOptions options = new ListChatMessagesOptions(); options.setMaxPageSize(10); options.setStartTime(OffsetDateTime.parse("2020-09-08T01:02:14.387Z")); - PagedIterable messagesResponse = new PagedIterable(chatThreadClient.listMessages(options)); - - // process the iterableByPage - List returnedMessages = new ArrayList(); - messagesResponse.iterableByPage().forEach(resp -> { - assertEquals(resp.getStatusCode(), 200); - resp.getItems().forEach(item -> { - if (item.getType().equals("Text")) { - returnedMessages.add(item); - } + // Action & Assert + StepVerifier.create( + chatThreadClient.sendMessage(messageRequest) + .concatWith(chatThreadClient.sendMessage(messageRequest))) + .assertNext((message) -> { + PagedIterable messagesResponse = new PagedIterable(chatThreadClient.listMessages(options)); + + // process the iterableByPage + List returnedMessages = new ArrayList(); + messagesResponse.iterableByPage().forEach(resp -> { + assertEquals(resp.getStatusCode(), 200); + resp.getItems().forEach(item -> { + if (item.getType().equals("Text")) { + returnedMessages.add(item); + } + }); + }); + + assertTrue(returnedMessages.size() == 2); }); - }); - - assertTrue(returnedMessages.size() == 2); } @Test public void canSendTypingNotification() { - chatThreadClient.sendTypingNotification().block(); + // Action & Assert + StepVerifier.create(chatThreadClient.sendTypingNotification()) + .verifyComplete(); } @Test public void canSendTypingNotificationWithResponse() { - chatThreadClient.sendTypingNotificationWithResponse().block(); + // Action & Assert + StepVerifier.create(chatThreadClient.sendTypingNotificationWithResponse()) + .assertNext(response -> { + assertEquals(response.getStatusCode(), 200); + }) + .verifyComplete(); } @Test public void canSendThenListReadReceipts() throws InterruptedException { + // Arrange SendChatMessageOptions messageRequest = ChatOptionsProvider.sendMessageOptions(); - - SendChatMessageResult response = chatThreadClient.sendMessage(messageRequest).block(); - - chatThreadClient.sendReadReceipt(response.getId()).block(); - - PagedIterable readReceiptsResponse = new PagedIterable(chatThreadClient.listReadReceipts()); - - // process the iterableByPage - List returnedReadReceipts = new ArrayList(); - readReceiptsResponse.iterableByPage().forEach(resp -> { - assertEquals(resp.getStatusCode(), 200); - resp.getItems().forEach(item -> returnedReadReceipts.add(item)); - }); - - if (interceptorManager.isPlaybackMode()) { - assertTrue(returnedReadReceipts.size() > 0); - checkReadReceiptListContainsMessageId(returnedReadReceipts, response.getId()); - } + AtomicReference messageResponseRef = new AtomicReference<>(); + + // Action & Assert + StepVerifier.create( + chatThreadClient.sendMessage(messageRequest) + .flatMap(response -> { + messageResponseRef.set(response); + return chatThreadClient.sendReadReceipt(response.getId()); + }) + ) + .assertNext(noResp -> { + PagedIterable readReceiptsResponse = new PagedIterable(chatThreadClient.listReadReceipts()); + + // process the iterableByPage + List returnedReadReceipts = new ArrayList(); + readReceiptsResponse.iterableByPage().forEach(resp -> { + assertEquals(resp.getStatusCode(), 200); + resp.getItems().forEach(item -> returnedReadReceipts.add(item)); + }); + + if (interceptorManager.isPlaybackMode()) { + assertTrue(returnedReadReceipts.size() > 0); + checkReadReceiptListContainsMessageId(returnedReadReceipts, messageResponseRef.get().getId()); + } + }); } @Test public void canSendThenListReadReceiptsWithResponse() throws InterruptedException { + // Arrange SendChatMessageOptions messageRequest = ChatOptionsProvider.sendMessageOptions(); - - SendChatMessageResult response = chatThreadClient.sendMessage(messageRequest).block(); - - chatThreadClient.sendReadReceiptWithResponse(response.getId()).block(); - - PagedIterable readReceiptsResponse = new PagedIterable(chatThreadClient.listReadReceipts()); - - // process the iterableByPage - List returnedReadReceipts = new ArrayList(); - readReceiptsResponse.iterableByPage().forEach(resp -> { - assertEquals(resp.getStatusCode(), 200); - resp.getItems().forEach(item -> returnedReadReceipts.add(item)); - }); - - if (interceptorManager.isPlaybackMode()) { - assertTrue(returnedReadReceipts.size() > 0); - checkReadReceiptListContainsMessageId(returnedReadReceipts, response.getId()); - } + AtomicReference messageResponseRef = new AtomicReference<>(); + + // Action & Assert + StepVerifier.create( + chatThreadClient.sendMessage(messageRequest) + .flatMap(response -> { + messageResponseRef.set(response); + return chatThreadClient.sendReadReceiptWithResponse(response.getId()); + }) + ) + .assertNext(receiptResponse -> { + assertEquals(receiptResponse.getStatusCode(), 201); + PagedIterable readReceiptsResponse = new PagedIterable(chatThreadClient.listReadReceipts()); + + // process the iterableByPage + List returnedReadReceipts = new ArrayList(); + readReceiptsResponse.iterableByPage().forEach(resp -> { + assertEquals(resp.getStatusCode(), 200); + resp.getItems().forEach(item -> returnedReadReceipts.add(item)); + }); + + if (interceptorManager.isPlaybackMode()) { + assertTrue(returnedReadReceipts.size() > 0); + checkReadReceiptListContainsMessageId(returnedReadReceipts, messageResponseRef.get().getId()); + } + }) + .verifyComplete(); + } }