Skip to content

Commit

Permalink
1. add stop replica protocol
Browse files Browse the repository at this point in the history
2. add const api key variable
  • Loading branch information
WanliTian authored and travisjeffery committed Dec 17, 2016
1 parent 6ff11be commit b4176fe
Show file tree
Hide file tree
Showing 19 changed files with 176 additions and 29 deletions.
25 changes: 25 additions & 0 deletions protocol/api_key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package protocol

const (
ProduceKey = iota
FetchKey
OffsetsKey
MetadataKey
LeaderAndISRKey
StopReplicaKey
UpdateMetadataKey
ControlledShutdownKey
OffsetCommitKey
OffsetFetchKey
GroupCoordinatorKey
JoinGroupKey
HeartbeatKey
LeaveGroupKey
SyncGroupKey
DescribeGroupsKey
ListGroupsKey
SaslHandshakeKey
ApiVersionsKey
CreateTopicsKey
DeleteTopicsKey
)
2 changes: 1 addition & 1 deletion protocol/create_topic_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (c *CreateTopicRequests) Decode(d PacketDecoder) error {
}

func (c *CreateTopicRequests) Key() int16 {
return 19
return CreateTopicsKey
}

func (c *CreateTopicRequests) Version() int16 {
Expand Down
2 changes: 1 addition & 1 deletion protocol/delete_topic_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (c *DeleteTopicsRequest) Decode(d PacketDecoder) (err error) {
}

func (c *DeleteTopicsRequest) Key() int16 {
return 20
return DeleteTopicsKey
}

func (c *DeleteTopicsRequest) Version() int16 {
Expand Down
2 changes: 1 addition & 1 deletion protocol/describe_groups_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func (r *DescribeGroupsRequest) Decode(d PacketDecoder) (err error) {
}

func (r *DescribeGroupsRequest) Key() int16 {
return 15
return DescribeGroupsKey
}

func (r *DescribeGroupsRequest) Version() int16 {
Expand Down
2 changes: 1 addition & 1 deletion protocol/fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (r *FetchRequest) Decode(d PacketDecoder) error {
}

func (r *FetchRequest) Key() int16 {
return 1
return FetchKey
}

func (r *FetchRequest) Version() int16 {
Expand Down
2 changes: 1 addition & 1 deletion protocol/group_coordinator_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ func (r *GroupCoordinatorRequest) Version() int16 {
}

func (r *GroupCoordinatorRequest) Key() int16 {
return 10
return GroupCoordinatorKey
}
2 changes: 1 addition & 1 deletion protocol/heartbeat_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (r *HeartbeatRequest) Decode(d PacketDecoder) (err error) {
}

func (r *HeartbeatRequest) Key() int16 {
return 12
return HeartbeatKey
}

func (r *HeartbeatRequest) Version() int16 {
Expand Down
2 changes: 1 addition & 1 deletion protocol/join_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (r *JoinGroupRequest) Decode(d PacketDecoder) error {
}

func (r *JoinGroupRequest) Key() int16 {
return 11
return JoinGroupKey
}

func (r *JoinGroupRequest) Version() int16 {
Expand Down
2 changes: 1 addition & 1 deletion protocol/leader_and_isr_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (r *LeaderAndISRRequest) Decode(d PacketDecoder) error {
}

func (r *LeaderAndISRRequest) Key() int16 {
return 4
return LeaderAndISRKey
}

func (r *LeaderAndISRRequest) Version() int16 {
Expand Down
6 changes: 3 additions & 3 deletions protocol/leave_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ func (r *LeaveGroupRequest) Decode(d PacketDecoder) (err error) {
return err
}

func (r *LeaveGroupRequest) key() int16 {
return 13
func (r *LeaveGroupRequest) Key() int16 {
return LeaveGroupKey
}

func (r *LeaveGroupRequest) version() int16 {
func (r *LeaveGroupRequest) Version() int16 {
return 0
}
2 changes: 1 addition & 1 deletion protocol/list_groups_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func (r *ListGroupsRequest) Decode(d PacketDecoder) (err error) {
}

func (r *ListGroupsRequest) Key() int16 {
return 16
return ListGroupsKey
}

func (r *ListGroupsRequest) Version() int16 {
Expand Down
2 changes: 1 addition & 1 deletion protocol/metadata_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func (r *MetadataRequest) Decode(d PacketDecoder) (err error) {
}

func (r *MetadataRequest) Key() int16 {
return 3
return MetadataKey
}

func (r *MetadataRequest) Version() int16 {
Expand Down
8 changes: 8 additions & 0 deletions protocol/offsets_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,11 @@ func (r *OffsetsRequest) Decode(d PacketDecoder) error {
r.MaxNumOffsets, err = d.Int32()
return err
}

func (r *OffsetsRequest) Key() int16 {
return OffsetsKey
}

func (r *OffsetsRequest) Version() int16 {
return 0
}
8 changes: 0 additions & 8 deletions protocol/offsets_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,3 @@ func (r *OffsetsResponse) Decode(d PacketDecoder) error {
}
return nil
}

func (r *OffsetsResponse) Version() int16 {
return 0
}

func (r *OffsetResponse) Key() int16 {
return 2
}
2 changes: 1 addition & 1 deletion protocol/produce_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (r *ProduceRequest) Decode(d PacketDecoder) (err error) {
}

func (r *ProduceRequest) Key() int16 {
return 0
return ProduceKey
}

func (r *ProduceRequest) Version() int16 {
Expand Down
73 changes: 73 additions & 0 deletions protocol/stop_replica_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package protocol

type StopReplicaPartition struct {
Topic string
Partition int32
}

type StopReplicaRequest struct {
ControllerID int32
ControllerEpoch int32
DeletePartitions bool
Partitions []*StopReplicaPartition
}

func (r *StopReplicaRequest) Encode(e PacketEncoder) (err error) {
e.PutInt32(r.ControllerID)
e.PutInt32(r.ControllerEpoch)
if r.DeletePartitions {
e.PutInt8(1)
} else {
e.PutInt8(0)
}
if err = e.PutArrayLength(len(r.Partitions)); err != nil {
return
}
for _, partition := range r.Partitions {
if err = e.PutString(partition.Topic); err != nil {
return
}
e.PutInt32(partition.Partition)
}
return
}

func (r *StopReplicaRequest) Decode(d PacketDecoder) (err error) {
if r.ControllerID, err = d.Int32(); err != nil {
return
}
if r.ControllerEpoch, err = d.Int32(); err != nil {
return
}
dp, err := d.Int8()
if err != nil {
return
} else if dp == 1 {
r.DeletePartitions = true
} else {
r.DeletePartitions = false
}
length, err := d.ArrayLength()
if err != nil {
return
}
r.Partitions = make([]*StopReplicaPartition, length)
for index := range r.Partitions {
r.Partitions[index] = new(StopReplicaPartition)
if r.Partitions[index].Topic, err = d.String(); err != nil {
return
}
if r.Partitions[index].Partition, err = d.Int32(); err != nil {
return
}
}
return nil
}

func (r *StopReplicaRequest) Key() int16 {
return StopReplicaKey
}

func (r *StopReplicaRequest) Version() int16 {
return 0
}
49 changes: 49 additions & 0 deletions protocol/stop_replica_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package protocol

type StopReplicaPartitionAndErrorCode struct {
StopReplicaPartition
ErrorCode int16
}

type StopReplicaResponse struct {
ErrorCode int16
Partitions []*StopReplicaPartitionAndErrorCode
}

func (r *StopReplicaResponse) Encode(e PacketEncoder) (err error) {
e.PutInt16(r.ErrorCode)
if err = e.PutArrayLength(len(r.Partitions)); err != nil {
return
}
for _, partition := range r.Partitions {
if err = e.PutString(partition.Topic); err != nil {
return
}
e.PutInt32(partition.Partition)
e.PutInt16(partition.ErrorCode)
}
return
}

func (r *StopReplicaResponse) Decode(d PacketDecoder) (err error) {
if r.ErrorCode, err = d.Int16(); err != nil {
return
}
length, err := d.ArrayLength()
if err != nil {
return
}
r.Partitions = make([]*StopReplicaPartitionAndErrorCode, length)
for index := range r.Partitions {
if r.Partitions[index].Topic, err = d.String(); err != nil {
return
}
if r.Partitions[index].Partition, err = d.Int32(); err != nil {
return
}
if r.Partitions[index].ErrorCode, err = d.Int16(); err != nil {
return
}
}
return
}
2 changes: 1 addition & 1 deletion protocol/sync_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (r *SyncGroupRequest) Decode(d PacketDecoder) (err error) {
}

func (r *SyncGroupRequest) Key() int16 {
return 14
return SyncGroupKey
}

func (r *SyncGroupRequest) Version() int16 {
Expand Down
12 changes: 6 additions & 6 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,37 +138,37 @@ func (s *Server) handleRequest(conn net.Conn) {
s.logger.Debug("correlation id [%d], request size [%d], key [%d]", header.CorrelationID, size, header.APIKey)

switch header.APIKey {
case 0:
case protocol.ProduceKey:
req := &protocol.ProduceRequest{}
s.decode(header, req, d)
if err = s.handleProduce(conn, header, req); err != nil {
s.logger.Info("Produce failed: %s", err)
}
case 1:
case protocol.FetchKey:
req := &protocol.FetchRequest{}
s.decode(header, req, d)
if err = s.handleFetch(conn, header, req); err != nil {
s.logger.Info("Fetch failed: %s", err)
}
case 2:
case protocol.OffsetsKey:
req := &protocol.OffsetsRequest{}
s.decode(header, req, d)
if err = s.handleOffsets(conn, header, req); err != nil {
s.logger.Info("Offsets failed: %s", err)
}
case 3:
case protocol.MetadataKey:
req := &protocol.MetadataRequest{}
s.decode(header, req, d)
if err = s.handleMetadata(conn, header, req); err != nil {
s.logger.Info("Metadata request failed: %s", err)
}
case 19:
case protocol.CreateTopicsKey:
req := &protocol.CreateTopicRequests{}
s.decode(header, req, d)
if err = s.handleCreateTopic(conn, header, req); err != nil {
s.logger.Info("Create topic failed: %s", err)
}
case 20:
case protocol.DeleteTopicsKey:
req := &protocol.DeleteTopicsRequest{}
s.decode(header, req, d)
if err = s.handleDeleteTopics(conn, header, req); err != nil {
Expand Down

0 comments on commit b4176fe

Please sign in to comment.