-
Notifications
You must be signed in to change notification settings - Fork 1
/
payload.go
146 lines (117 loc) · 2.84 KB
/
payload.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package pubsub
import (
	"errors"
	"fmt"
	"io"
	"net"

	"github.com/rs/zerolog/log"
)
// DataType is the 16-bit code that tags the data held by a payload.
type DataType uint16

// Known DataType codes. NoneDataType is zero; the remaining codes run
// consecutively from 0x4001, so any new code must be appended at the end to
// keep the existing values stable.
const (
	NoneDataType DataType = 0x0000
	ConnectToken DataType = 0x4000 + iota // 0x4001
	MessageID                             // 0x4002
	MessageBody                           // 0x4003
	TopicID                               // 0x4004
	SubscriberID                          // 0x4005
)

// NewDataType converts a raw 16-bit value into a DataType. The value is not
// checked against the known codes.
func NewDataType(input uint16) DataType {
	return DataType(input)
}
// NewDataTypeFromByte parses a DataType from input, which must be exactly two
// bytes long; any other length yields NoneDataType and an error. The byte
// order is whatever byte2uint16 implements.
func NewDataTypeFromByte(input []byte) (DataType, error) {
	const encodedSize = 2
	if len(input) != encodedSize {
		return NoneDataType, errors.New("Can't parse byte to payload header")
	}
	raw := byte2uint16(input)
	return DataType(raw), nil
}
// PayloadHeaderSize is the number of bytes read from a connection for one
// payload header: bytes 0-1 are parsed as the data type and bytes 2-3 as the
// length.
const PayloadHeaderSize = 4
// Payload describes a value made of a DataType, a uint16 length and a
// byte-slice value, together with a way to serialize it to bytes. Every
// method returns an error so that an implementation can report a failure.
type Payload interface {
// SetType and GetType write and read the payload's DataType.
SetType(DataType) error
GetType() (DataType, error)
// SetLength and GetLength write and read the payload's length field.
SetLength(uint16) error
GetLength() (uint16, error)
// SetValue and GetValue write and read the payload's raw value bytes.
SetValue([]byte) error
GetValue() ([]byte, error)
// ToBytes returns the payload serialized as a byte slice.
ToBytes() ([]byte, error)
}
// payload is the plain in-memory holder for a data type, a length and the
// raw value bytes.
type payload struct {
	typ    DataType // kind of data held in value
	length uint16   // length as given to SetLength; not derived from value
	value  []byte   // raw data; stored as given, without copying
}

// SetType records m as the payload's data type. It always returns nil.
func (p *payload) SetType(m DataType) error {
	p.typ = m
	return nil
}

// GetType returns the stored data type. The error is always nil.
func (p payload) GetType() (DataType, error) {
	return p.typ, nil
}

// SetLength records length as the payload's length field. It does not touch
// the stored value. It always returns nil.
func (p *payload) SetLength(length uint16) error {
	p.length = length
	return nil
}

// GetLength returns the stored length field. The error is always nil.
func (p payload) GetLength() (uint16, error) {
	return p.length, nil
}

// SetValue stores value as the payload's data. The slice is kept as-is, so
// the payload shares its backing array with the caller. It does not update
// the length field. It always returns nil.
func (p *payload) SetValue(value []byte) error {
	p.value = value
	return nil
}

// GetValue returns the stored value slice (not a copy). The error is always
// nil.
func (p payload) GetValue() ([]byte, error) {
	return p.value, nil
}
// ToBytes serializes the payload as the encoded data type, then the encoded
// length field, then the raw value bytes. The length written is the stored
// length field, not len(value). The encoding of the two uint16 fields is
// whatever uint16tobyte produces. The returned error is always nil.
func (p *payload) ToBytes() ([]byte, error) {
	var out []byte

	typeField := uint16tobyte(uint16(p.typ))
	out = append(out, typeField...)

	lengthField := uint16tobyte(p.length)
	out = append(out, lengthField...)

	out = append(out, p.value...)
	return out, nil
}
// NewPayload returns a new Payload whose data type is d. Its length field is
// zero and its value is nil until they are set. The returned error is always
// nil.
func NewPayload(d DataType) (Payload, error) {
	p := new(payload)
	p.typ = d
	return p, nil
}
// readPayloadHeaderFromConn reads one payload header (PayloadHeaderSize
// bytes) from conn and returns a Payload with its data type and length set
// from it. Bytes 0-1 are parsed as the data type and bytes 2-3 as the length.
//
// The read uses io.ReadFull because a single Read on a stream connection may
// return fewer bytes than requested even when more are on the way; a header
// that arrives in several segments is therefore still read in full. The call
// blocks until the header is complete or the read fails; deadlines are the
// caller's responsibility.
//
// On failure it returns an empty payload and a non-nil error. A read failure
// is logged and wraps the underlying cause, so callers can test it with
// errors.Is (io.EOF when the peer closed before sending anything,
// io.ErrUnexpectedEOF when it closed mid-header).
func readPayloadHeaderFromConn(conn net.Conn) (Payload, error) {
	buf := make([]byte, PayloadHeaderSize)
	if _, err := io.ReadFull(conn, buf); err != nil {
		wrapped := fmt.Errorf("Can't read payload header: %w", err)
		log.Error().Err(wrapped).Send()
		return &payload{}, wrapped
	}

	dataType, err := NewDataTypeFromByte(buf[:2])
	if err != nil {
		return &payload{}, err
	}
	p, err := NewPayload(dataType)
	if err != nil {
		return &payload{}, err
	}
	if err := p.SetLength(byte2uint16(buf[2:4])); err != nil {
		return &payload{}, err
	}
	return p, nil
}
// readPayloadDataFromConn reads the data part of a payload from conn. The
// number of bytes to read is taken from p.GetLength(); the bytes read are
// stored in p with SetValue and p is returned.
//
// A length of zero is rejected with an error before anything is read.
//
// The read uses io.ReadFull because a single Read on a stream connection may
// return fewer bytes than requested even when more are on the way; data that
// arrives in several segments is therefore still read in full. The call
// blocks until all bytes have arrived or the read fails; deadlines are the
// caller's responsibility.
//
// On failure it returns an empty payload and a non-nil error. A read failure
// is logged and wraps the underlying cause, so callers can test it with
// errors.Is (for example io.ErrUnexpectedEOF when the peer closed mid-data).
func readPayloadDataFromConn(conn net.Conn, p Payload) (Payload, error) {
	dataLength, err := p.GetLength()
	if err != nil {
		return &payload{}, err
	}
	if dataLength == 0 {
		return &payload{}, errors.New("payload data length must not be 0")
	}

	buf := make([]byte, dataLength)
	if _, err := io.ReadFull(conn, buf); err != nil {
		wrapped := fmt.Errorf("Can't read payload data: %w", err)
		log.Error().Err(wrapped).Send()
		return &payload{}, wrapped
	}

	if err := p.SetValue(buf); err != nil {
		return &payload{}, err
	}
	return p, nil
}
// ReadPayload reads one complete payload from conn: first the header via
// readPayloadHeaderFromConn, then the data via readPayloadDataFromConn. If
// either step fails, it returns a fresh empty payload together with that
// step's error.
func ReadPayload(conn net.Conn) (Payload, error) {
	header, headerErr := readPayloadHeaderFromConn(conn)
	if headerErr != nil {
		return &payload{}, headerErr
	}

	complete, dataErr := readPayloadDataFromConn(conn, header)
	if dataErr != nil {
		return &payload{}, dataErr
	}

	return complete, nil
}