Skip to content

Commit

Permalink
multiSendMessage method implemented (#123)
Browse files Browse the repository at this point in the history
multiSendMessage method implemented
  • Loading branch information
lumber1000 authored Feb 1, 2024
1 parent 434d5ce commit c813298
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 27 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2 act template (5.1.1)
# th2 act template (5.2.0)

## Overview

Expand Down Expand Up @@ -100,6 +100,9 @@ the `protobuf-description-base64` label. Such descriptors can be used to interac

## Release Notes

### 5.2.0
+ `multiSendMessage` method implemented

### 5.1.1
+ Use pins with `send` pin attribute for parsed messages sends.

Expand Down
10 changes: 5 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ jar {
dependencies {
api platform('com.exactpro.th2:bom:4.5.0')

implementation 'com.exactpro.th2:common:5.4.0-dev'
implementation 'com.exactpro.th2:common-utils:2.2.0-dev'
implementation 'com.exactpro.th2:common:5.8.0-dev'
implementation 'com.exactpro.th2:common-utils:2.2.2-dev'
implementation 'com.exactpro.th2:grpc-act-template:4.2.0'
implementation 'com.exactpro.th2:grpc-check1:4.2.0-dev'
implementation 'com.exactpro.th2:grpc-check1:4.4.0-dev'

implementation "com.fasterxml.jackson.core:jackson-core"
implementation "com.fasterxml.jackson.core:jackson-databind"
Expand All @@ -104,9 +104,9 @@ dependencies {

implementation "org.apache.commons:commons-lang3"

testImplementation 'org.junit.jupiter:junit-jupiter:5.10.0'
testImplementation 'org.junit.jupiter:junit-jupiter:5.10.1'
testImplementation 'io.strikt:strikt-core:0.34.1'
testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0"
testImplementation 'org.mockito.kotlin:mockito-kotlin:3.2.0'
}

test {
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
release_version=5.1.1
release_version=5.2.0
nexus_url=
nexus_user=
nexus_password=
Expand Down
114 changes: 105 additions & 9 deletions src/main/java/com/exactpro/th2/act/ActHandler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,10 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.act;

import com.exactpro.th2.act.grpc.ActGrpc.ActImplBase;
import com.exactpro.th2.act.grpc.PlaceMessageRequest;
import com.exactpro.th2.act.grpc.MultiSendRequest;
import com.exactpro.th2.act.grpc.PlaceMessageRequestOrBuilder;
import com.exactpro.th2.act.grpc.PlaceMessageResponse;
import com.exactpro.th2.act.grpc.SendMessageResponse;
Expand Down Expand Up @@ -225,7 +227,7 @@ public void sendMessage(PlaceMessageRequest request, StreamObserver<SendMessageR
long startPlaceMessage = System.currentTimeMillis();
try {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Sending message request: " + shortDebugString(request));
LOGGER.debug("Sending message request: " + shortDebugString(request));
}

String actName = "sendMessage";
Expand Down Expand Up @@ -261,6 +263,52 @@ public void sendMessage(PlaceMessageRequest request, StreamObserver<SendMessageR
}
}

@Override
public void multiSendMessage(MultiSendRequest request, StreamObserver<SendMessageResponse> responseObserver) {
long startPlaceMessage = System.currentTimeMillis();
try {
var messages = request.getMessagesList();
if (messages.isEmpty()) {
throw new IllegalArgumentException("Empty message list is not allowed");
}

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Sending message request: " + shortDebugString(request));
}

String actName = "multiSendMessage";
// FIXME store parent with fail in case of children fail
EventID parentId = createAndStoreParentEvent(request, actName, PASSED);

Checkpoint checkpoint = registerCheckPoint(parentId);

if (Context.current().isCancelled()) {
LOGGER.warn("'{}' request cancelled by client", actName);
sendMessageErrorResponse(responseObserver, "Request has been cancelled by client");
}

try {
multiSendMessage(messages, parentId);
} catch (Exception ex) {
createAndStoreErrorEvent("sendMessage", ex.getMessage(), Instant.now(), parentId);
throw ex;
}

SendMessageResponse response = SendMessageResponse.newBuilder()
.setStatus(RequestStatus.newBuilder().setStatus(SUCCESS).build())
.setCheckpointId(checkpoint)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();

} catch (RuntimeException | IOException e) {
LOGGER.error("Failed to send a messages. Message = {}", request, e);
sendMessageErrorResponse(responseObserver, "Send messages failed. Error: " + e.getMessage());
} finally {
LOGGER.debug("Sending the message has been finished in {}", System.currentTimeMillis() - startPlaceMessage);
}
}

@Override
public void placeOrderMassCancelRequestFIX(PlaceMessageRequest request, StreamObserver<PlaceMessageResponse> responseObserver) {
try {
Expand Down Expand Up @@ -433,7 +481,28 @@ private EventID createAndStoreParentEvent(PlaceMessageRequestOrBuilder request,
com.exactpro.th2.common.grpc.Event protoEvent = event.toProto(request.getParentEventId());
//FIXME process response
try {
eventBatchMessageRouter.send(EventBatch.newBuilder().addEvents(event.toProto(request.getParentEventId())).build(), "publish", "event");
eventBatchMessageRouter.send(EventBatch.newBuilder().addEvents(event.toProto(request.getParentEventId())).build());
LOGGER.debug("createAndStoreParentEvent for {} in {} ms", actName, System.currentTimeMillis() - startTime);
return protoEvent.getId();
} catch (IOException e) {
throw new RuntimeException("Can not send event = " + protoEvent.getId().getId(), e);
}
}

private EventID createAndStoreParentEvent(MultiSendRequest request, String actName, Status status) throws IOException {
long startTime = System.currentTimeMillis();

Event event = start()
.name(actName + ' ' + request.getMessages(0).getMetadata().getId().getConnectionId().getSessionAlias())
.description(request.getDescription())
.type(actName)
.status(status)
.endTimestamp();

com.exactpro.th2.common.grpc.Event protoEvent = event.toProto(request.getParentEventId());
//FIXME process response
try {
eventBatchMessageRouter.send(EventBatch.newBuilder().addEvents(event.toProto(request.getParentEventId())).build());
LOGGER.debug("createAndStoreParentEvent for {} in {} ms", actName, System.currentTimeMillis() - startTime);
return protoEvent.getId();
} catch (IOException e) {
Expand All @@ -453,7 +522,7 @@ private EventID createAndStoreParentEvent(SendRawMessageRequest request, String
com.exactpro.th2.common.grpc.Event protoEvent = event.toProto(request.getParentEventId());
//FIXME process response
try {
eventBatchMessageRouter.send(EventBatch.newBuilder().addEvents(event.toProto(request.getParentEventId())).build(), "publish", "event");
eventBatchMessageRouter.send(EventBatch.newBuilder().addEvents(event.toProto(request.getParentEventId())).build());
LOGGER.debug("createAndStoreParentEvent for {} in {} ms", actName, System.currentTimeMillis() - startTime);
return protoEvent.getId();
} catch (IOException e) {
Expand Down Expand Up @@ -563,7 +632,7 @@ private void sendMessage(Message message, EventID parentEventId) throws IOExcept
ParsedMessage parsedMessage = MessageUtilsKt.toTransportBuilder(message)
.setEventId(EventUtilsKt.toTransport(parentEventId))
.build();
//May be use in future for filtering
//May be used in future for filtering
//request.getConnectionId().getSessionAlias();
MessageID messageID = message.getMetadata().getId();
messageRouter.send(toBatch(toGroup(parsedMessage), messageID.getBookName(), messageID.getConnectionId().getSessionGroup()), SEND_QUEUE_ATTRIBUTE);
Expand All @@ -572,12 +641,39 @@ private void sendMessage(Message message, EventID parentEventId) throws IOExcept
EventBatch eventBatch = EventBatch.newBuilder()
.addEvents(createSendMessageEvent(parsedMessage, parentEventId))
.build();
eventBatchMessageRouter.send(eventBatch, "publish", "event");
eventBatchMessageRouter.send(eventBatch);
} finally {
LOGGER.debug("Sending the message ended");
}
}

private void multiSendMessage(List<Message> messages, EventID parentEventId) throws IOException {
try {
LOGGER.debug("Sending the message list started");

final var firstMessageId = messages.get(0).getMetadata().getId();
final var bookName = firstMessageId.getBookName();
final var sessionGroup = firstMessageId.getConnectionId().getSessionGroup();

// currently all messages should be placed into one batch, and therefore they should have
// the same 'bookName' and 'sessionGroup' fields
// later we can implement batching based on book name and session group
if(!messages.stream().allMatch(msg -> msg.getMetadata().getId().getBookName().equals(bookName)
&& msg.getMetadata().getId().getConnectionId().getSessionGroup().equals(sessionGroup))) {
throw new IllegalArgumentException("All messages in 'multiSendMessage' should have the same 'bookName' (" + bookName + ") and 'sessionGroup' (" + sessionGroup + ") fields");
}

var batch = new GroupBatch(bookName, sessionGroup, messages.stream()
.map(msg -> toGroup(MessageUtilsKt.toTransportBuilder(msg).setEventId(EventUtilsKt.toTransport(parentEventId)).build()))
.collect(Collectors.toList())
);

messageRouter.send(batch, SEND_QUEUE_ATTRIBUTE);
} finally {
LOGGER.debug("Sending the message list completed");
}
}

private void sendRawMessage(com.exactpro.th2.common.grpc.RawMessage rawMessage, EventID parentEventId) throws IOException {
try {
LOGGER.debug("Sending the message started");
Expand All @@ -595,7 +691,7 @@ private void sendRawMessage(com.exactpro.th2.common.grpc.RawMessage rawMessage,
EventBatch eventBatch = EventBatch.newBuilder()
.addEvents(createSendRawMessageEvent(transportRawMessage, parentEventId))
.build();
eventBatchMessageRouter.send(eventBatch, "publish", "event");
eventBatchMessageRouter.send(eventBatch);
} finally {
LOGGER.debug("Sending the message ended");
}
Expand Down Expand Up @@ -636,7 +732,7 @@ private void storeEvent(com.exactpro.th2.common.grpc.Event eventRequest) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Try to store event: {}", toDebugMessage(eventRequest));
}
eventBatchMessageRouter.send(EventBatch.newBuilder().addEvents(eventRequest).build(), "publish", "event");
eventBatchMessageRouter.send(EventBatch.newBuilder().addEvents(eventRequest).build());
} catch (Exception e) {
LOGGER.error("Could not store event", e);
}
Expand Down Expand Up @@ -785,4 +881,4 @@ public ConnectionID getConnectionID() {
return connectionID;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2021-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.act

import com.exactpro.th2.act.impl.SubscriptionManagerImpl
Expand All @@ -27,8 +28,8 @@ import com.exactpro.th2.common.utils.message.MessageHolder
import com.exactpro.th2.common.utils.message.toTransport
import com.exactpro.th2.common.utils.message.transport.toBatch
import com.exactpro.th2.common.utils.message.transport.toGroup
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.verify
import org.mockito.kotlin.mock
import org.mockito.kotlin.verify
import org.junit.jupiter.api.Test
import strikt.api.expect
import strikt.assertions.containsExactlyInAnyOrder
Expand All @@ -44,7 +45,7 @@ class TestBiDirectionalMessageReceiver {
.build()
private val manager = SubscriptionManagerImpl()
private val monitor: ResponseMonitor = mock { }
private val deliveryMetadata: DeliveryMetadata = mock { }
private val deliveryMetadata = DeliveryMetadata("test_tag_1", false)

private fun receiver(outgoing: CheckRule, incomingSupplier: (MessageHolder) -> CheckRule): AbstractMessageReceiver =
BiDirectionalMessageReceiver(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 Exactpro (Exactpro Systems Limited)
* Copyright 2021-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.act.impl

import com.exactpro.th2.act.Listener
Expand All @@ -26,11 +27,11 @@ import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMess
import com.exactpro.th2.common.utils.message.TransportMessageHolder
import com.exactpro.th2.common.utils.message.transport.toBatch
import com.exactpro.th2.common.utils.message.transport.toGroup
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.eq
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.never
import com.nhaarman.mockitokotlin2.verify
import org.mockito.kotlin.any
import org.mockito.kotlin.eq
import org.mockito.kotlin.mock
import org.mockito.kotlin.never
import org.mockito.kotlin.verify
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
Expand All @@ -39,7 +40,7 @@ import org.junit.jupiter.params.provider.EnumSource
class TestSubscriptionManagerImpl {

private val manager = SubscriptionManagerImpl()
private val deliveryMetadata: DeliveryMetadata = mock { }
private val deliveryMetadata = DeliveryMetadata("test_tag_1", false)

@Test
fun `correctly distributes the batches`() {
Expand Down

0 comments on commit c813298

Please sign in to comment.