-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathPulsarEvent.cs
67 lines (59 loc) · 2.17 KB
/
PulsarEvent.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
using System;
using System.Text.Json;
using Avro;
using Pulsar.Client.Common;
using Snd.Sdk.Storage.Streaming.MessageProtocolExtensions;
namespace Snd.Sdk.Storage.Streaming.Models;
/// <summary>
/// A generic record to capture data from a Pulsar message
/// </summary>
/// <typeparam name="TKey">Type to deserialize key to</typeparam>
/// <typeparam name="TData">Type to deserialize data to</typeparam>
public record PulsarEvent<TKey, TData>
{
private PulsarEvent()
{
}
/// <summary>
/// Parses a Pulsar message into a PulsarDataCapture object for further processing using supplied AVRO schemas
/// </summary>
/// <param name="message">Pulsar Message</param>
/// <param name="keySchema">AVRO Key Schema</param>
/// <param name="valueSchema">AVRO Data Schema</param>
/// <returns></returns>
public static PulsarEvent<TKey, TData> FromPulsarMessage(Message<byte[]> message, Schema keySchema,
Schema valueSchema)
{
return new PulsarEvent<TKey, TData>
{
MessageId = message.MessageId,
MessagePublishTime = message.PublishTime,
Key = JsonSerializer.Deserialize<TKey>(AvroExtensions.AvroToJson(Convert.FromBase64String(message.Key),
keySchema, true)),
Data = JsonSerializer.Deserialize<TData>(message.Data.Length > 0
? AvroExtensions.AvroToJson(message.Data, valueSchema, true)
: "{}")
};
}
/// <summary>
/// Creates a PulsarEvent with empty values
/// </summary>
public static PulsarEvent<TKey, TData> Empty { get; } = new()
{ Data = default, Key = default, MessagePublishTime = long.MinValue, MessageId = null };
/// <summary>
/// Pulsar message id
/// </summary>
public MessageId MessageId { get; init; }
/// <summary>
/// The time the message was published in unix timestamp format
/// </summary>
public long MessagePublishTime { get; init; }
/// <summary>
/// Deserialized "Key" part of the message
/// </summary>
public TKey Key { get; init; }
/// <summary>
/// Deserialized "Data" part of the message
/// </summary>
public TData Data { get; init; }
}