diff --git a/Dockerfile b/Dockerfile index e060796f..cbef087a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.16 +FROM golang:1.21 WORKDIR /go/src/github.com/Mongey/terraform-provider-kafka/ diff --git a/GNUmakefile b/GNUmakefile index 7d44c9f3..ce123d8d 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -1,4 +1,5 @@ GOFMT_FILES?=$$(find . -name '*.go' |grep -v vendor) +KAFKA_BOOTSTRAP_SERVERS ?= localhost:9092 default: build build: @@ -9,7 +10,7 @@ test: testacc: GODEBUG=x509ignoreCN=0 \ - KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \ + KAFKA_BOOTSTRAP_SERVERS=$(KAFKA_BOOTSTRAP_SERVERS) \ KAFKA_CA_CERT=../secrets/ca.crt \ KAFKA_CLIENT_CERT=../secrets/client.pem \ KAFKA_CLIENT_KEY=../secrets/client.key \ diff --git a/docker-compose.testacc.yaml b/docker-compose.testacc.yaml index a30d5975..f4c5a933 100644 --- a/docker-compose.testacc.yaml +++ b/docker-compose.testacc.yaml @@ -4,7 +4,7 @@ services: testacc: build: . environment: - KAFKA_BOOTSTRAP_SERVER: kafka1:9090,kafka2:9090,kafka3:9090 + KAFKA_BOOTSTRAP_SERVERS: kafka1:9090,kafka2:9090,kafka3:9090 entrypoint: - make - testacc diff --git a/kafka/client.go b/kafka/client.go index 47ef6cb7..de533682 100644 --- a/kafka/client.go +++ b/kafka/client.go @@ -21,6 +21,28 @@ type void struct{} var member void +type aclCache struct { + acls []*sarama.ResourceAcls + mutex sync.RWMutex + valid bool +} + +type aclDeletionQueue struct { + filters []*sarama.AclFilter + after time.Duration + timer *time.Timer + mutex sync.Mutex + waitChans []chan error +} + +type aclCreationQueue struct { + creations []*sarama.AclCreation + after time.Duration + timer *time.Timer + mutex sync.Mutex + waitChans []chan error +} + type Client struct { client sarama.Client kafkaConfig *sarama.Config @@ -28,6 +50,9 @@ type Client struct { supportedAPIs map[int]int topics map[string]void topicsMutex sync.RWMutex + aclCache + aclDeletionQueue + aclCreationQueue } func NewClient(config *Config) (*Client, error) { @@ -63,6 +88,12 @@ func NewClient(config *Config) (*Client, error) { client: c, config: config, kafkaConfig: kc, + aclDeletionQueue: aclDeletionQueue{ + after: time.Millisecond * 500, + }, + aclCreationQueue: aclCreationQueue{ + after: time.Millisecond * 500, + }, } err = client.populateAPIVersions() diff --git a/kafka/config.go b/kafka/config.go index 7f1e77b7..5cee0ced 100644 --- a/kafka/config.go +++ b/kafka/config.go @@ -115,6 +115,10 @@ func (c *Config) newKafkaConfig() (*sarama.Config, error) { kafkaConfig.Net.Proxy.Enable = true kafkaConfig.Net.Proxy.Dialer = proxy.FromEnvironment() + kafkaConfig.Net.ReadTimeout = time.Duration(c.Timeout) * time.Second + kafkaConfig.Net.WriteTimeout = time.Duration(c.Timeout) * time.Second + kafkaConfig.Metadata.Timeout = time.Duration(c.Timeout) * time.Second + if c.saslEnabled() { switch c.SASLMechanism { case "scram-sha512": diff --git a/kafka/kafka_acls.go b/kafka/kafka_acls.go index 4ca0b072..3a241a60 100644 --- a/kafka/kafka_acls.go +++ b/kafka/kafka_acls.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "strings" + "time" "github.com/IBM/sarama" ) @@ -115,47 +116,134 @@ func stringToACLPrefix(s string) sarama.AclResourcePatternType { return unknownConversion } +func (c *Client) enqueueDeleteACL(broker *sarama.Broker, filter *sarama.AclFilter) error { + c.aclDeletionQueue.mutex.Lock() + log.Printf("[DEBUG] Enqueueing ACL Deletion %v", *filter) + if c.aclDeletionQueue.timer != nil { + c.aclDeletionQueue.timer.Stop() + } + c.aclDeletionQueue.filters = append(c.aclDeletionQueue.filters, filter) + c.aclDeletionQueue.waitChans = append(c.aclDeletionQueue.waitChans, make(chan error)) + var waitChan = c.aclDeletionQueue.waitChans[len(c.aclDeletionQueue.waitChans)-1] + + c.aclDeletionQueue.timer = time.AfterFunc(c.aclDeletionQueue.after, func() { + c.aclDeletionQueue.mutex.Lock() + defer c.aclDeletionQueue.mutex.Unlock() + log.Printf("[INFO] Deleting ACLs %v", c.aclDeletionQueue.filters) + defer func() { + c.aclDeletionQueue.timer = nil + c.aclDeletionQueue.filters = nil + c.aclDeletionQueue.waitChans = nil + }() + req := &sarama.DeleteAclsRequest{ + Version: int(c.getDeleteAclsRequestAPIVersion()), + Filters: c.aclDeletionQueue.filters, + } + + res, err := broker.DeleteAcls(req) + if err != nil { + for _, wc := range c.aclDeletionQueue.waitChans { + wc <- err + } + return + } + + if len(res.FilterResponses) != len(c.aclDeletionQueue.waitChans) { + for _, ch := range c.aclDeletionQueue.waitChans { + ch <- fmt.Errorf("unexpectedly got a different length (%d) of FilterResponses compared to queued requests (%d) - this shouldn't be possible", len(res.FilterResponses), len(c.aclDeletionQueue.waitChans)) + } + return + } + + c.InvalidateACLCache() + for i, r := range res.FilterResponses { + if r.Err != sarama.ErrNoError { + c.aclDeletionQueue.waitChans[i] <- r.Err + } else { + c.aclDeletionQueue.waitChans[i] <- nil + } + } + }) + + c.aclDeletionQueue.mutex.Unlock() + return <-waitChan +} + func (c *Client) DeleteACL(s StringlyTypedACL) error { - log.Printf("[INFO] Deleting ACL %v", s) broker, err := c.client.Controller() if err != nil { return err } - aclsBeforeDelete, err := c.ListACLs() - if err != nil { - return fmt.Errorf("Unable to list acls before deleting -- can't be sure we're doing the right thing: %s", err) - } - - log.Printf("[INFO] Acls before deletion: %d", len(aclsBeforeDelete)) - for _, acl := range aclsBeforeDelete { - log.Printf("[DEBUG] ACL: %v", acl) - } filter, err := tfToAclFilter(s) if err != nil { return err } - req := &sarama.DeleteAclsRequest{ - Version: int(c.getDeleteAclsRequestAPIVersion()), - Filters: []*sarama.AclFilter{&filter}, - } + err = c.enqueueDeleteACL(broker, &filter) - res, err := broker.DeleteAcls(req) if err != nil { return err } - for _, r := range res.FilterResponses { - if r.Err != sarama.ErrNoError { - return r.Err - } - } return nil } +func (c *Client) enqueueCreateACL(broker *sarama.Broker, create *sarama.AclCreation) error { + c.aclCreationQueue.mutex.Lock() + if c.aclCreationQueue.timer != nil { + c.aclCreationQueue.timer.Stop() + } + log.Printf("[DEBUG] Enqueueing ACL Creation %v", create.Acl) + c.aclCreationQueue.creations = append(c.aclCreationQueue.creations, create) + c.aclCreationQueue.waitChans = append(c.aclCreationQueue.waitChans, make(chan error)) + + var waitChan = c.aclCreationQueue.waitChans[len(c.aclCreationQueue.waitChans)-1] + c.aclCreationQueue.timer = time.AfterFunc(c.aclCreationQueue.after, func() { + c.aclCreationQueue.mutex.Lock() + defer c.aclCreationQueue.mutex.Unlock() + log.Printf("[INFO] Creating ACLs %v", c.aclCreationQueue.creations) + defer func() { + c.aclCreationQueue.timer = nil + c.aclCreationQueue.creations = nil + c.aclCreationQueue.waitChans = nil + }() + req := &sarama.CreateAclsRequest{ + Version: c.getCreateAclsRequestAPIVersion(), + AclCreations: c.aclCreationQueue.creations, + } + + res, err := broker.CreateAcls(req) + if err != nil { + for _, wc := range c.aclCreationQueue.waitChans { + wc <- err + } + return + } + if len(res.AclCreationResponses) != len(c.aclCreationQueue.waitChans) { + for _, ch := range c.aclCreationQueue.waitChans { + ch <- fmt.Errorf("unexpectedly got a different length (%d) of AclCreationResponses compared to queued requests (%d) - this shouldn't be possible", len(res.AclCreationResponses), len(c.aclCreationQueue.waitChans)) + } + return + } + + c.InvalidateACLCache() + + for i, r := range res.AclCreationResponses { + if r.Err != sarama.ErrNoError { + c.aclCreationQueue.waitChans[i] <- r.Err + } else { + c.aclCreationQueue.waitChans[i] <- nil + } + } + + }) + + c.aclCreationQueue.mutex.Unlock() + return <-waitChan +} + func (c *Client) CreateACL(s StringlyTypedACL) error { - log.Printf("[DEBUG] Creating ACL %s", s) broker, err := c.client.Controller() if err != nil { return err @@ -165,21 +253,12 @@ func (c *Client) CreateACL(s StringlyTypedACL) error { if err != nil { return err } - req := &sarama.CreateAclsRequest{ - Version: c.getCreateAclsRequestAPIVersion(), - AclCreations: []*sarama.AclCreation{ac}, - } - res, err := broker.CreateAcls(req) + err = c.enqueueCreateACL(broker, ac) if err != nil { return err } - for _, r := range res.AclCreationResponses { - if r.Err != sarama.ErrNoError { - return r.Err - } - } log.Printf("[DEBUG] Created ACL %s", s) return nil @@ -349,7 +428,24 @@ func (c *Client) DescribeACLs(s StringlyTypedACL) ([]*sarama.ResourceAcls, error return aclsR.ResourceAcls, err } +func (c *Client) InvalidateACLCache() { + c.aclCache.mutex.Lock() + c.aclCache.valid = false + c.aclCache.acls = nil + c.aclCache.mutex.Unlock() +} + func (c *Client) ListACLs() ([]*sarama.ResourceAcls, error) { + c.aclCache.mutex.RLock() + if c.aclCache.valid { + c.aclCache.mutex.RUnlock() + log.Printf("[INFO] Using cached ACL list") + return c.aclCache.acls, nil + } + c.aclCache.mutex.RUnlock() + + c.aclCache.mutex.Lock() + defer c.aclCache.mutex.Unlock() log.Printf("[INFO] Listing all ACLS") broker, err := c.client.Controller() if err != nil { @@ -414,6 +510,7 @@ func (c *Client) ListACLs() ([]*sarama.ResourceAcls, error) { res = append(res, aclsR.ResourceAcls...) } - + c.aclCache.valid = true + c.aclCache.acls = res return res, err } diff --git a/kafka/lazy_client.go b/kafka/lazy_client.go index 7840c1a5..617f7e14 100644 --- a/kafka/lazy_client.go +++ b/kafka/lazy_client.go @@ -130,6 +130,15 @@ func (c *LazyClient) CreateACL(s StringlyTypedACL) error { return c.inner.CreateACL(s) } +func (c *LazyClient) InvalidateACLCache() error { + err := c.init() + if err != nil { + return err + } + c.inner.InvalidateACLCache() + return nil +} + func (c *LazyClient) ListACLs() ([]*sarama.ResourceAcls, error) { err := c.init() if err != nil { diff --git a/kafka/resource_kafka_acl_test.go b/kafka/resource_kafka_acl_test.go index ca230dcf..ac9d9990 100644 --- a/kafka/resource_kafka_acl_test.go +++ b/kafka/resource_kafka_acl_test.go @@ -99,6 +99,10 @@ func TestAcc_ACLDeletedOutsideOfTerraform(t *testing.T) { func testAccCheckAclDestroy(name string) error { client := testProvider.Meta().(*LazyClient) + err := client.InvalidateACLCache() + if err != nil { + return err + } acls, err := client.ListACLs() if err != nil { return err @@ -165,7 +169,10 @@ func testResourceACL_initialCheck(s *terraform.State) error { func testResourceACL_updateCheck(s *terraform.State) error { client := testProvider.Meta().(*LazyClient) - + err := client.InvalidateACLCache() + if err != nil { + return err + } acls, err := client.ListACLs() if err != nil { return err @@ -215,7 +222,7 @@ func testResourceACL_updateCheck(s *terraform.State) error { return fmt.Errorf("Should be for *") } if acl.Acls[0].PermissionType != sarama.AclPermissionDeny { - return fmt.Errorf("Should be Deny, not %v", acl.Acls[0].PermissionType) + return fmt.Errorf("should be Deny, not %v", acl.Acls[0].PermissionType.String()) } if acl.Resource.ResourcePatternType != sarama.AclPatternPrefixed {