Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ACL cache and debouncing request queues for creation/deletion requests to make parallelism useful #357

Merged
merged 5 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.16
FROM golang:1.21

WORKDIR /go/src/github.com/Mongey/terraform-provider-kafka/

Expand Down
3 changes: 2 additions & 1 deletion GNUmakefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
GOFMT_FILES?=$$(find . -name '*.go' |grep -v vendor)
KAFKA_BOOTSTRAP_SERVERS ?= localhost:9092
default: build

build:
Expand All @@ -9,7 +10,7 @@ test:

testacc:
GODEBUG=x509ignoreCN=0 \
KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \
KAFKA_BOOTSTRAP_SERVERS=$(KAFKA_BOOTSTRAP_SERVERS) \
KAFKA_CA_CERT=../secrets/ca.crt \
KAFKA_CLIENT_CERT=../secrets/client.pem \
KAFKA_CLIENT_KEY=../secrets/client.key \
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.testacc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ services:
testacc:
build: .
environment:
KAFKA_BOOTSTRAP_SERVER: kafka1:9090,kafka2:9090,kafka3:9090
KAFKA_BOOTSTRAP_SERVERS: kafka1:9090,kafka2:9090,kafka3:9090
entrypoint:
- make
- testacc
Expand Down
31 changes: 31 additions & 0 deletions kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,38 @@ type void struct{}

var member void

type aclCache struct {
acls []*sarama.ResourceAcls
mutex sync.RWMutex
valid bool
}

type aclDeletionQueue struct {
filters []*sarama.AclFilter
after time.Duration
timer *time.Timer
mutex sync.Mutex
waitChans []chan error
}

type aclCreationQueue struct {
creations []*sarama.AclCreation
after time.Duration
timer *time.Timer
mutex sync.Mutex
waitChans []chan error
}

type Client struct {
client sarama.Client
kafkaConfig *sarama.Config
config *Config
supportedAPIs map[int]int
topics map[string]void
topicsMutex sync.RWMutex
aclCache
aclDeletionQueue
aclCreationQueue
}

