From 024d2187776b85617563cca427f326552a0b48f8 Mon Sep 17 00:00:00 2001 From: Oleg Smelov Date: Wed, 10 Jan 2024 01:12:56 +0100 Subject: [PATCH] multiSendMessage method added --- build.gradle | 6 +- .../java/com/exactpro/th2/act/ActHandler.java | 106 +++++++++++++++++- 2 files changed, 106 insertions(+), 6 deletions(-) diff --git a/build.gradle b/build.gradle index 85b7ceb..ca21355 100644 --- a/build.gradle +++ b/build.gradle @@ -92,8 +92,8 @@ jar { dependencies { api platform('com.exactpro.th2:bom:4.5.0') - implementation 'com.exactpro.th2:common:5.4.0-dev' - implementation 'com.exactpro.th2:common-utils:2.2.0-dev' + implementation 'com.exactpro.th2:common:5.7.2-dev' + implementation 'com.exactpro.th2:common-utils:2.2.2-dev' implementation 'com.exactpro.th2:grpc-act-template:4.2.0' implementation 'com.exactpro.th2:grpc-check1:4.2.0-dev' @@ -104,7 +104,7 @@ dependencies { implementation "org.apache.commons:commons-lang3" - testImplementation 'org.junit.jupiter:junit-jupiter:5.10.0' + testImplementation 'org.junit.jupiter:junit-jupiter:5.10.1' testImplementation 'io.strikt:strikt-core:0.34.1' testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0" } diff --git a/src/main/java/com/exactpro/th2/act/ActHandler.java b/src/main/java/com/exactpro/th2/act/ActHandler.java index f1bc3cd..e2391ac 100644 --- a/src/main/java/com/exactpro/th2/act/ActHandler.java +++ b/src/main/java/com/exactpro/th2/act/ActHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ import com.exactpro.th2.act.grpc.ActGrpc.ActImplBase; import com.exactpro.th2.act.grpc.PlaceMessageRequest; +import com.exactpro.th2.act.grpc.MultiSendRequest; import com.exactpro.th2.act.grpc.PlaceMessageRequestOrBuilder; import com.exactpro.th2.act.grpc.PlaceMessageResponse; import com.exactpro.th2.act.grpc.SendMessageResponse; @@ -225,7 +226,7 @@ public void sendMessage(PlaceMessageRequest request, StreamObserver responseObserver) { + long startPlaceMessage = System.currentTimeMillis(); + try { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Sending message request: " + shortDebugString(request)); + } + + String actName = "multiSendMessage"; + // FIXME store parent with fail in case of children fail + EventID parentId = createAndStoreParentEvent(request, actName, PASSED); + + Checkpoint checkpoint = registerCheckPoint(parentId); + + if (Context.current().isCancelled()) { + LOGGER.warn("'{}' request cancelled by client", actName); + sendMessageErrorResponse(responseObserver, "Request has been cancelled by client"); + } + + try { + multiSendMessage(request.getMessagesList(), parentId); + } catch (Exception ex) { + createAndStoreErrorEvent("sendMessage", ex.getMessage(), Instant.now(), parentId); + throw ex; + } + + SendMessageResponse response = SendMessageResponse.newBuilder() + .setStatus(RequestStatus.newBuilder().setStatus(SUCCESS).build()) + .setCheckpointId(checkpoint) + .build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + + } catch (RuntimeException | IOException e) { + LOGGER.error("Failed to send a messages. Message = {}", request, e); + sendMessageErrorResponse(responseObserver, "Send messages failed. Error: " + e.getMessage()); + } finally { + LOGGER.debug("Sending the message has been finished in {}", System.currentTimeMillis() - startPlaceMessage); + } + } + @Override public void placeOrderMassCancelRequestFIX(PlaceMessageRequest request, StreamObserver responseObserver) { try { @@ -441,6 +483,27 @@ private EventID createAndStoreParentEvent(PlaceMessageRequestOrBuilder request, } } + private EventID createAndStoreParentEvent(MultiSendRequest request, String actName, Status status) throws IOException { + long startTime = System.currentTimeMillis(); + + Event event = start() + .name(actName + ' ' + request.getMessages(0).getMetadata().getId().getConnectionId().getSessionAlias()) + .description(request.getDescription()) + .type(actName) + .status(status) + .endTimestamp(); // FIXME set properly as is in the last child + + com.exactpro.th2.common.grpc.Event protoEvent = event.toProto(request.getParentEventId()); + //FIXME process response + try { + eventBatchMessageRouter.send(EventBatch.newBuilder().addEvents(event.toProto(request.getParentEventId())).build(), "publish", "event"); + LOGGER.debug("createAndStoreParentEvent for {} in {} ms", actName, System.currentTimeMillis() - startTime); + return protoEvent.getId(); + } catch (IOException e) { + throw new RuntimeException("Can not send event = " + protoEvent.getId().getId(), e); + } + } + private EventID createAndStoreParentEvent(SendRawMessageRequest request, String actName, Status status) throws IOException { long startTime = System.currentTimeMillis(); @@ -563,7 +626,7 @@ private void sendMessage(Message message, EventID parentEventId) throws IOExcept ParsedMessage parsedMessage = MessageUtilsKt.toTransportBuilder(message) .setEventId(EventUtilsKt.toTransport(parentEventId)) .build(); - //May be use in future for filtering + //May be used in future for filtering //request.getConnectionId().getSessionAlias(); MessageID messageID = message.getMetadata().getId(); messageRouter.send(toBatch(toGroup(parsedMessage), messageID.getBookName(), messageID.getConnectionId().getSessionGroup()), SEND_QUEUE_ATTRIBUTE); @@ -578,6 +641,43 @@ private void sendMessage(Message message, EventID parentEventId) throws IOExcept } } + private void multiSendMessage(List messages, EventID parentEventId) throws IOException { + try { + LOGGER.debug("Sending the message started"); + + if (messages.isEmpty()) { + throw new IllegalArgumentException("Empty message list is not allowed"); + } + + final var firstMessageId = messages.get(0).getMetadata().getId(); + final var bookName = firstMessageId.getBookName(); + final var sessionGroup = firstMessageId.getConnectionId().getSessionGroup(); + + // currently all messages should be placed into one batch, and therefore they should have + // the same 'bookName' and 'sessionGroup' fields + // later we can implement batching based on book name and session group + if(!messages.stream().allMatch(msg -> msg.getMetadata().getId().getBookName().equals(bookName) + && msg.getMetadata().getId().getConnectionId().getSessionGroup().equals(sessionGroup))) { + throw new IllegalArgumentException("All messages in 'multiSendMessage' should have the same 'bookName' (" + bookName + ") and 'sessionGroup' (" + sessionGroup + ") fields"); + } + + var batch = new GroupBatch(bookName, sessionGroup, messages.stream() + .map(msg -> toGroup(MessageUtilsKt.toTransportBuilder(msg).setEventId(EventUtilsKt.toTransport(parentEventId)).build())) + .collect(Collectors.toList()) + ); + + messageRouter.send(batch, SEND_QUEUE_ATTRIBUTE); + //TODO remove after solving issue TH2-217 + //TODO process response +/* EventBatch eventBatch = EventBatch.newBuilder() + .addEvents(createSendMessageEvent(parsedMessage, parentEventId)) + .build(); + eventBatchMessageRouter.send(eventBatch, "publish", "event");*/ + } finally { + LOGGER.debug("Sending the message ended"); + } + } + private void sendRawMessage(com.exactpro.th2.common.grpc.RawMessage rawMessage, EventID parentEventId) throws IOException { try { LOGGER.debug("Sending the message started");