forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
leavegroup.go
147 lines (118 loc) · 3.83 KB
/
leavegroup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package kafka
import (
"bufio"
"context"
"fmt"
"net"
"time"
"github.com/segmentio/kafka-go/protocol/leavegroup"
)
// LeaveGroupRequest is the request structure for the LeaveGroup function.
type LeaveGroupRequest struct {
// Address of the kafka broker to sent he request to.
Addr net.Addr
// GroupID of the group to leave.
GroupID string
// List of leaving member identities.
Members []LeaveGroupRequestMember
}
// LeaveGroupRequestMember represents the indentify of a member leaving a group.
type LeaveGroupRequestMember struct {
// The member ID to remove from the group.
ID string
// The group instance ID to remove from the group.
GroupInstanceID string
}
// LeaveGroupResponse is the response structure for the LeaveGroup function.
type LeaveGroupResponse struct {
// An error that may have occurred when attempting to leave the group.
//
// The errors contain the kafka error code. Programs may use the standard
// errors.Is function to test the error against kafka error codes.
Error error
// The amount of time that the broker throttled the request.
Throttle time.Duration
// List of leaving member responses.
Members []LeaveGroupResponseMember
}
// LeaveGroupResponseMember represents a member leaving the group.
type LeaveGroupResponseMember struct {
// The member ID of the member leaving the group.
ID string
// The group instance ID to remove from the group.
GroupInstanceID string
// An error that may have occured when attempting to remove the member from the group.
//
// The errors contain the kafka error code. Programs may use the standard
// errors.Is function to test the error against kafka error codes.
Error error
}
func (c *Client) LeaveGroup(ctx context.Context, req *LeaveGroupRequest) (*LeaveGroupResponse, error) {
leaveGroup := leavegroup.Request{
GroupID: req.GroupID,
Members: make([]leavegroup.RequestMember, 0, len(req.Members)),
}
for _, member := range req.Members {
leaveGroup.Members = append(leaveGroup.Members, leavegroup.RequestMember{
MemberID: member.ID,
GroupInstanceID: member.GroupInstanceID,
})
}
m, err := c.roundTrip(ctx, req.Addr, &leaveGroup)
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).LeaveGroup: %w", err)
}
r := m.(*leavegroup.Response)
res := &LeaveGroupResponse{
Error: makeError(r.ErrorCode, ""),
Throttle: makeDuration(r.ThrottleTimeMS),
}
if len(r.Members) == 0 {
// If we're using a version of the api without the
// members array in the response, just add a member
// so the api is consistent across versions.
r.Members = []leavegroup.ResponseMember{
{
MemberID: req.Members[0].ID,
GroupInstanceID: req.Members[0].GroupInstanceID,
},
}
}
res.Members = make([]LeaveGroupResponseMember, 0, len(r.Members))
for _, member := range r.Members {
res.Members = append(res.Members, LeaveGroupResponseMember{
ID: member.MemberID,
GroupInstanceID: member.GroupInstanceID,
Error: makeError(member.ErrorCode, ""),
})
}
return res, nil
}
type leaveGroupRequestV0 struct {
// GroupID holds the unique group identifier
GroupID string
// MemberID assigned by the group coordinator or the zero string if joining
// for the first time.
MemberID string
}
func (t leaveGroupRequestV0) size() int32 {
return sizeofString(t.GroupID) + sizeofString(t.MemberID)
}
func (t leaveGroupRequestV0) writeTo(wb *writeBuffer) {
wb.writeString(t.GroupID)
wb.writeString(t.MemberID)
}
type leaveGroupResponseV0 struct {
// ErrorCode holds response error code
ErrorCode int16
}
func (t leaveGroupResponseV0) size() int32 {
return sizeofInt16(t.ErrorCode)
}
func (t leaveGroupResponseV0) writeTo(wb *writeBuffer) {
wb.writeInt16(t.ErrorCode)
}
func (t *leaveGroupResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
remain, err = readInt16(r, size, &t.ErrorCode)
return
}