Skip to content

Commit

Permalink
fix(sqs inbound): add try catch for correlate method (#701)
Browse files Browse the repository at this point in the history
  • Loading branch information
Oleksiivanov authored Jun 8, 2023
1 parent 7632dea commit bb23b4b
Showing 1 changed file with 24 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorResult;
import io.camunda.connector.impl.ConnectorInputException;
import io.camunda.connector.inbound.model.SqsInboundProperties;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -38,28 +39,38 @@ public SqsQueueConsumer(
@Override
public void run() {
LOGGER.info("Started SQS consumer for queue {}", properties.getQueue().getUrl());

final ReceiveMessageRequest receiveMessageRequest = createReceiveMessageRequest();
ReceiveMessageResult receiveMessageResult;
do {
receiveMessageResult = sqsClient.receiveMessage(receiveMessageRequest);
List<Message> messages = receiveMessageResult.getMessages();
for (Message message : messages) {
InboundConnectorResult<?> correlate =
context.correlate(MessageMapper.toSqsInboundMessage(message));
if (correlate.isActivated()) {
sqsClient.deleteMessage(properties.getQueue().getUrl(), message.getReceiptHandle());
LOGGER.debug(
"Inbound event correlated successfully: {}, and message with ID {} was deleted",
correlate.getResponseData(),
message.getMessageId());
} else {
LOGGER.debug("Inbound event not correlated: {}", correlate.getErrorData());
try {
receiveMessageResult = sqsClient.receiveMessage(receiveMessageRequest);
List<Message> messages = receiveMessageResult.getMessages();
for (Message message : messages) {
try {
correlate(message);
sqsClient.deleteMessage(properties.getQueue().getUrl(), message.getReceiptHandle());
} catch (ConnectorInputException e) {
LOGGER.warn("NACK - failed to parse SQS message body: {}", e.getMessage());
}
}
} catch (Exception e) {
LOGGER.debug("NACK - failed to correlate event", e);
}
} while (queueConsumerActive.get());
LOGGER.info("Stopping SQS consumer for queue {}", properties.getQueue().getUrl());
}

private void correlate(final Message message) {
InboundConnectorResult<?> correlate =
context.correlate(MessageMapper.toSqsInboundMessage(message));
if (correlate.isActivated()) {
LOGGER.debug("Inbound event correlated successfully: {}", correlate.getResponseData());
} else {
LOGGER.debug("Inbound event was correlated but not activated: {}", correlate.getErrorData());
}
}

private ReceiveMessageRequest createReceiveMessageRequest() {
ReceiveMessageRequest receiveMessageRequest =
new ReceiveMessageRequest()
Expand Down

0 comments on commit bb23b4b

Please sign in to comment.