Skip to content

Commit

Permalink
Deleteacls support (#1174)
Browse files Browse the repository at this point in the history
* support deleteacls

* add deleteacls_test

* add protocol test

* test that acl was deleted

* trigger build
  • Loading branch information
petedannemann authored Sep 13, 2023
1 parent f4ca0b4 commit 9ecb9d2
Show file tree
Hide file tree
Showing 4 changed files with 465 additions and 0 deletions.
114 changes: 114 additions & 0 deletions deleteacls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package kafka

import (
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/deleteacls"
)

// DeleteACLsRequest represents a request sent to a kafka broker to delete
// ACLs.
type DeleteACLsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// List of ACL filters to use for deletion.
Filters []DeleteACLsFilter
}

type DeleteACLsFilter struct {
ResourceTypeFilter ResourceType
ResourceNameFilter string
ResourcePatternTypeFilter PatternType
PrincipalFilter string
HostFilter string
Operation ACLOperationType
PermissionType ACLPermissionType
}

// DeleteACLsResponse represents a response from a kafka broker to an ACL
// deletion request.
type DeleteACLsResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration

// List of the results from the deletion request.
Results []DeleteACLsResult
}

type DeleteACLsResult struct {
Error error
MatchingACLs []DeleteACLsMatchingACLs
}

type DeleteACLsMatchingACLs struct {
Error error
ResourceType ResourceType
ResourceName string
ResourcePatternType PatternType
Principal string
Host string
Operation ACLOperationType
PermissionType ACLPermissionType
}

// DeleteACLs sends ACLs deletion request to a kafka broker and returns the
// response.
func (c *Client) DeleteACLs(ctx context.Context, req *DeleteACLsRequest) (*DeleteACLsResponse, error) {
filters := make([]deleteacls.RequestFilter, 0, len(req.Filters))

for _, filter := range req.Filters {
filters = append(filters, deleteacls.RequestFilter{
ResourceTypeFilter: int8(filter.ResourceTypeFilter),
ResourceNameFilter: filter.ResourceNameFilter,
ResourcePatternTypeFilter: int8(filter.ResourcePatternTypeFilter),
PrincipalFilter: filter.PrincipalFilter,
HostFilter: filter.HostFilter,
Operation: int8(filter.Operation),
PermissionType: int8(filter.PermissionType),
})
}

m, err := c.roundTrip(ctx, req.Addr, &deleteacls.Request{
Filters: filters,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).DeleteACLs: %w", err)
}

res := m.(*deleteacls.Response)

results := make([]DeleteACLsResult, 0, len(res.FilterResults))

for _, result := range res.FilterResults {
matchingACLs := make([]DeleteACLsMatchingACLs, 0, len(result.MatchingACLs))

for _, matchingACL := range result.MatchingACLs {
matchingACLs = append(matchingACLs, DeleteACLsMatchingACLs{
Error: makeError(matchingACL.ErrorCode, matchingACL.ErrorMessage),
ResourceType: ResourceType(matchingACL.ResourceType),
ResourceName: matchingACL.ResourceName,
ResourcePatternType: PatternType(matchingACL.ResourcePatternType),
Principal: matchingACL.Principal,
Host: matchingACL.Host,
Operation: ACLOperationType(matchingACL.Operation),
PermissionType: ACLPermissionType(matchingACL.PermissionType),
})
}

results = append(results, DeleteACLsResult{
Error: makeError(result.ErrorCode, result.ErrorMessage),
MatchingACLs: matchingACLs,
})
}

ret := &DeleteACLsResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Results: results,
}

return ret, nil
}
112 changes: 112 additions & 0 deletions deleteacls_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package kafka

import (
"context"
"testing"

ktesting "github.com/segmentio/kafka-go/testing"
"github.com/stretchr/testify/assert"
)

