diff --git a/Brighter/paramore.brighter.commandprocessor.messaginggateway.rmq/HeaderNames.cs b/Brighter/paramore.brighter.commandprocessor.messaginggateway.rmq/HeaderNames.cs index 5c09fe2202..7fc108807b 100644 --- a/Brighter/paramore.brighter.commandprocessor.messaginggateway.rmq/HeaderNames.cs +++ b/Brighter/paramore.brighter.commandprocessor.messaginggateway.rmq/HeaderNames.cs @@ -52,6 +52,10 @@ public class HeaderNames /// public const string MESSAGE_ID = "MessageId"; /// + /// The correlation id + /// + public const string CORRELATION_ID = "CorrelationId"; + /// /// The topic{CC2D43FA-BBC4-448A-9D0B-7B57ADF2655C} /// public const string TOPIC = "Topic"; @@ -72,7 +76,7 @@ public class HeaderNames /// public const string ORIGINAL_MESSAGE_ID = Message.OriginalMessageIdHeaderName; /// - /// Tag used to identify this message in the sequence against it's Id (used to perform multiple ack against Id upto Tag). + /// Tag used to identify this message in the sequence against its Id (used to perform multiple ack against Id upto Tag). /// public const string DELIVERY_TAG = Message.DeliveryTagHeaderName; } diff --git a/Brighter/paramore.brighter.commandprocessor.messaginggateway.rmq/RmqMessageCreator.cs b/Brighter/paramore.brighter.commandprocessor.messaginggateway.rmq/RmqMessageCreator.cs index f15a5ec835..6af4d793a2 100644 --- a/Brighter/paramore.brighter.commandprocessor.messaginggateway.rmq/RmqMessageCreator.cs +++ b/Brighter/paramore.brighter.commandprocessor.messaginggateway.rmq/RmqMessageCreator.cs @@ -99,13 +99,13 @@ public Message CreateMessage(BasicDeliverEventArgs fromQueue) } else { - string body = Encoding.UTF8.GetString(fromQueue.Body); + var messageHeader = timeStamp.Success + ? new MessageHeader(messageId.Result, topic.Result, messageType.Result, timeStamp.Result, handledCount.Result, delayedMilliseconds.Result) + : new MessageHeader(messageId.Result, topic.Result, messageType.Result); - message = new Message( - timeStamp.Success ? new MessageHeader(messageId.Result, topic.Result, messageType.Result, timeStamp.Result, handledCount.Result, delayedMilliseconds.Result) : new MessageHeader(messageId.Result, topic.Result, messageType.Result), - new MessageBody(body)); + message = new Message(messageHeader, new MessageBody(Encoding.UTF8.GetString(fromQueue.Body))); - headers.Each(header => message.Header.Bag.Add(header.Key, Encoding.UTF8.GetString((byte[])header.Value))); + headers.Each(header => message.Header.Bag.Add(header.Key, ParseHeaderValue(header.Value))); } } catch (Exception e) @@ -114,6 +114,12 @@ public Message CreateMessage(BasicDeliverEventArgs fromQueue) message = FailureMessage(topic, messageId); } + if (headers.ContainsKey(HeaderNames.CORRELATION_ID)) + { + var correlationId = Encoding.UTF8.GetString((byte[])headers[HeaderNames.CORRELATION_ID]); + message.Header.CorrelationId = Guid.Parse(correlationId); + } + message.SetDeliveryTag(fromQueue.DeliveryTag); return message; @@ -214,5 +220,11 @@ private HeaderResult ReadMessageId(string messageId) _logger.DebugFormat("Could not parse message MessageId, new message id is {0}", Guid.Empty); return new HeaderResult(Guid.Empty, false); } + + private static object ParseHeaderValue(object value) + { + var bytes = value as byte[]; + return bytes != null ? Encoding.UTF8.GetString(bytes) : value; + } } } \ No newline at end of file diff --git a/Brighter/paramore.brighter.commandprocessor.messaginggateway.rmq/RmqMessagePublisher.cs b/Brighter/paramore.brighter.commandprocessor.messaginggateway.rmq/RmqMessagePublisher.cs index c0eac41ca1..3aea6f6e5b 100644 --- a/Brighter/paramore.brighter.commandprocessor.messaginggateway.rmq/RmqMessagePublisher.cs +++ b/Brighter/paramore.brighter.commandprocessor.messaginggateway.rmq/RmqMessagePublisher.cs @@ -92,26 +92,28 @@ public RmqMessagePublisher(IModel channel, string exchangeName, ILog logger) /// Publishes the message. /// /// The message. - /// User specified message headers. - /// Generate new unique message identifier. + /// The delay in ms. public void PublishMessage(Message message, int delayMilliseconds) { var messageId = message.Id; var deliveryTag = message.Header.Bag.ContainsKey(HeaderNames.DELIVERY_TAG) ? message.GetDeliveryTag().ToString() : null; var headers = new Dictionary - { - {HeaderNames.MESSAGE_TYPE, message.Header.MessageType.ToString()}, - {HeaderNames.TOPIC, message.Header.Topic}, - {HeaderNames.HANDLED_COUNT, message.Header.HandledCount.ToString(CultureInfo.InvariantCulture)}, - }; + { + {HeaderNames.MESSAGE_TYPE, message.Header.MessageType.ToString()}, + {HeaderNames.TOPIC, message.Header.Topic}, + {HeaderNames.HANDLED_COUNT, message.Header.HandledCount.ToString(CultureInfo.InvariantCulture)} + }; - message.Header.Bag.Each((header) => + if (message.Header.CorrelationId != Guid.Empty) + headers.Add(HeaderNames.CORRELATION_ID, message.Header.CorrelationId.ToString()); + + message.Header.Bag.Each(header => { if (!HeadersToReset.Any(htr => htr.Equals(header.Key))) headers.Add(header.Key, header.Value); }); - if (!String.IsNullOrEmpty(deliveryTag)) + if (!string.IsNullOrEmpty(deliveryTag)) headers.Add(HeaderNames.DELIVERY_TAG, deliveryTag); if (delayMilliseconds > 0) @@ -129,6 +131,7 @@ public void PublishMessage(Message message, int delayMilliseconds) /// Requeues the message. /// /// The message. + /// The queue name. /// Delay in ms. public void RequeueMessage(Message message, string queueName, int delayMilliseconds) { @@ -139,11 +142,14 @@ public void RequeueMessage(Message message, string queueName, int delayMilliseco _logger.InfoFormat("RmqMessagePublisher: Regenerating message {0} with DeliveryTag of {1} to {2} with DeliveryTag of {3}", message.Id, DeliveryTag, messageId, 1); var headers = new Dictionary - { - {HeaderNames.MESSAGE_TYPE, message.Header.MessageType.ToString()}, - {HeaderNames.TOPIC, message.Header.Topic}, - {HeaderNames.HANDLED_COUNT, message.Header.HandledCount.ToString(CultureInfo.InvariantCulture)}, - }; + { + {HeaderNames.MESSAGE_TYPE, message.Header.MessageType.ToString()}, + {HeaderNames.TOPIC, message.Header.Topic}, + {HeaderNames.HANDLED_COUNT, message.Header.HandledCount.ToString(CultureInfo.InvariantCulture)}, + }; + + if (message.Header.CorrelationId != Guid.Empty) + headers.Add(HeaderNames.CORRELATION_ID, message.Header.CorrelationId.ToString()); message.Header.Bag.Each((header) => {