Skip to content

Commit

Permalink
Support resource_pattern_type_filter
Browse files Browse the repository at this point in the history
  • Loading branch information
Mongey committed Apr 14, 2019
1 parent 254257b commit 58a9fb0
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 22 deletions.
11 changes: 11 additions & 0 deletions kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,17 @@ func (c *Client) topicConfig(topic string) (map[string]*string, error) {
return conf, nil
}

func (c *Client) getDescribeAclsRequestAPIVersion() int16 {
return int16(c.versionForKey(29, 1))
}
func (c *Client) getCreateAclsRequestAPIVersion() int16 {
return int16(c.versionForKey(30, 1))
}

func (c *Client) getDeleteAclsRequestAPIVersion() int16 {
return int16(c.versionForKey(31, 1))
}

func (c *Client) getDescribeConfigAPIVersion() int16 {
return int16(c.versionForKey(32, 1))
}
75 changes: 57 additions & 18 deletions kafka/kafka_acls.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ type ACL struct {
}

type Resource struct {
Type string
Name string
Type string
Name string
PatternTypeFilter string
}

type stringlyTypedACL struct {
Expand All @@ -26,7 +27,7 @@ type stringlyTypedACL struct {
}

func (a stringlyTypedACL) String() string {
return strings.Join([]string{a.ACL.Principal, a.ACL.Host, a.ACL.Operation, a.ACL.PermissionType, a.Resource.Type, a.Resource.Name}, "|")
return strings.Join([]string{a.ACL.Principal, a.ACL.Host, a.ACL.Operation, a.ACL.PermissionType, a.Resource.Type, a.Resource.Name, a.Resource.PatternTypeFilter}, "|")
}

func tfToAclCreation(s stringlyTypedACL) (*sarama.AclCreation, error) {
Expand All @@ -44,6 +45,10 @@ func tfToAclCreation(s stringlyTypedACL) (*sarama.AclCreation, error) {
if rType == unknownConversion {
return acl, fmt.Errorf("Unknown resource type: %s", s.Resource.Type)
}
patternType := stringToACLPrefix(s.Resource.PatternTypeFilter)
if patternType == unknownConversion {
return acl, fmt.Errorf("Unknown pattern type filter: '%s'", s.Resource.PatternTypeFilter)
}

acl.Acl = sarama.Acl{
Principal: s.ACL.Principal,
Expand All @@ -52,8 +57,9 @@ func tfToAclCreation(s stringlyTypedACL) (*sarama.AclCreation, error) {
PermissionType: pType,
}
acl.Resource = sarama.Resource{
ResourceType: rType,
ResourceName: s.Resource.Name,
ResourceType: rType,
ResourceName: s.Resource.Name,
ResoucePatternType: patternType,
}

return acl, nil
Expand Down Expand Up @@ -86,10 +92,31 @@ func tfToAclFilter(s stringlyTypedACL) (sarama.AclFilter, error) {
}
f.ResourceType = rType

patternType := stringToACLPrefix(s.Resource.PatternTypeFilter)
if patternType == unknownConversion {
return f, fmt.Errorf("Unknown pattern type filter: '%s'", s.Resource.PatternTypeFilter)
}
f.ResourcePatternTypeFilter = patternType

return f, nil
}

func stringToACLPrefix(s string) sarama.AclResourcePatternType {
switch s {
case "Any":
return sarama.AclPatternAny
case "Match":
return sarama.AclPatternMatch
case "Literal":
return sarama.AclPatternLiteral
case "Prefixed":
return sarama.AclPatternPrefixed
}
return unknownConversion
}

func (c *Client) DeleteACL(s stringlyTypedACL) error {
log.Printf("[INFO] Deleting ACL %v\n", s)
broker, err := c.client.Controller()
if err != nil {
return err
Expand All @@ -101,9 +128,9 @@ func (c *Client) DeleteACL(s stringlyTypedACL) error {
}

req := &sarama.DeleteAclsRequest{
Version: int(c.getDeleteAclsRequestAPIVersion()),
Filters: []*sarama.AclFilter{&filter},
}
log.Printf("[INFO] Deleting ACL %v\n", s)

res, err := broker.DeleteAcls(req)
if err != nil {
Expand All @@ -119,6 +146,7 @@ func (c *Client) DeleteACL(s stringlyTypedACL) error {
}

func (c *Client) CreateACL(s stringlyTypedACL) error {
log.Printf("[DEBUG] Creating ACL %s", s)
broker, err := c.client.Controller()
if err != nil {
return err
Expand All @@ -129,6 +157,7 @@ func (c *Client) CreateACL(s stringlyTypedACL) error {
return err
}
req := &sarama.CreateAclsRequest{
Version: int(c.getCreateAclsRequestAPIVersion()),
AclCreations: []*sarama.AclCreation{ac},
}

Expand Down Expand Up @@ -211,6 +240,7 @@ func stringToAclPermissionType(in string) sarama.AclPermissionType {
}

func (c *Client) ListACLs() ([]*sarama.ResourceAcls, error) {
log.Printf("[INFO] Listing all ACLS")
broker, err := c.client.Controller()
if err != nil {
return nil, err
Expand All @@ -219,33 +249,42 @@ func (c *Client) ListACLs() ([]*sarama.ResourceAcls, error) {
if err != nil {
return nil, err
}

allResources := []*sarama.DescribeAclsRequest{
&sarama.DescribeAclsRequest{
Version: int(c.getDescribeAclsRequestAPIVersion()),
AclFilter: sarama.AclFilter{
ResourceType: sarama.AclResourceTopic,
PermissionType: sarama.AclPermissionAny,
Operation: sarama.AclOperationAny,
ResourceType: sarama.AclResourceTopic,
ResourcePatternTypeFilter: sarama.AclPatternAny,
PermissionType: sarama.AclPermissionAny,
Operation: sarama.AclOperationAny,
},
},
&sarama.DescribeAclsRequest{
Version: int(c.getDescribeAclsRequestAPIVersion()),
AclFilter: sarama.AclFilter{
ResourceType: sarama.AclResourceGroup,
PermissionType: sarama.AclPermissionAny,
Operation: sarama.AclOperationAny,
ResourceType: sarama.AclResourceGroup,
ResourcePatternTypeFilter: sarama.AclPatternAny,
PermissionType: sarama.AclPermissionAny,
Operation: sarama.AclOperationAny,
},
},
&sarama.DescribeAclsRequest{
Version: int(c.getDescribeAclsRequestAPIVersion()),
AclFilter: sarama.AclFilter{
ResourceType: sarama.AclResourceCluster,
PermissionType: sarama.AclPermissionAny,
Operation: sarama.AclOperationAny,
ResourceType: sarama.AclResourceCluster,
ResourcePatternTypeFilter: sarama.AclPatternAny,
PermissionType: sarama.AclPermissionAny,
Operation: sarama.AclOperationAny,
},
},
&sarama.DescribeAclsRequest{
Version: int(c.getDescribeAclsRequestAPIVersion()),
AclFilter: sarama.AclFilter{
ResourceType: sarama.AclResourceTransactionalID,
PermissionType: sarama.AclPermissionAny,
Operation: sarama.AclOperationAny,
ResourceType: sarama.AclResourceTransactionalID,
ResourcePatternTypeFilter: sarama.AclPatternAny,
PermissionType: sarama.AclPermissionAny,
Operation: sarama.AclOperationAny,
},
},
}
Expand Down
16 changes: 13 additions & 3 deletions kafka/resource_kafka_acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ func kafkaACLResource() *schema.Resource {
Required: true,
ForceNew: true,
},
"resource_pattern_type_filter": {
Type: schema.TypeString,
Required: false,
Optional: true,
Default: "Literal",
ForceNew: true,
},
"acl_principal": {
Type: schema.TypeString,
Required: true,
Expand Down Expand Up @@ -75,20 +82,22 @@ func aclDelete(d *schema.ResourceData, meta interface{}) error {
}

func aclRead(d *schema.ResourceData, meta interface{}) error {
log.Println("[INFO] Reading ACL")
c := meta.(*Client)
a := aclInfo(d)
log.Printf("[INFO] Reading ACL %s", a)

currentAcls, err := c.ListACLs()
if err != nil {
return err
}
for _, c := range currentAcls {
if c.ResourceName == a.Resource.Name {
log.Printf("[INFO] Found ACL %v", c)
return nil
}

}
//panic(fmt.Sprintf("ACL read not implmented: %v", d))
return nil
}

Expand All @@ -101,8 +110,9 @@ func aclInfo(d *schema.ResourceData) stringlyTypedACL {
PermissionType: d.Get("acl_permission_type").(string),
},
Resource: Resource{
Type: d.Get("resource_type").(string),
Name: d.Get("resource_name").(string),
Type: d.Get("resource_type").(string),
Name: d.Get("resource_name").(string),
PatternTypeFilter: d.Get("resource_pattern_type_filter").(string),
},
}
return s
Expand Down
10 changes: 9 additions & 1 deletion kafka/resource_kafka_acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func testResourceACL_initialCheck(s *terraform.State) error {
}

client := testProvider.Meta().(*Client)

acls, err := client.ListACLs()
if err != nil {
return err
Expand All @@ -52,6 +51,10 @@ func testResourceACL_initialCheck(s *terraform.State) error {
if acls[0].Acls[0].PermissionType != sarama.AclPermissionAllow {
return fmt.Errorf("Should be Allow, not %v", acls[0].Acls[0].PermissionType)
}

if acls[0].Resource.ResoucePatternType != sarama.AclPatternLiteral {
return fmt.Errorf("Should be Literal, not %v", acls[0].Resource.ResoucePatternType)
}
return nil
}

Expand Down Expand Up @@ -86,6 +89,9 @@ func testResourceACL_updateCheck(s *terraform.State) error {
return fmt.Errorf("Should be Deny, not %v", acls[0].Acls[0].PermissionType)
}

if acls[0].Resource.ResoucePatternType != sarama.AclPatternPrefixed {
return fmt.Errorf("Should be Prefixed, not %v", acls[0].Resource.ResoucePatternType)
}
return nil
}

Expand All @@ -101,6 +107,7 @@ provider "kafka" {
resource "kafka_acl" "test" {
resource_name = "syslog"
resource_type = "Topic"
resource_pattern_type_filter = "Literal"
acl_principal = "User:Alice"
acl_host = "*"
acl_operation = "Write"
Expand All @@ -120,6 +127,7 @@ provider "kafka" {
resource "kafka_acl" "test" {
resource_name = "syslog"
resource_type = "Topic"
resource_pattern_type_filter = "Prefixed"
acl_principal = "User:Alice"
acl_host = "*"
acl_operation = "Write"
Expand Down

0 comments on commit 58a9fb0

Please sign in to comment.