From d3a75e9e009064ab3825bb78a4678d56f282f2e7 Mon Sep 17 00:00:00 2001 From: Anonymous Date: Tue, 7 Nov 2023 17:03:46 +0000 Subject: [PATCH 1/4] Add ACL cache and debouncing queues for creation/deletion requests to make parallelism useful/cheap --- kafka/client.go | 31 +++++++++ kafka/kafka_acls.go | 160 +++++++++++++++++++++++++++++++++++--------- 2 files changed, 159 insertions(+), 32 deletions(-) diff --git a/kafka/client.go b/kafka/client.go index a7affbdf..a2184470 100644 --- a/kafka/client.go +++ b/kafka/client.go @@ -21,6 +21,28 @@ type void struct{} var member void +type aclCache struct { + acls []*sarama.ResourceAcls + mutex sync.RWMutex + valid bool +} + +type aclDeletionQueue struct { + filters []*sarama.AclFilter + after time.Duration + timer *time.Timer + mutex sync.Mutex + waitChans []chan error +} + +type aclCreationQueue struct { + creations []*sarama.AclCreation + after time.Duration + timer *time.Timer + mutex sync.Mutex + waitChans []chan error +} + type Client struct { client sarama.Client kafkaConfig *sarama.Config @@ -28,6 +50,9 @@ type Client struct { supportedAPIs map[int]int topics map[string]void topicsMutex sync.RWMutex + aclCache + aclDeletionQueue + aclCreationQueue } func NewClient(config *Config) (*Client, error) { @@ -63,6 +88,12 @@ func NewClient(config *Config) (*Client, error) { client: c, config: config, kafkaConfig: kc, + aclDeletionQueue: aclDeletionQueue{ + after: time.Millisecond * 500, + }, + aclCreationQueue: aclCreationQueue{ + after: time.Millisecond * 500, + }, } err = client.populateAPIVersions() diff --git a/kafka/kafka_acls.go b/kafka/kafka_acls.go index 4ca0b072..32a4269d 100644 --- a/kafka/kafka_acls.go +++ b/kafka/kafka_acls.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "strings" + "time" "github.com/IBM/sarama" ) @@ -115,47 +116,140 @@ func stringToACLPrefix(s string) sarama.AclResourcePatternType { return unknownConversion } +func (c *Client) enqueueDeleteACL(broker *sarama.Broker, filter *sarama.AclFilter) error { + c.aclDeletionQueue.mutex.Lock() + log.Printf("[DEBUG] Enqueueing ACL Deletion %v", *filter) + if c.aclDeletionQueue.timer != nil { + c.aclDeletionQueue.timer.Stop() + } + c.aclDeletionQueue.filters = append(c.aclDeletionQueue.filters, filter) + c.aclDeletionQueue.waitChans = append(c.aclDeletionQueue.waitChans, make(chan error)) + var waitChan = c.aclDeletionQueue.waitChans[len(c.aclDeletionQueue.waitChans)-1] + + c.aclDeletionQueue.timer = time.AfterFunc(c.aclDeletionQueue.after, func() { + c.aclDeletionQueue.mutex.Lock() + defer c.aclDeletionQueue.mutex.Unlock() + log.Printf("[INFO] Deleting ACLs %v", c.aclDeletionQueue.filters) + defer func() { + c.aclDeletionQueue.timer = nil + c.aclDeletionQueue.filters = nil + c.aclDeletionQueue.waitChans = nil + }() + req := &sarama.DeleteAclsRequest{ + Version: int(c.getDeleteAclsRequestAPIVersion()), + Filters: c.aclDeletionQueue.filters, + } + + res, err := broker.DeleteAcls(req) + if err != nil { + for _, wc := range c.aclDeletionQueue.waitChans { + wc <- err + } + return + } + + if len(res.FilterResponses) != len(c.aclDeletionQueue.waitChans) { + for _, ch := range c.aclDeletionQueue.waitChans { + ch <- fmt.Errorf("unexpectedly got a different length (%d) of FilterResponses compared to queued requests (%d) - this shouldn't be possible", len(res.FilterResponses), len(c.aclDeletionQueue.waitChans)) + } + return + } + + c.aclCache.mutex.Lock() + c.aclCache.valid = false + c.aclCache.acls = nil + c.aclCache.mutex.Unlock() + for i, r := range res.FilterResponses { + if r.Err != sarama.ErrNoError { + c.aclDeletionQueue.waitChans[i] <- r.Err + } else { + c.aclDeletionQueue.waitChans[i] <- nil + } + } + }) + + c.aclDeletionQueue.mutex.Unlock() + return <-waitChan +} + func (c *Client) DeleteACL(s StringlyTypedACL) error { - log.Printf("[INFO] Deleting ACL %v", s) broker, err := c.client.Controller() if err != nil { return err } - aclsBeforeDelete, err := c.ListACLs() - if err != nil { - return fmt.Errorf("Unable to list acls before deleting -- can't be sure we're doing the right thing: %s", err) - } - - log.Printf("[INFO] Acls before deletion: %d", len(aclsBeforeDelete)) - for _, acl := range aclsBeforeDelete { - log.Printf("[DEBUG] ACL: %v", acl) - } filter, err := tfToAclFilter(s) if err != nil { return err } - req := &sarama.DeleteAclsRequest{ - Version: int(c.getDeleteAclsRequestAPIVersion()), - Filters: []*sarama.AclFilter{&filter}, - } + err = c.enqueueDeleteACL(broker, &filter) - res, err := broker.DeleteAcls(req) if err != nil { return err } - for _, r := range res.FilterResponses { - if r.Err != sarama.ErrNoError { - return r.Err - } - } return nil } +func (c *Client) enqueueCreateACL(broker *sarama.Broker, create *sarama.AclCreation) error { + c.aclCreationQueue.mutex.Lock() + if c.aclCreationQueue.timer != nil { + c.aclCreationQueue.timer.Stop() + } + log.Printf("[DEBUG] Enqueueing ACL Creation %v", create.Acl) + c.aclCreationQueue.creations = append(c.aclCreationQueue.creations, create) + c.aclCreationQueue.waitChans = append(c.aclCreationQueue.waitChans, make(chan error)) + + var waitChan = c.aclCreationQueue.waitChans[len(c.aclCreationQueue.waitChans)-1] + c.aclCreationQueue.timer = time.AfterFunc(c.aclCreationQueue.after, func() { + c.aclCreationQueue.mutex.Lock() + defer c.aclCreationQueue.mutex.Unlock() + log.Printf("[INFO] Creating ACLs %v", c.aclCreationQueue.creations) + defer func() { + c.aclCreationQueue.timer = nil + c.aclCreationQueue.creations = nil + c.aclCreationQueue.waitChans = nil + }() + req := &sarama.CreateAclsRequest{ + Version: c.getCreateAclsRequestAPIVersion(), + AclCreations: c.aclCreationQueue.creations, + } + + res, err := broker.CreateAcls(req) + if err != nil { + for _, wc := range c.aclCreationQueue.waitChans { + wc <- err + } + return + } + if len(res.AclCreationResponses) != len(c.aclCreationQueue.waitChans) { + for _, ch := range c.aclCreationQueue.waitChans { + ch <- fmt.Errorf("unexpectedly got a different length (%d) of AclCreationResponses compared to queued requests (%d) - this shouldn't be possible", len(res.AclCreationResponses), len(c.aclCreationQueue.waitChans)) + } + return + } + + c.aclCache.mutex.Lock() + c.aclCache.valid = false + c.aclCache.acls = nil + c.aclCache.mutex.Unlock() + + for i, r := range res.AclCreationResponses { + if r.Err != sarama.ErrNoError { + c.aclCreationQueue.waitChans[i] <- r.Err + } else { + c.aclCreationQueue.waitChans[i] <- nil + } + } + + }) + + c.aclCreationQueue.mutex.Unlock() + return <-waitChan +} + func (c *Client) CreateACL(s StringlyTypedACL) error { - log.Printf("[DEBUG] Creating ACL %s", s) broker, err := c.client.Controller() if err != nil { return err @@ -165,21 +259,12 @@ func (c *Client) CreateACL(s StringlyTypedACL) error { if err != nil { return err } - req := &sarama.CreateAclsRequest{ - Version: c.getCreateAclsRequestAPIVersion(), - AclCreations: []*sarama.AclCreation{ac}, - } - res, err := broker.CreateAcls(req) + err = c.enqueueCreateACL(broker, ac) if err != nil { return err } - for _, r := range res.AclCreationResponses { - if r.Err != sarama.ErrNoError { - return r.Err - } - } log.Printf("[DEBUG] Created ACL %s", s) return nil @@ -350,6 +435,16 @@ func (c *Client) DescribeACLs(s StringlyTypedACL) ([]*sarama.ResourceAcls, error } func (c *Client) ListACLs() ([]*sarama.ResourceAcls, error) { + c.aclCache.mutex.RLock() + if c.aclCache.valid { + c.aclCache.mutex.RUnlock() + log.Printf("[INFO] Using cached ACL list") + return c.aclCache.acls, nil + } + c.aclCache.mutex.RUnlock() + + c.aclCache.mutex.Lock() + defer c.aclCache.mutex.Unlock() log.Printf("[INFO] Listing all ACLS") broker, err := c.client.Controller() if err != nil { @@ -414,6 +509,7 @@ func (c *Client) ListACLs() ([]*sarama.ResourceAcls, error) { res = append(res, aclsR.ResourceAcls...) } - + c.aclCache.valid = true + c.aclCache.acls = res return res, err } From 4126d5afa037fa6cbc49862860f6155ec3dc712e Mon Sep 17 00:00:00 2001 From: Anonymous Date: Mon, 11 Dec 2023 19:35:35 +0000 Subject: [PATCH 2/4] Invalidate ACL cache in tests which list ACLs --- kafka/kafka_acls.go | 17 +++++++++-------- kafka/lazy_client.go | 9 +++++++++ kafka/resource_kafka_acl_test.go | 11 +++++++++-- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/kafka/kafka_acls.go b/kafka/kafka_acls.go index 32a4269d..3a241a60 100644 --- a/kafka/kafka_acls.go +++ b/kafka/kafka_acls.go @@ -155,10 +155,7 @@ func (c *Client) enqueueDeleteACL(broker *sarama.Broker, filter *sarama.AclFilte return } - c.aclCache.mutex.Lock() - c.aclCache.valid = false - c.aclCache.acls = nil - c.aclCache.mutex.Unlock() + c.InvalidateACLCache() for i, r := range res.FilterResponses { if r.Err != sarama.ErrNoError { c.aclDeletionQueue.waitChans[i] <- r.Err @@ -230,10 +227,7 @@ func (c *Client) enqueueCreateACL(broker *sarama.Broker, create *sarama.AclCreat return } - c.aclCache.mutex.Lock() - c.aclCache.valid = false - c.aclCache.acls = nil - c.aclCache.mutex.Unlock() + c.InvalidateACLCache() for i, r := range res.AclCreationResponses { if r.Err != sarama.ErrNoError { @@ -434,6 +428,13 @@ func (c *Client) DescribeACLs(s StringlyTypedACL) ([]*sarama.ResourceAcls, error return aclsR.ResourceAcls, err } +func (c *Client) InvalidateACLCache() { + c.aclCache.mutex.Lock() + c.aclCache.valid = false + c.aclCache.acls = nil + c.aclCache.mutex.Unlock() +} + func (c *Client) ListACLs() ([]*sarama.ResourceAcls, error) { c.aclCache.mutex.RLock() if c.aclCache.valid { diff --git a/kafka/lazy_client.go b/kafka/lazy_client.go index 7840c1a5..617f7e14 100644 --- a/kafka/lazy_client.go +++ b/kafka/lazy_client.go @@ -130,6 +130,15 @@ func (c *LazyClient) CreateACL(s StringlyTypedACL) error { return c.inner.CreateACL(s) } +func (c *LazyClient) InvalidateACLCache() error { + err := c.init() + if err != nil { + return err + } + c.inner.InvalidateACLCache() + return nil +} + func (c *LazyClient) ListACLs() ([]*sarama.ResourceAcls, error) { err := c.init() if err != nil { diff --git a/kafka/resource_kafka_acl_test.go b/kafka/resource_kafka_acl_test.go index ca230dcf..ac9d9990 100644 --- a/kafka/resource_kafka_acl_test.go +++ b/kafka/resource_kafka_acl_test.go @@ -99,6 +99,10 @@ func TestAcc_ACLDeletedOutsideOfTerraform(t *testing.T) { func testAccCheckAclDestroy(name string) error { client := testProvider.Meta().(*LazyClient) + err := client.InvalidateACLCache() + if err != nil { + return err + } acls, err := client.ListACLs() if err != nil { return err @@ -165,7 +169,10 @@ func testResourceACL_initialCheck(s *terraform.State) error { func testResourceACL_updateCheck(s *terraform.State) error { client := testProvider.Meta().(*LazyClient) - + err := client.InvalidateACLCache() + if err != nil { + return err + } acls, err := client.ListACLs() if err != nil { return err @@ -215,7 +222,7 @@ func testResourceACL_updateCheck(s *terraform.State) error { return fmt.Errorf("Should be for *") } if acl.Acls[0].PermissionType != sarama.AclPermissionDeny { - return fmt.Errorf("Should be Deny, not %v", acl.Acls[0].PermissionType) + return fmt.Errorf("should be Deny, not %v", acl.Acls[0].PermissionType.String()) } if acl.Resource.ResourcePatternType != sarama.AclPatternPrefixed { From b356daa30996e2950c2925ae131e0887518b5321 Mon Sep 17 00:00:00 2001 From: Anonymous Date: Mon, 19 Feb 2024 11:42:10 +0000 Subject: [PATCH 3/4] Bump Go to 1.21, set and use KAFKA_BOOTSTRAP_SERVERS correctly in makefile to use internal advertised listeners when running testacc under docker --- Dockerfile | 2 +- GNUmakefile | 3 ++- docker-compose.testacc.yaml | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index e060796f..cbef087a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.16 +FROM golang:1.21 WORKDIR /go/src/github.com/Mongey/terraform-provider-kafka/ diff --git a/GNUmakefile b/GNUmakefile index 7d44c9f3..ce123d8d 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -1,4 +1,5 @@ GOFMT_FILES?=$$(find . -name '*.go' |grep -v vendor) +KAFKA_BOOTSTRAP_SERVERS ?= localhost:9092 default: build build: @@ -9,7 +10,7 @@ test: testacc: GODEBUG=x509ignoreCN=0 \ - KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \ + KAFKA_BOOTSTRAP_SERVERS=$(KAFKA_BOOTSTRAP_SERVERS) \ KAFKA_CA_CERT=../secrets/ca.crt \ KAFKA_CLIENT_CERT=../secrets/client.pem \ KAFKA_CLIENT_KEY=../secrets/client.key \ diff --git a/docker-compose.testacc.yaml b/docker-compose.testacc.yaml index a30d5975..f4c5a933 100644 --- a/docker-compose.testacc.yaml +++ b/docker-compose.testacc.yaml @@ -4,7 +4,7 @@ services: testacc: build: . environment: - KAFKA_BOOTSTRAP_SERVER: kafka1:9090,kafka2:9090,kafka3:9090 + KAFKA_BOOTSTRAP_SERVERS: kafka1:9090,kafka2:9090,kafka3:9090 entrypoint: - make - testacc From e94ca555c46dcbda8a1e5108ee94453892b7d8a5 Mon Sep 17 00:00:00 2001 From: Anonymous Date: Thu, 29 Feb 2024 15:47:02 +0000 Subject: [PATCH 4/4] Set read/write/metadata timeouts to the provider configured timeout --- kafka/config.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/kafka/config.go b/kafka/config.go index 0d58de24..13dbd3f5 100644 --- a/kafka/config.go +++ b/kafka/config.go @@ -50,6 +50,10 @@ func (c *Config) newKafkaConfig() (*sarama.Config, error) { kafkaConfig.Net.Proxy.Enable = true kafkaConfig.Net.Proxy.Dialer = proxy.FromEnvironment() + kafkaConfig.Net.ReadTimeout = time.Duration(c.Timeout) * time.Second + kafkaConfig.Net.WriteTimeout = time.Duration(c.Timeout) * time.Second + kafkaConfig.Metadata.Timeout = time.Duration(c.Timeout) * time.Second + if c.saslEnabled() { switch c.SASLMechanism { case "scram-sha512":