Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support all types supported by rabbit in message headers instead of just strings #180

Merged
merged 2 commits into from
Apr 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public class HeaderNames
/// </summary>
public const string MESSAGE_ID = "MessageId";
/// <summary>
/// The correlation id
/// </summary>
public const string CORRELATION_ID = "CorrelationId";
/// <summary>
/// The topic{CC2D43FA-BBC4-448A-9D0B-7B57ADF2655C}
/// </summary>
public const string TOPIC = "Topic";
Expand All @@ -72,7 +76,7 @@ public class HeaderNames
/// </summary>
public const string ORIGINAL_MESSAGE_ID = Message.OriginalMessageIdHeaderName;
/// <summary>
/// Tag used to identify this message in the sequence against it's Id (used to perform multiple ack against Id upto Tag).
/// Tag used to identify this message in the sequence against its Id (used to perform multiple ack against Id upto Tag).
/// </summary>
public const string DELIVERY_TAG = Message.DeliveryTagHeaderName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ public Message CreateMessage(BasicDeliverEventArgs fromQueue)
}
else
{
string body = Encoding.UTF8.GetString(fromQueue.Body);
var messageHeader = timeStamp.Success
? new MessageHeader(messageId.Result, topic.Result, messageType.Result, timeStamp.Result, handledCount.Result, delayedMilliseconds.Result)
: new MessageHeader(messageId.Result, topic.Result, messageType.Result);

message = new Message(
timeStamp.Success ? new MessageHeader(messageId.Result, topic.Result, messageType.Result, timeStamp.Result, handledCount.Result, delayedMilliseconds.Result) : new MessageHeader(messageId.Result, topic.Result, messageType.Result),
new MessageBody(body));
message = new Message(messageHeader, new MessageBody(Encoding.UTF8.GetString(fromQueue.Body)));

headers.Each(header => message.Header.Bag.Add(header.Key, Encoding.UTF8.GetString((byte[])header.Value)));
headers.Each(header => message.Header.Bag.Add(header.Key, ParseHeaderValue(header.Value)));
}
}
catch (Exception e)
Expand All @@ -114,6 +114,12 @@ public Message CreateMessage(BasicDeliverEventArgs fromQueue)
message = FailureMessage(topic, messageId);
}

if (headers.ContainsKey(HeaderNames.CORRELATION_ID))
{
var correlationId = Encoding.UTF8.GetString((byte[])headers[HeaderNames.CORRELATION_ID]);
message.Header.CorrelationId = Guid.Parse(correlationId);
}

message.SetDeliveryTag(fromQueue.DeliveryTag);

return message;
Expand Down Expand Up @@ -214,5 +220,11 @@ private HeaderResult<Guid> ReadMessageId(string messageId)
_logger.DebugFormat("Could not parse message MessageId, new message id is {0}", Guid.Empty);
return new HeaderResult<Guid>(Guid.Empty, false);
}

private static object ParseHeaderValue(object value)
{
var bytes = value as byte[];
return bytes != null ? Encoding.UTF8.GetString(bytes) : value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,26 +92,28 @@ public RmqMessagePublisher(IModel channel, string exchangeName, ILog logger)
/// Publishes the message.
/// </summary>
/// <param name="message">The message.</param>
/// <param name="headers">User specified message headers.</param>
/// <param name="regenerate">Generate new unique message identifier.</param>
/// <param name="delayMilliseconds">The delay in ms.</param>
public void PublishMessage(Message message, int delayMilliseconds)
{
var messageId = message.Id;
var deliveryTag = message.Header.Bag.ContainsKey(HeaderNames.DELIVERY_TAG) ? message.GetDeliveryTag().ToString() : null;

var headers = new Dictionary<string, object>
{
{HeaderNames.MESSAGE_TYPE, message.Header.MessageType.ToString()},
{HeaderNames.TOPIC, message.Header.Topic},
{HeaderNames.HANDLED_COUNT, message.Header.HandledCount.ToString(CultureInfo.InvariantCulture)},
};
{
{HeaderNames.MESSAGE_TYPE, message.Header.MessageType.ToString()},
{HeaderNames.TOPIC, message.Header.Topic},
{HeaderNames.HANDLED_COUNT, message.Header.HandledCount.ToString(CultureInfo.InvariantCulture)}
};

message.Header.Bag.Each((header) =>
if (message.Header.CorrelationId != Guid.Empty)
headers.Add(HeaderNames.CORRELATION_ID, message.Header.CorrelationId.ToString());

message.Header.Bag.Each(header =>
{
if (!HeadersToReset.Any(htr => htr.Equals(header.Key))) headers.Add(header.Key, header.Value);
});

if (!String.IsNullOrEmpty(deliveryTag))
if (!string.IsNullOrEmpty(deliveryTag))
headers.Add(HeaderNames.DELIVERY_TAG, deliveryTag);

if (delayMilliseconds > 0)
Expand All @@ -129,6 +131,7 @@ public void PublishMessage(Message message, int delayMilliseconds)
/// Requeues the message.
/// </summary>
/// <param name="message">The message.</param>
/// <param name="queueName">The queue name.</param>
/// <param name="delayMilliseconds">Delay in ms.</param>
public void RequeueMessage(Message message, string queueName, int delayMilliseconds)
{
Expand All @@ -139,11 +142,14 @@ public void RequeueMessage(Message message, string queueName, int delayMilliseco
_logger.InfoFormat("RmqMessagePublisher: Regenerating message {0} with DeliveryTag of {1} to {2} with DeliveryTag of {3}", message.Id, DeliveryTag, messageId, 1);

var headers = new Dictionary<string, object>
{
{HeaderNames.MESSAGE_TYPE, message.Header.MessageType.ToString()},
{HeaderNames.TOPIC, message.Header.Topic},
{HeaderNames.HANDLED_COUNT, message.Header.HandledCount.ToString(CultureInfo.InvariantCulture)},
};
{
{HeaderNames.MESSAGE_TYPE, message.Header.MessageType.ToString()},
{HeaderNames.TOPIC, message.Header.Topic},
{HeaderNames.HANDLED_COUNT, message.Header.HandledCount.ToString(CultureInfo.InvariantCulture)},
};

if (message.Header.CorrelationId != Guid.Empty)
headers.Add(HeaderNames.CORRELATION_ID, message.Header.CorrelationId.ToString());

message.Header.Bag.Each((header) =>
{
Expand Down