diff --git a/README.md b/README.md index 25d539a..9980322 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2 act template (5.1.1) +# th2 act template (5.2.0) ## Overview @@ -100,6 +100,9 @@ the `protobuf-description-base64` label. Such descriptors can be used to interac ## Release Notes +### 5.2.0 ++ `multiSendMessage` method implemented + ### 5.1.1 + Use pins with `send` pin attribute for parsed messages sends. diff --git a/build.gradle b/build.gradle index 85b7ceb..d6a9fe6 100644 --- a/build.gradle +++ b/build.gradle @@ -92,10 +92,10 @@ jar { dependencies { api platform('com.exactpro.th2:bom:4.5.0') - implementation 'com.exactpro.th2:common:5.4.0-dev' - implementation 'com.exactpro.th2:common-utils:2.2.0-dev' + implementation 'com.exactpro.th2:common:5.8.0-dev' + implementation 'com.exactpro.th2:common-utils:2.2.2-dev' implementation 'com.exactpro.th2:grpc-act-template:4.2.0' - implementation 'com.exactpro.th2:grpc-check1:4.2.0-dev' + implementation 'com.exactpro.th2:grpc-check1:4.4.0-dev' implementation "com.fasterxml.jackson.core:jackson-core" implementation "com.fasterxml.jackson.core:jackson-databind" @@ -104,9 +104,9 @@ dependencies { implementation "org.apache.commons:commons-lang3" - testImplementation 'org.junit.jupiter:junit-jupiter:5.10.0' + testImplementation 'org.junit.jupiter:junit-jupiter:5.10.1' testImplementation 'io.strikt:strikt-core:0.34.1' - testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0" + testImplementation 'org.mockito.kotlin:mockito-kotlin:3.2.0' } test { diff --git a/gradle.properties b/gradle.properties index e2eda78..5c66e6a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -release_version=5.1.1 +release_version=5.2.0 nexus_url= nexus_user= nexus_password= diff --git a/src/main/java/com/exactpro/th2/act/ActHandler.java b/src/main/java/com/exactpro/th2/act/ActHandler.java index f1bc3cd..645213e 100644 --- a/src/main/java/com/exactpro/th2/act/ActHandler.java +++ b/src/main/java/com/exactpro/th2/act/ActHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,10 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.act; import com.exactpro.th2.act.grpc.ActGrpc.ActImplBase; import com.exactpro.th2.act.grpc.PlaceMessageRequest; +import com.exactpro.th2.act.grpc.MultiSendRequest; import com.exactpro.th2.act.grpc.PlaceMessageRequestOrBuilder; import com.exactpro.th2.act.grpc.PlaceMessageResponse; import com.exactpro.th2.act.grpc.SendMessageResponse; @@ -225,7 +227,7 @@ public void sendMessage(PlaceMessageRequest request, StreamObserver responseObserver) { + long startPlaceMessage = System.currentTimeMillis(); + try { + var messages = request.getMessagesList(); + if (messages.isEmpty()) { + throw new IllegalArgumentException("Empty message list is not allowed"); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Sending message request: " + shortDebugString(request)); + } + + String actName = "multiSendMessage"; + // FIXME store parent with fail in case of children fail + EventID parentId = createAndStoreParentEvent(request, actName, PASSED); + + Checkpoint checkpoint = registerCheckPoint(parentId); + + if (Context.current().isCancelled()) { + LOGGER.warn("'{}' request cancelled by client", actName); + sendMessageErrorResponse(responseObserver, "Request has been cancelled by client"); + } + + try { + multiSendMessage(messages, parentId); + } catch (Exception ex) { + createAndStoreErrorEvent("sendMessage", ex.getMessage(), Instant.now(), parentId); + throw ex; + } + + SendMessageResponse response = SendMessageResponse.newBuilder() + .setStatus(RequestStatus.newBuilder().setStatus(SUCCESS).build()) + .setCheckpointId(checkpoint) + .build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + + } catch (RuntimeException | IOException e) { + LOGGER.error("Failed to send a messages. Message = {}", request, e); + sendMessageErrorResponse(responseObserver, "Send messages failed. Error: " + e.getMessage()); + } finally { + LOGGER.debug("Sending the message has been finished in {}", System.currentTimeMillis() - startPlaceMessage); + } + } + @Override public void placeOrderMassCancelRequestFIX(PlaceMessageRequest request, StreamObserver responseObserver) { try { @@ -433,7 +481,28 @@ private EventID createAndStoreParentEvent(PlaceMessageRequestOrBuilder request, com.exactpro.th2.common.grpc.Event protoEvent = event.toProto(request.getParentEventId()); //FIXME process response try { - eventBatchMessageRouter.send(EventBatch.newBuilder().addEvents(event.toProto(request.getParentEventId())).build(), "publish", "event"); + eventBatchMessageRouter.send(EventBatch.newBuilder().addEvents(event.toProto(request.getParentEventId())).build()); + LOGGER.debug("createAndStoreParentEvent for {} in {} ms", actName, System.currentTimeMillis() - startTime); + return protoEvent.getId(); + } catch (IOException e) { + throw new RuntimeException("Can not send event = " + protoEvent.getId().getId(), e); + } + } + + private EventID createAndStoreParentEvent(MultiSendRequest request, String actName, Status status) throws IOException { + long startTime = System.currentTimeMillis(); + + Event event = start() + .name(actName + ' ' + request.getMessages(0).getMetadata().getId().getConnectionId().getSessionAlias()) + .description(request.getDescription()) + .type(actName) + .status(status) + .endTimestamp(); + + com.exactpro.th2.common.grpc.Event protoEvent = event.toProto(request.getParentEventId()); + //FIXME process response + try { + eventBatchMessageRouter.send(EventBatch.newBuilder().addEvents(event.toProto(request.getParentEventId())).build()); LOGGER.debug("createAndStoreParentEvent for {} in {} ms", actName, System.currentTimeMillis() - startTime); return protoEvent.getId(); } catch (IOException e) { @@ -453,7 +522,7 @@ private EventID createAndStoreParentEvent(SendRawMessageRequest request, String com.exactpro.th2.common.grpc.Event protoEvent = event.toProto(request.getParentEventId()); //FIXME process response try { - eventBatchMessageRouter.send(EventBatch.newBuilder().addEvents(event.toProto(request.getParentEventId())).build(), "publish", "event"); + eventBatchMessageRouter.send(EventBatch.newBuilder().addEvents(event.toProto(request.getParentEventId())).build()); LOGGER.debug("createAndStoreParentEvent for {} in {} ms", actName, System.currentTimeMillis() - startTime); return protoEvent.getId(); } catch (IOException e) { @@ -563,7 +632,7 @@ private void sendMessage(Message message, EventID parentEventId) throws IOExcept ParsedMessage parsedMessage = MessageUtilsKt.toTransportBuilder(message) .setEventId(EventUtilsKt.toTransport(parentEventId)) .build(); - //May be use in future for filtering + //May be used in future for filtering //request.getConnectionId().getSessionAlias(); MessageID messageID = message.getMetadata().getId(); messageRouter.send(toBatch(toGroup(parsedMessage), messageID.getBookName(), messageID.getConnectionId().getSessionGroup()), SEND_QUEUE_ATTRIBUTE); @@ -572,12 +641,39 @@ private void sendMessage(Message message, EventID parentEventId) throws IOExcept EventBatch eventBatch = EventBatch.newBuilder() .addEvents(createSendMessageEvent(parsedMessage, parentEventId)) .build(); - eventBatchMessageRouter.send(eventBatch, "publish", "event"); + eventBatchMessageRouter.send(eventBatch); } finally { LOGGER.debug("Sending the message ended"); } } + private void multiSendMessage(List messages, EventID parentEventId) throws IOException { + try { + LOGGER.debug("Sending the message list started"); + + final var firstMessageId = messages.get(0).getMetadata().getId(); + final var bookName = firstMessageId.getBookName(); + final var sessionGroup = firstMessageId.getConnectionId().getSessionGroup(); + + // currently all messages should be placed into one batch, and therefore they should have + // the same 'bookName' and 'sessionGroup' fields + // later we can implement batching based on book name and session group + if(!messages.stream().allMatch(msg -> msg.getMetadata().getId().getBookName().equals(bookName) + && msg.getMetadata().getId().getConnectionId().getSessionGroup().equals(sessionGroup))) { + throw new IllegalArgumentException("All messages in 'multiSendMessage' should have the same 'bookName' (" + bookName + ") and 'sessionGroup' (" + sessionGroup + ") fields"); + } + + var batch = new GroupBatch(bookName, sessionGroup, messages.stream() + .map(msg -> toGroup(MessageUtilsKt.toTransportBuilder(msg).setEventId(EventUtilsKt.toTransport(parentEventId)).build())) + .collect(Collectors.toList()) + ); + + messageRouter.send(batch, SEND_QUEUE_ATTRIBUTE); + } finally { + LOGGER.debug("Sending the message list completed"); + } + } + private void sendRawMessage(com.exactpro.th2.common.grpc.RawMessage rawMessage, EventID parentEventId) throws IOException { try { LOGGER.debug("Sending the message started"); @@ -595,7 +691,7 @@ private void sendRawMessage(com.exactpro.th2.common.grpc.RawMessage rawMessage, EventBatch eventBatch = EventBatch.newBuilder() .addEvents(createSendRawMessageEvent(transportRawMessage, parentEventId)) .build(); - eventBatchMessageRouter.send(eventBatch, "publish", "event"); + eventBatchMessageRouter.send(eventBatch); } finally { LOGGER.debug("Sending the message ended"); } @@ -636,7 +732,7 @@ private void storeEvent(com.exactpro.th2.common.grpc.Event eventRequest) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Try to store event: {}", toDebugMessage(eventRequest)); } - eventBatchMessageRouter.send(EventBatch.newBuilder().addEvents(eventRequest).build(), "publish", "event"); + eventBatchMessageRouter.send(EventBatch.newBuilder().addEvents(eventRequest).build()); } catch (Exception e) { LOGGER.error("Could not store event", e); } @@ -785,4 +881,4 @@ public ConnectionID getConnectionID() { return connectionID; } } -} +} \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/act/TestBiDirectionalMessageReceiver.kt b/src/test/kotlin/com/exactpro/th2/act/TestBiDirectionalMessageReceiver.kt index 70fd1ca..5a6609a 100644 --- a/src/test/kotlin/com/exactpro/th2/act/TestBiDirectionalMessageReceiver.kt +++ b/src/test/kotlin/com/exactpro/th2/act/TestBiDirectionalMessageReceiver.kt @@ -1,5 +1,5 @@ /* - * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2021-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.act import com.exactpro.th2.act.impl.SubscriptionManagerImpl @@ -27,8 +28,8 @@ import com.exactpro.th2.common.utils.message.MessageHolder import com.exactpro.th2.common.utils.message.toTransport import com.exactpro.th2.common.utils.message.transport.toBatch import com.exactpro.th2.common.utils.message.transport.toGroup -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.verify +import org.mockito.kotlin.mock +import org.mockito.kotlin.verify import org.junit.jupiter.api.Test import strikt.api.expect import strikt.assertions.containsExactlyInAnyOrder @@ -44,7 +45,7 @@ class TestBiDirectionalMessageReceiver { .build() private val manager = SubscriptionManagerImpl() private val monitor: ResponseMonitor = mock { } - private val deliveryMetadata: DeliveryMetadata = mock { } + private val deliveryMetadata = DeliveryMetadata("test_tag_1", false) private fun receiver(outgoing: CheckRule, incomingSupplier: (MessageHolder) -> CheckRule): AbstractMessageReceiver = BiDirectionalMessageReceiver( diff --git a/src/test/kotlin/com/exactpro/th2/act/impl/TestSubscriptionManagerImpl.kt b/src/test/kotlin/com/exactpro/th2/act/impl/TestSubscriptionManagerImpl.kt index 4daa0c0..2e2f2ae 100644 --- a/src/test/kotlin/com/exactpro/th2/act/impl/TestSubscriptionManagerImpl.kt +++ b/src/test/kotlin/com/exactpro/th2/act/impl/TestSubscriptionManagerImpl.kt @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2021-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.act.impl import com.exactpro.th2.act.Listener @@ -26,11 +27,11 @@ import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMess import com.exactpro.th2.common.utils.message.TransportMessageHolder import com.exactpro.th2.common.utils.message.transport.toBatch import com.exactpro.th2.common.utils.message.transport.toGroup -import com.nhaarman.mockitokotlin2.any -import com.nhaarman.mockitokotlin2.eq -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.never -import com.nhaarman.mockitokotlin2.verify +import org.mockito.kotlin.any +import org.mockito.kotlin.eq +import org.mockito.kotlin.mock +import org.mockito.kotlin.never +import org.mockito.kotlin.verify import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest @@ -39,7 +40,7 @@ import org.junit.jupiter.params.provider.EnumSource class TestSubscriptionManagerImpl { private val manager = SubscriptionManagerImpl() - private val deliveryMetadata: DeliveryMetadata = mock { } + private val deliveryMetadata = DeliveryMetadata("test_tag_1", false) @Test fun `correctly distributes the batches`() {