func NewClient(config *Config) (*Client, error) {
Expand Down Expand Up @@ -63,6 +88,12 @@ func NewClient(config *Config) (*Client, error) {
client: c,
config: config,
kafkaConfig: kc,
aclDeletionQueue: aclDeletionQueue{
after: time.Millisecond * 500,
},
aclCreationQueue: aclCreationQueue{
after: time.Millisecond * 500,
},
}

err = client.populateAPIVersions()
Expand Down
4 changes: 4 additions & 0 deletions kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func (c *Config) newKafkaConfig() (*sarama.Config, error) {
kafkaConfig.Net.Proxy.Enable = true
kafkaConfig.Net.Proxy.Dialer = proxy.FromEnvironment()

kafkaConfig.Net.ReadTimeout = time.Duration(c.Timeout) * time.Second
kafkaConfig.Net.WriteTimeout = time.Duration(c.Timeout) * time.Second
kafkaConfig.Metadata.Timeout = time.Duration(c.Timeout) * time.Second

if c.saslEnabled() {
switch c.SASLMechanism {
case "scram-sha512":
Expand Down
161 changes: 129 additions & 32 deletions kafka/kafka_acls.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"log"
"strings"
"time"

"github.com/IBM/sarama"
)
Expand Down Expand Up @@ -115,47 +116,134 @@ func stringToACLPrefix(s string) sarama.AclResourcePatternType {
return unknownConversion
}

func (c *Client) enqueueDeleteACL(broker *sarama.Broker, filter *sarama.AclFilter) error {
c.aclDeletionQueue.mutex.Lock()
log.Printf("[DEBUG] Enqueueing ACL Deletion %v", *filter)
if c.aclDeletionQueue.timer != nil {
c.aclDeletionQueue.timer.Stop()
}
c.aclDeletionQueue.filters = append(c.aclDeletionQueue.filters, filter)
c.aclDeletionQueue.waitChans = append(c.aclDeletionQueue.waitChans, make(chan error))
var waitChan = c.aclDeletionQueue.waitChans[len(c.aclDeletionQueue.waitChans)-1]

c.aclDeletionQueue.timer = time.AfterFunc(c.aclDeletionQueue.after, func() {
c.aclDeletionQueue.mutex.Lock()
defer c.aclDeletionQueue.mutex.Unlock()
log.Printf("[INFO] Deleting ACLs %v", c.aclDeletionQueue.filters)
defer func() {
c.aclDeletionQueue.timer = nil
c.aclDeletionQueue.filters = nil
c.aclDeletionQueue.waitChans = nil
}()
req := &sarama.DeleteAclsRequest{
Version: int(c.getDeleteAclsRequestAPIVersion()),
Filters: c.aclDeletionQueue.filters,
}

res, err := broker.DeleteAcls(req)
if err != nil {
for _, wc := range c.aclDeletionQueue.waitChans {
wc <- err
}
return
}

if len(res.FilterResponses) != len(c.aclDeletionQueue.waitChans) {
for _, ch := range c.aclDeletionQueue.waitChans {
ch <- fmt.Errorf("unexpectedly got a different length (%d) of FilterResponses compared to queued requests (%d) - this shouldn't be possible", len(res.FilterResponses), len(c.aclDeletionQueue.waitChans))
}
return
}

c.InvalidateACLCache()
for i, r := range res.FilterResponses {
if r.Err != sarama.ErrNoError {
c.aclDeletionQueue.waitChans[i] <- r.Err
} else {
c.aclDeletionQueue.waitChans[i] <- nil
}
}
})

c.aclDeletionQueue.mutex.Unlock()
return <-waitChan
}

func (c *Client) DeleteACL(s StringlyTypedACL) error {
log.Printf("[INFO] Deleting ACL %v", s)
broker, err := c.client.Controller()
if err != nil {
return err
}
aclsBeforeDelete, err := c.ListACLs()
if err != nil {
return fmt.Errorf("Unable to list acls before deleting -- can't be sure we're doing the right thing: %s", err)
}

log.Printf("[INFO] Acls before deletion: %d", len(aclsBeforeDelete))
for _, acl := range aclsBeforeDelete {
log.Printf("[DEBUG] ACL: %v", acl)
}

filter, err := tfToAclFilter(s)
if err != nil {
return err
}

req := &sarama.DeleteAclsRequest{
Version: int(c.getDeleteAclsRequestAPIVersion()),
Filters: []*sarama.AclFilter{&filter},
}
err = c.enqueueDeleteACL(broker, &filter)

res, err := broker.DeleteAcls(req)
if err != nil {
return err
}

for _, r := range res.FilterResponses {
if r.Err != sarama.ErrNoError {
return r.Err
}
}
return nil
}

func (c *Client) enqueueCreateACL(broker *sarama.Broker, create *sarama.AclCreation) error {
c.aclCreationQueue.mutex.Lock()
if c.aclCreationQueue.timer != nil {
c.aclCreationQueue.timer.Stop()
}
log.Printf("[DEBUG] Enqueueing ACL Creation %v", create.Acl)
c.aclCreationQueue.creations = append(c.aclCreationQueue.creations, create)
c.aclCreationQueue.waitChans = append(c.aclCreationQueue.waitChans, make(chan error))

var waitChan = c.aclCreationQueue.waitChans[len(c.aclCreationQueue.waitChans)-1]
c.aclCreationQueue.timer = time.AfterFunc(c.aclCreationQueue.after, func() {
c.aclCreationQueue.mutex.Lock()
defer c.aclCreationQueue.mutex.Unlock()
log.Printf("[INFO] Creating ACLs %v", c.aclCreationQueue.creations)
defer func() {
c.aclCreationQueue.timer = nil
c.aclCreationQueue.creations = nil
c.aclCreationQueue.waitChans = nil
}()
req := &sarama.CreateAclsRequest{
Version: c.getCreateAclsRequestAPIVersion(),
AclCreations: c.aclCreationQueue.creations,
}

res, err := broker.CreateAcls(req)
if err != nil {
for _, wc := range c.aclCreationQueue.waitChans {
wc <- err
}
return
}
if len(res.AclCreationResponses) != len(c.aclCreationQueue.waitChans) {
for _, ch := range c.aclCreationQueue.waitChans {
ch <- fmt.Errorf("unexpectedly got a different length (%d) of AclCreationResponses compared to queued requests (%d) - this shouldn't be possible", len(res.AclCreationResponses), len(c.aclCreationQueue.waitChans))
}
return
}

c.InvalidateACLCache()

for i, r := range res.AclCreationResponses {
if r.Err != sarama.ErrNoError {
c.aclCreationQueue.waitChans[i] <- r.Err
} else {
c.aclCreationQueue.waitChans[i] <- nil
}
}

})

c.aclCreationQueue.mutex.Unlock()
return <-waitChan
}

func (c *Client) CreateACL(s StringlyTypedACL) error {
log.Printf("[DEBUG] Creating ACL %s", s)
broker, err := c.client.Controller()
if err != nil {
return err
Expand All @@ -165,21 +253,12 @@ func (c *Client) CreateACL(s StringlyTypedACL) error {
if err != nil {
return err
}
req := &sarama.CreateAclsRequest{
Version: c.getCreateAclsRequestAPIVersion(),
AclCreations: []*sarama.AclCreation{ac},
}

res, err := broker.CreateAcls(req)
err = c.enqueueCreateACL(broker, ac)
if err != nil {
return err
}

for _, r := range res.AclCreationResponses {
if r.Err != sarama.ErrNoError {
return r.Err
}
}
log.Printf("[DEBUG] Created ACL %s", s)

return nil
Expand Down Expand Up @@ -349,7 +428,24 @@ func (c *Client) DescribeACLs(s StringlyTypedACL) ([]*sarama.ResourceAcls, error
return aclsR.ResourceAcls, err
}

func (c *Client) InvalidateACLCache() {
c.aclCache.mutex.Lock()
c.aclCache.valid = false
c.aclCache.acls = nil
c.aclCache.mutex.Unlock()
}

func (c *Client) ListACLs() ([]*sarama.ResourceAcls, error) {
c.aclCache.mutex.RLock()
if c.aclCache.valid {
c.aclCache.mutex.RUnlock()
log.Printf("[INFO] Using cached ACL list")
return c.aclCache.acls, nil
}
c.aclCache.mutex.RUnlock()

c.aclCache.mutex.Lock()
defer c.aclCache.mutex.Unlock()
log.Printf("[INFO] Listing all ACLS")
broker, err := c.client.Controller()
if err != nil {
Expand Down Expand Up @@ -414,6 +510,7 @@ func (c *Client) ListACLs() ([]*sarama.ResourceAcls, error) {

res = append(res, aclsR.ResourceAcls...)
}

c.aclCache.valid = true
c.aclCache.acls = res
return res, err
}
9 changes: 9 additions & 0 deletions kafka/lazy_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ func (c *LazyClient) CreateACL(s StringlyTypedACL) error {
return c.inner.CreateACL(s)
}

func (c *LazyClient) InvalidateACLCache() error {
err := c.init()
if err != nil {
return err
}
c.inner.InvalidateACLCache()
return nil
}

func (c *LazyClient) ListACLs() ([]*sarama.ResourceAcls, error) {
err := c.init()
if err != nil {
Expand Down
11 changes: 9 additions & 2 deletions kafka/resource_kafka_acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ func TestAcc_ACLDeletedOutsideOfTerraform(t *testing.T) {

func testAccCheckAclDestroy(name string) error {
client := testProvider.Meta().(*LazyClient)
err := client.InvalidateACLCache()
if err != nil {
return err
}
acls, err := client.ListACLs()
if err != nil {
return err
Expand Down Expand Up @@ -165,7 +169,10 @@ func testResourceACL_initialCheck(s *terraform.State) error {

func testResourceACL_updateCheck(s *terraform.State) error {
client := testProvider.Meta().(*LazyClient)

err := client.InvalidateACLCache()
if err != nil {
return err
}
acls, err := client.ListACLs()
if err != nil {
return err
Expand Down Expand Up @@ -215,7 +222,7 @@ func testResourceACL_updateCheck(s *terraform.State) error {
return fmt.Errorf("Should be for *")
}
if acl.Acls[0].PermissionType != sarama.AclPermissionDeny {
return fmt.Errorf("Should be Deny, not %v", acl.Acls[0].PermissionType)
return fmt.Errorf("should be Deny, not %v", acl.Acls[0].PermissionType.String())
}

if acl.Resource.ResourcePatternType != sarama.AclPatternPrefixed {
Expand Down
Loading