forked from IBM/sarama
-
Notifications
You must be signed in to change notification settings - Fork 0
/
request.go
100 lines (89 loc) · 1.95 KB
/
request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package sarama
import (
"encoding/binary"
"fmt"
"io"
)
type requestBody interface {
encoder
decoder
key() int16
version() int16
}
type request struct {
correlationID int32
clientID string
body requestBody
}
func (r *request) encode(pe packetEncoder) (err error) {
pe.push(&lengthField{})
pe.putInt16(r.body.key())
pe.putInt16(r.body.version())
pe.putInt32(r.correlationID)
err = pe.putString(r.clientID)
if err != nil {
return err
}
err = r.body.encode(pe)
if err != nil {
return err
}
return pe.pop()
}
func (r *request) decode(pd packetDecoder) (err error) {
var key int16
if key, err = pd.getInt16(); err != nil {
return err
}
var version int16
if version, err = pd.getInt16(); err != nil {
return err
}
if r.correlationID, err = pd.getInt32(); err != nil {
return err
}
r.clientID, err = pd.getString()
r.body = allocateBody(key, version)
if r.body == nil {
return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
}
return r.body.decode(pd)
}
func decodeRequest(r io.Reader) (req *request, err error) {
lengthBytes := make([]byte, 4)
if _, err := io.ReadFull(r, lengthBytes); err != nil {
return nil, err
}
length := int32(binary.BigEndian.Uint32(lengthBytes))
if length <= 4 || length > MaxRequestSize {
return nil, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
}
encodedReq := make([]byte, length)
if _, err := io.ReadFull(r, encodedReq); err != nil {
return nil, err
}
req = &request{}
if err := decode(encodedReq, req); err != nil {
return nil, err
}
return req, nil
}
func allocateBody(key, version int16) requestBody {
switch key {
case 0:
return &ProduceRequest{}
case 1:
return &FetchRequest{}
case 2:
return &OffsetRequest{}
case 3:
return &MetadataRequest{}
case 8:
return &OffsetCommitRequest{Version: version}
case 9:
return &OffsetFetchRequest{}
case 10:
return &ConsumerMetadataRequest{}
}
return nil
}