Skip to content

Commit

Permalink
multiSendMessage method added
Browse files Browse the repository at this point in the history
  • Loading branch information
Oleg Smelov committed Jan 10, 2024
1 parent 434d5ce commit 024d218
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 6 deletions.
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ jar {
dependencies {
api platform('com.exactpro.th2:bom:4.5.0')

implementation 'com.exactpro.th2:common:5.4.0-dev'
implementation 'com.exactpro.th2:common-utils:2.2.0-dev'
implementation 'com.exactpro.th2:common:5.7.2-dev'
implementation 'com.exactpro.th2:common-utils:2.2.2-dev'
implementation 'com.exactpro.th2:grpc-act-template:4.2.0'
implementation 'com.exactpro.th2:grpc-check1:4.2.0-dev'

Expand All @@ -104,7 +104,7 @@ dependencies {

implementation "org.apache.commons:commons-lang3"

testImplementation 'org.junit.jupiter:junit-jupiter:5.10.0'
testImplementation 'org.junit.jupiter:junit-jupiter:5.10.1'
testImplementation 'io.strikt:strikt-core:0.34.1'
testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0"
}
Expand Down
106 changes: 103 additions & 3 deletions src/main/java/com/exactpro/th2/act/ActHandler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@

import com.exactpro.th2.act.grpc.ActGrpc.ActImplBase;
import com.exactpro.th2.act.grpc.PlaceMessageRequest;
import com.exactpro.th2.act.grpc.MultiSendRequest;
import com.exactpro.th2.act.grpc.PlaceMessageRequestOrBuilder;
import com.exactpro.th2.act.grpc.PlaceMessageResponse;
import com.exactpro.th2.act.grpc.SendMessageResponse;
Expand Down Expand Up @@ -225,7 +226,7 @@ public void sendMessage(PlaceMessageRequest request, StreamObserver<SendMessageR
long startPlaceMessage = System.currentTimeMillis();
try {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Sending message request: " + shortDebugString(request));
LOGGER.debug("Sending message request: " + shortDebugString(request));
}

String actName = "sendMessage";
Expand Down Expand Up @@ -261,6 +262,47 @@ public void sendMessage(PlaceMessageRequest request, StreamObserver<SendMessageR
}
}

@Override
public void multiSendMessage(MultiSendRequest request, StreamObserver<SendMessageResponse> responseObserver) {
long startPlaceMessage = System.currentTimeMillis();
try {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Sending message request: " + shortDebugString(request));
}

String actName = "multiSendMessage";
// FIXME store parent with fail in case of children fail
EventID parentId = createAndStoreParentEvent(request, actName, PASSED);

Checkpoint checkpoint = registerCheckPoint(parentId);

if (Context.current().isCancelled()) {
LOGGER.warn("'{}' request cancelled by client", actName);
sendMessageErrorResponse(responseObserver, "Request has been cancelled by client");
}

try {
multiSendMessage(request.getMessagesList(), parentId);
} catch (Exception ex) {
createAndStoreErrorEvent("sendMessage", ex.getMessage(), Instant.now(), parentId);
throw ex;
}

SendMessageResponse response = SendMessageResponse.newBuilder()
.setStatus(RequestStatus.newBuilder().setStatus(SUCCESS).build())
.setCheckpointId(checkpoint)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();

} catch (RuntimeException | IOException e) {
LOGGER.error("Failed to send a messages. Message = {}", request, e);
sendMessageErrorResponse(responseObserver, "Send messages failed. Error: " + e.getMessage());
} finally {
LOGGER.debug("Sending the message has been finished in {}", System.currentTimeMillis() - startPlaceMessage);
}
}

@Override
public void placeOrderMassCancelRequestFIX(PlaceMessageRequest request, StreamObserver<PlaceMessageResponse> responseObserver) {
try {
Expand Down Expand Up @@ -441,6 +483,27 @@ private EventID createAndStoreParentEvent(PlaceMessageRequestOrBuilder request,
}
}

private EventID createAndStoreParentEvent(MultiSendRequest request, String actName, Status status) throws IOException {
long startTime = System.currentTimeMillis();

Event event = start()
.name(actName + ' ' + request.getMessages(0).getMetadata().getId().getConnectionId().getSessionAlias())
.description(request.getDescription())
.type(actName)
.status(status)
.endTimestamp(); // FIXME set properly as is in the last child

com.exactpro.th2.common.grpc.Event protoEvent = event.toProto(request.getParentEventId());
//FIXME process response
try {
eventBatchMessageRouter.send(EventBatch.newBuilder().addEvents(event.toProto(request.getParentEventId())).build(), "publish", "event");
LOGGER.debug("createAndStoreParentEvent for {} in {} ms", actName, System.currentTimeMillis() - startTime);
return protoEvent.getId();
} catch (IOException e) {
throw new RuntimeException("Can not send event = " + protoEvent.getId().getId(), e);
}
}

private EventID createAndStoreParentEvent(SendRawMessageRequest request, String actName, Status status) throws IOException {
long startTime = System.currentTimeMillis();

Expand Down Expand Up @@ -563,7 +626,7 @@ private void sendMessage(Message message, EventID parentEventId) throws IOExcept
ParsedMessage parsedMessage = MessageUtilsKt.toTransportBuilder(message)
.setEventId(EventUtilsKt.toTransport(parentEventId))
.build();
//May be use in future for filtering
//May be used in future for filtering
//request.getConnectionId().getSessionAlias();
MessageID messageID = message.getMetadata().getId();
messageRouter.send(toBatch(toGroup(parsedMessage), messageID.getBookName(), messageID.getConnectionId().getSessionGroup()), SEND_QUEUE_ATTRIBUTE);
Expand All @@ -578,6 +641,43 @@ private void sendMessage(Message message, EventID parentEventId) throws IOExcept
}
}

private void multiSendMessage(List<Message> messages, EventID parentEventId) throws IOException {
try {
LOGGER.debug("Sending the message started");

if (messages.isEmpty()) {
throw new IllegalArgumentException("Empty message list is not allowed");
}

final var firstMessageId = messages.get(0).getMetadata().getId();
final var bookName = firstMessageId.getBookName();
final var sessionGroup = firstMessageId.getConnectionId().getSessionGroup();

// currently all messages should be placed into one batch, and therefore they should have
// the same 'bookName' and 'sessionGroup' fields
// later we can implement batching based on book name and session group
if(!messages.stream().allMatch(msg -> msg.getMetadata().getId().getBookName().equals(bookName)
&& msg.getMetadata().getId().getConnectionId().getSessionGroup().equals(sessionGroup))) {
throw new IllegalArgumentException("All messages in 'multiSendMessage' should have the same 'bookName' (" + bookName + ") and 'sessionGroup' (" + sessionGroup + ") fields");
}

var batch = new GroupBatch(bookName, sessionGroup, messages.stream()
.map(msg -> toGroup(MessageUtilsKt.toTransportBuilder(msg).setEventId(EventUtilsKt.toTransport(parentEventId)).build()))
.collect(Collectors.toList())
);

messageRouter.send(batch, SEND_QUEUE_ATTRIBUTE);
//TODO remove after solving issue TH2-217
//TODO process response
/* EventBatch eventBatch = EventBatch.newBuilder()
.addEvents(createSendMessageEvent(parsedMessage, parentEventId))
.build();
eventBatchMessageRouter.send(eventBatch, "publish", "event");*/
} finally {
LOGGER.debug("Sending the message ended");
}
}

private void sendRawMessage(com.exactpro.th2.common.grpc.RawMessage rawMessage, EventID parentEventId) throws IOException {
try {
LOGGER.debug("Sending the message started");
Expand Down

0 comments on commit 024d218

Please sign in to comment.