diff --git a/protocol/api_key.go b/protocol/api_key.go new file mode 100644 index 00000000..98a56d54 --- /dev/null +++ b/protocol/api_key.go @@ -0,0 +1,25 @@ +package protocol + +const ( + ProduceKey = iota + FetchKey + OffsetsKey + MetadataKey + LeaderAndISRKey + StopReplicaKey + UpdateMetadataKey + ControlledShutdownKey + OffsetCommitKey + OffsetFetchKey + GroupCoordinatorKey + JoinGroupKey + HeartbeatKey + LeaveGroupKey + SyncGroupKey + DescribeGroupsKey + ListGroupsKey + SaslHandshakeKey + ApiVersionsKey + CreateTopicsKey + DeleteTopicsKey +) diff --git a/protocol/create_topic_requests.go b/protocol/create_topic_requests.go index 05297dd9..86100b55 100644 --- a/protocol/create_topic_requests.go +++ b/protocol/create_topic_requests.go @@ -105,7 +105,7 @@ func (c *CreateTopicRequests) Decode(d PacketDecoder) error { } func (c *CreateTopicRequests) Key() int16 { - return 19 + return CreateTopicsKey } func (c *CreateTopicRequests) Version() int16 { diff --git a/protocol/delete_topic_request.go b/protocol/delete_topic_request.go index 7dcddf2d..4b5efbdd 100644 --- a/protocol/delete_topic_request.go +++ b/protocol/delete_topic_request.go @@ -23,7 +23,7 @@ func (c *DeleteTopicsRequest) Decode(d PacketDecoder) (err error) { } func (c *DeleteTopicsRequest) Key() int16 { - return 20 + return DeleteTopicsKey } func (c *DeleteTopicsRequest) Version() int16 { diff --git a/protocol/describe_groups_request.go b/protocol/describe_groups_request.go index 5c1e5d0d..06cb7db9 100644 --- a/protocol/describe_groups_request.go +++ b/protocol/describe_groups_request.go @@ -14,7 +14,7 @@ func (r *DescribeGroupsRequest) Decode(d PacketDecoder) (err error) { } func (r *DescribeGroupsRequest) Key() int16 { - return 15 + return DescribeGroupsKey } func (r *DescribeGroupsRequest) Version() int16 { diff --git a/protocol/fetch_request.go b/protocol/fetch_request.go index d2512071..8573ecc2 100644 --- a/protocol/fetch_request.go +++ b/protocol/fetch_request.go @@ -99,7 +99,7 @@ func (r *FetchRequest) Decode(d PacketDecoder) error { } func (r *FetchRequest) Key() int16 { - return 1 + return FetchKey } func (r *FetchRequest) Version() int16 { diff --git a/protocol/group_coordinator_request.go b/protocol/group_coordinator_request.go index a165b1ab..cd1a0e6c 100644 --- a/protocol/group_coordinator_request.go +++ b/protocol/group_coordinator_request.go @@ -18,5 +18,5 @@ func (r *GroupCoordinatorRequest) Version() int16 { } func (r *GroupCoordinatorRequest) Key() int16 { - return 10 + return GroupCoordinatorKey } diff --git a/protocol/heartbeat_request.go b/protocol/heartbeat_request.go index a0ee67ec..464a4ed7 100644 --- a/protocol/heartbeat_request.go +++ b/protocol/heartbeat_request.go @@ -31,7 +31,7 @@ func (r *HeartbeatRequest) Decode(d PacketDecoder) (err error) { } func (r *HeartbeatRequest) Key() int16 { - return 12 + return HeartbeatKey } func (r *HeartbeatRequest) Version() int16 { diff --git a/protocol/join_group_request.go b/protocol/join_group_request.go index 14c4099d..657a8ad4 100644 --- a/protocol/join_group_request.go +++ b/protocol/join_group_request.go @@ -68,7 +68,7 @@ func (r *JoinGroupRequest) Decode(d PacketDecoder) error { } func (r *JoinGroupRequest) Key() int16 { - return 11 + return JoinGroupKey } func (r *JoinGroupRequest) Version() int16 { diff --git a/protocol/leader_and_isr_request.go b/protocol/leader_and_isr_request.go index 90bb5da7..7c204701 100644 --- a/protocol/leader_and_isr_request.go +++ b/protocol/leader_and_isr_request.go @@ -100,7 +100,7 @@ func (r *LeaderAndISRRequest) Decode(d PacketDecoder) error { } func (r *LeaderAndISRRequest) Key() int16 { - return 4 + return LeaderAndISRKey } func (r *LeaderAndISRRequest) Version() int16 { diff --git a/protocol/leave_group_request.go b/protocol/leave_group_request.go index abe45951..74ba9de5 100644 --- a/protocol/leave_group_request.go +++ b/protocol/leave_group_request.go @@ -20,10 +20,10 @@ func (r *LeaveGroupRequest) Decode(d PacketDecoder) (err error) { return err } -func (r *LeaveGroupRequest) key() int16 { - return 13 +func (r *LeaveGroupRequest) Key() int16 { + return LeaveGroupKey } -func (r *LeaveGroupRequest) version() int16 { +func (r *LeaveGroupRequest) Version() int16 { return 0 } diff --git a/protocol/list_groups_request.go b/protocol/list_groups_request.go index a72c095f..c4e5c1c6 100644 --- a/protocol/list_groups_request.go +++ b/protocol/list_groups_request.go @@ -12,7 +12,7 @@ func (r *ListGroupsRequest) Decode(d PacketDecoder) (err error) { } func (r *ListGroupsRequest) Key() int16 { - return 16 + return ListGroupsKey } func (r *ListGroupsRequest) Version() int16 { diff --git a/protocol/metadata_request.go b/protocol/metadata_request.go index 30bf6c92..f51f9c14 100644 --- a/protocol/metadata_request.go +++ b/protocol/metadata_request.go @@ -15,7 +15,7 @@ func (r *MetadataRequest) Decode(d PacketDecoder) (err error) { } func (r *MetadataRequest) Key() int16 { - return 3 + return MetadataKey } func (r *MetadataRequest) Version() int16 { diff --git a/protocol/offsets_request.go b/protocol/offsets_request.go index 4b07d6a3..ce507ae7 100644 --- a/protocol/offsets_request.go +++ b/protocol/offsets_request.go @@ -80,3 +80,11 @@ func (r *OffsetsRequest) Decode(d PacketDecoder) error { r.MaxNumOffsets, err = d.Int32() return err } + +func (r *OffsetsRequest) Key() int16 { + return OffsetsKey +} + +func (r *OffsetsRequest) Version() int16 { + return 0 +} diff --git a/protocol/offsets_response.go b/protocol/offsets_response.go index 91827889..565f75bb 100644 --- a/protocol/offsets_response.go +++ b/protocol/offsets_response.go @@ -81,11 +81,3 @@ func (r *OffsetsResponse) Decode(d PacketDecoder) error { } return nil } - -func (r *OffsetsResponse) Version() int16 { - return 0 -} - -func (r *OffsetResponse) Key() int16 { - return 2 -} diff --git a/protocol/produce_request.go b/protocol/produce_request.go index d15c0985..611d3969 100644 --- a/protocol/produce_request.go +++ b/protocol/produce_request.go @@ -79,7 +79,7 @@ func (r *ProduceRequest) Decode(d PacketDecoder) (err error) { } func (r *ProduceRequest) Key() int16 { - return 0 + return ProduceKey } func (r *ProduceRequest) Version() int16 { diff --git a/protocol/stop_replica_request.go b/protocol/stop_replica_request.go new file mode 100644 index 00000000..2cd96229 --- /dev/null +++ b/protocol/stop_replica_request.go @@ -0,0 +1,73 @@ +package protocol + +type StopReplicaPartition struct { + Topic string + Partition int32 +} + +type StopReplicaRequest struct { + ControllerID int32 + ControllerEpoch int32 + DeletePartitions bool + Partitions []*StopReplicaPartition +} + +func (r *StopReplicaRequest) Encode(e PacketEncoder) (err error) { + e.PutInt32(r.ControllerID) + e.PutInt32(r.ControllerEpoch) + if r.DeletePartitions { + e.PutInt8(1) + } else { + e.PutInt8(0) + } + if err = e.PutArrayLength(len(r.Partitions)); err != nil { + return + } + for _, partition := range r.Partitions { + if err = e.PutString(partition.Topic); err != nil { + return + } + e.PutInt32(partition.Partition) + } + return +} + +func (r *StopReplicaRequest) Decode(d PacketDecoder) (err error) { + if r.ControllerID, err = d.Int32(); err != nil { + return + } + if r.ControllerEpoch, err = d.Int32(); err != nil { + return + } + dp, err := d.Int8() + if err != nil { + return + } else if dp == 1 { + r.DeletePartitions = true + } else { + r.DeletePartitions = false + } + length, err := d.ArrayLength() + if err != nil { + return + } + r.Partitions = make([]*StopReplicaPartition, length) + for index := range r.Partitions { + r.Partitions[index] = new(StopReplicaPartition) + if r.Partitions[index].Topic, err = d.String(); err != nil { + return + } + if r.Partitions[index].Partition, err = d.Int32(); err != nil { + return + } + } + return nil +} + +func (r *StopReplicaRequest) Key() int16 { + return StopReplicaKey +} + +func (r *StopReplicaRequest) Version() int16 { + return 0 +} diff --git a/protocol/stop_replica_response.go b/protocol/stop_replica_response.go new file mode 100644 index 00000000..5c0ac8ad --- /dev/null +++ b/protocol/stop_replica_response.go @@ -0,0 +1,49 @@ +package protocol + +type StopReplicaPartitionAndErrorCode struct { + StopReplicaPartition + ErrorCode int16 +} + +type StopReplicaResponse struct { + ErrorCode int16 + Partitions []*StopReplicaPartitionAndErrorCode +} + +func (r *StopReplicaResponse) Encode(e PacketEncoder) (err error) { + e.PutInt16(r.ErrorCode) + if err = e.PutArrayLength(len(r.Partitions)); err != nil { + return + } + for _, partition := range r.Partitions { + if err = e.PutString(partition.Topic); err != nil { + return + } + e.PutInt32(partition.Partition) + e.PutInt16(partition.ErrorCode) + } + return +} + +func (r *StopReplicaResponse) Decode(d PacketDecoder) (err error) { + if r.ErrorCode, err = d.Int16(); err != nil { + return + } + length, err := d.ArrayLength() + if err != nil { + return + } + r.Partitions = make([]*StopReplicaPartitionAndErrorCode, length) + for index := range r.Partitions { + if r.Partitions[index].Topic, err = d.String(); err != nil { + return + } + if r.Partitions[index].Partition, err = d.Int32(); err != nil { + return + } + if r.Partitions[index].ErrorCode, err = d.Int16(); err != nil { + return + } + } + return +} diff --git a/protocol/sync_group_request.go b/protocol/sync_group_request.go index f28f8471..420dbcdc 100644 --- a/protocol/sync_group_request.go +++ b/protocol/sync_group_request.go @@ -59,7 +59,7 @@ func (r *SyncGroupRequest) Decode(d PacketDecoder) (err error) { } func (r *SyncGroupRequest) Key() int16 { - return 14 + return SyncGroupKey } func (r *SyncGroupRequest) Version() int16 { diff --git a/server/server.go b/server/server.go index 799c6d61..699c9f38 100644 --- a/server/server.go +++ b/server/server.go @@ -138,37 +138,37 @@ func (s *Server) handleRequest(conn net.Conn) { s.logger.Debug("correlation id [%d], request size [%d], key [%d]", header.CorrelationID, size, header.APIKey) switch header.APIKey { - case 0: + case protocol.ProduceKey: req := &protocol.ProduceRequest{} s.decode(header, req, d) if err = s.handleProduce(conn, header, req); err != nil { s.logger.Info("Produce failed: %s", err) } - case 1: + case protocol.FetchKey: req := &protocol.FetchRequest{} s.decode(header, req, d) if err = s.handleFetch(conn, header, req); err != nil { s.logger.Info("Fetch failed: %s", err) } - case 2: + case protocol.OffsetsKey: req := &protocol.OffsetsRequest{} s.decode(header, req, d) if err = s.handleOffsets(conn, header, req); err != nil { s.logger.Info("Offsets failed: %s", err) } - case 3: + case protocol.MetadataKey: req := &protocol.MetadataRequest{} s.decode(header, req, d) if err = s.handleMetadata(conn, header, req); err != nil { s.logger.Info("Metadata request failed: %s", err) } - case 19: + case protocol.CreateTopicsKey: req := &protocol.CreateTopicRequests{} s.decode(header, req, d) if err = s.handleCreateTopic(conn, header, req); err != nil { s.logger.Info("Create topic failed: %s", err) } - case 20: + case protocol.DeleteTopicsKey: req := &protocol.DeleteTopicsRequest{} s.decode(header, req, d) if err = s.handleDeleteTopics(conn, header, req); err != nil {