func TestClientDeleteACLs(t *testing.T) {
if !ktesting.KafkaIsAtLeast("2.0.1") {
return
}

client, shutdown := newLocalClient()
defer shutdown()

topic := makeTopic()
group := makeGroupID()

createRes, err := client.CreateACLs(context.Background(), &CreateACLsRequest{
ACLs: []ACLEntry{
{
Principal: "User:alice",
PermissionType: ACLPermissionTypeAllow,
Operation: ACLOperationTypeRead,
ResourceType: ResourceTypeTopic,
ResourcePatternType: PatternTypeLiteral,
ResourceName: topic,
Host: "*",
},
{
Principal: "User:bob",
PermissionType: ACLPermissionTypeAllow,
Operation: ACLOperationTypeRead,
ResourceType: ResourceTypeGroup,
ResourcePatternType: PatternTypeLiteral,
ResourceName: group,
Host: "*",
},
},
})
if err != nil {
t.Fatal(err)
}

for _, err := range createRes.Errors {
if err != nil {
t.Error(err)
}
}

deleteResp, err := client.DeleteACLs(context.Background(), &DeleteACLsRequest{
Filters: []DeleteACLsFilter{
{
ResourceTypeFilter: ResourceTypeTopic,
ResourceNameFilter: topic,
ResourcePatternTypeFilter: PatternTypeLiteral,
Operation: ACLOperationTypeRead,
PermissionType: ACLPermissionTypeAllow,
},
},
})
if err != nil {
t.Fatal(err)
}

expectedDeleteResp := DeleteACLsResponse{
Throttle: 0,
Results: []DeleteACLsResult{
{
Error: makeError(0, ""),
MatchingACLs: []DeleteACLsMatchingACLs{
{
Error: makeError(0, ""),
ResourceType: ResourceTypeTopic,
ResourceName: topic,
ResourcePatternType: PatternTypeLiteral,
Principal: "User:alice",
Host: "*",
Operation: ACLOperationTypeRead,
PermissionType: ACLPermissionTypeAllow,
},
},
},
},
}

assert.Equal(t, expectedDeleteResp, *deleteResp)

describeResp, err := client.DescribeACLs(context.Background(), &DescribeACLsRequest{
Filter: ACLFilter{
ResourceTypeFilter: ResourceTypeTopic,
ResourceNameFilter: topic,
ResourcePatternTypeFilter: PatternTypeLiteral,
Operation: ACLOperationTypeRead,
PermissionType: ACLPermissionTypeAllow,
},
})
if err != nil {
t.Fatal(err)
}

expectedDescribeResp := DescribeACLsResponse{
Throttle: 0,
Error: makeError(0, ""),
Resources: []ACLResource{},
}

assert.Equal(t, expectedDescribeResp, *describeResp)
}
74 changes: 74 additions & 0 deletions protocol/deleteacls/deleteacls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package deleteacls

import "github.com/segmentio/kafka-go/protocol"

func init() {
protocol.Register(&Request{}, &Response{})
}

type Request struct {
// We need at least one tagged field to indicate that v2+ uses "flexible"
// messages.
_ struct{} `kafka:"min=v2,max=v3,tag"`

Filters []RequestFilter `kafka:"min=v0,max=v3"`
}

func (r *Request) ApiKey() protocol.ApiKey { return protocol.DeleteAcls }

func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
return cluster.Brokers[cluster.Controller], nil
}

type RequestFilter struct {
// We need at least one tagged field to indicate that v2+ uses "flexible"
// messages.
_ struct{} `kafka:"min=v2,max=v3,tag"`

ResourceTypeFilter int8 `kafka:"min=v0,max=v3"`
ResourceNameFilter string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"`
ResourcePatternTypeFilter int8 `kafka:"min=v1,max=v3"`
PrincipalFilter string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"`
HostFilter string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"`
Operation int8 `kafka:"min=v0,max=v3"`
PermissionType int8 `kafka:"min=v0,max=v3"`
}

type Response struct {
// We need at least one tagged field to indicate that v2+ uses "flexible"
// messages.
_ struct{} `kafka:"min=v2,max=v3,tag"`

ThrottleTimeMs int32 `kafka:"min=v0,max=v3"`
FilterResults []FilterResult `kafka:"min=v0,max=v3"`
}

func (r *Response) ApiKey() protocol.ApiKey { return protocol.DeleteAcls }

type FilterResult struct {
// We need at least one tagged field to indicate that v2+ uses "flexible"
// messages.
_ struct{} `kafka:"min=v2,max=v3,tag"`

ErrorCode int16 `kafka:"min=v0,max=v3"`
ErrorMessage string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"`
MatchingACLs []MatchingACL `kafka:"min=v0,max=v3"`
}

type MatchingACL struct {
// We need at least one tagged field to indicate that v2+ uses "flexible"
// messages.
_ struct{} `kafka:"min=v2,max=v3,tag"`

ErrorCode int16 `kafka:"min=v0,max=v3"`
ErrorMessage string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"`
ResourceType int8 `kafka:"min=v0,max=v3"`
ResourceName string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"`
ResourcePatternType int8 `kafka:"min=v1,max=v3"`
Principal string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"`
Host string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"`
Operation int8 `kafka:"min=v0,max=v3"`
PermissionType int8 `kafka:"min=v0,max=v3"`
}

var _ protocol.BrokerMessage = (*Request)(nil)
Loading

0 comments on commit 9ecb9d2

Please sign in to comment.