-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathAvroSerializerFactory.cs
81 lines (71 loc) · 2.4 KB
/
AvroSerializerFactory.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
using Avro.Specific;
using System;
using System.Reflection;
using System.IO;
using Avro;
using Avro.IO;
namespace SchemaRegistry.Tests.Serialization
{
/// <summary>
/// Builds Avro binary serializer and deserializer delegates for <typeparamref name="T"/>.
/// The reader/writer schema is chosen once, in the constructor, from the CLR type:
/// a fixed set of primitives map to Avro primitive schemas, and classes are expected
/// to expose a public static <c>_SCHEMA</c> field holding their <see cref="Schema"/>.
/// </summary>
/// <typeparam name="T">The CLR type to serialize and deserialize.</typeparam>
public class AvroSerializerFactory<T> : ISerializerFactory<T>
{
    // Both fields are assigned exactly once, in the constructor.
    private readonly Schema _schema;
    private readonly string _schemaJson;

    /// <summary>
    /// Resolves the Avro schema for <typeparamref name="T"/> and caches its string form.
    /// </summary>
    /// <exception cref="ApplicationException">
    /// Thrown when no Avro schema can be determined for <typeparamref name="T"/>.
    /// </exception>
    public AvroSerializerFactory()
    {
        _schema = ResolveSchema();
        if (_schema == null)
        {
            throw new ApplicationException($"Can't decide AVRO schema for type {typeof(T).FullName}");
        }

        _schemaJson = _schema.ToString();
    }

    /// <summary>
    /// Creates a delegate that decodes one <typeparamref name="T"/> value from a stream,
    /// resolving data written with <paramref name="writerSchemaJson"/> against this
    /// factory's own schema.
    /// </summary>
    /// <param name="writerSchemaJson">JSON text of the schema the data was written with.</param>
    /// <returns>A function that reads a single value from the given stream.</returns>
    public Func<Stream, T> BuildDeserializer(string writerSchemaJson)
    {
        var writerSchema = Schema.Parse(writerSchemaJson);

        // The reader is built once and captured by the returned delegate.
        var avroReader = new SpecificReader<T>(writerSchema, _schema);

        return (Stream stream) => avroReader.Read(default(T), new BinaryDecoder(stream));
    }

    /// <summary>
    /// Creates a delegate that encodes a <typeparamref name="T"/> value onto a stream
    /// using this factory's schema.
    /// </summary>
    /// <returns>An action that writes a single value to the given stream.</returns>
    public Action<Stream, T> BuildSerializer()
    {
        // The writer is built once and captured by the returned delegate.
        var avroWriter = new SpecificWriter<T>(_schema);

        return (Stream stream, T obj) => avroWriter.Write(obj, new BinaryEncoder(stream));
    }

    /// <summary>
    /// Returns the string form of the schema resolved for <typeparamref name="T"/>.
    /// </summary>
    /// <returns>The value of <c>Schema.ToString()</c> captured at construction time.</returns>
    public string GetSchema()
    {
        return _schemaJson;
    }

    /// <summary>
    /// Picks the Avro schema matching <typeparamref name="T"/>, or returns <c>null</c>
    /// when none can be determined (the constructor turns that into an exception).
    /// </summary>
    private static Schema ResolveSchema()
    {
        Type type = typeof(T);

        if (type == typeof(byte[]))
        {
            return PrimitiveSchema.NewInstance("bytes");
        }

        if (type == typeof(int))
        {
            return PrimitiveSchema.NewInstance("int");
        }

        if (type == typeof(long))
        {
            return PrimitiveSchema.NewInstance("long");
        }

        if (type == typeof(float))
        {
            // The CLR name of float is "Single"; lowercasing it gives "single", which is
            // not an Avro primitive type name, so float needs an explicit mapping.
            return PrimitiveSchema.NewInstance("float");
        }

        if (type.IsPrimitive || type == typeof(string))
        {
            // Boolean, Double and String lowercase to valid Avro primitive names.
            // Other CLR primitives (e.g. Int16, Char) have no Avro primitive of that
            // name; NewInstance is expected to yield null for them, which the
            // constructor reports as an unresolvable schema.
            return PrimitiveSchema.NewInstance(type.Name.ToLowerInvariant());
        }

        if (type.IsClass)
        {
            // Look the field up once and reuse it.
            // NOTE(review): presumably targets Avro-generated record classes - confirm.
            FieldInfo schemaField = type.GetField("_SCHEMA", BindingFlags.Public | BindingFlags.Static);
            if (schemaField != null)
            {
                return (Schema)schemaField.GetValue(null);
            }
        }

        return null;
    }
}
}