diff --git a/kafka/client.go b/kafka/client.go index 93293a70..a2184470 100644 --- a/kafka/client.go +++ b/kafka/client.go @@ -21,6 +21,28 @@ type void struct{} var member void +type aclCache struct { + acls []*sarama.ResourceAcls + mutex sync.RWMutex + valid bool +} + +type aclDeletionQueue struct { + filters []*sarama.AclFilter + after time.Duration + timer *time.Timer + mutex sync.Mutex + waitChans []chan error +} + +type aclCreationQueue struct { + creations []*sarama.AclCreation + after time.Duration + timer *time.Timer + mutex sync.Mutex + waitChans []chan error +} + type Client struct { client sarama.Client kafkaConfig *sarama.Config @@ -28,6 +50,9 @@ type Client struct { supportedAPIs map[int]int topics map[string]void topicsMutex sync.RWMutex + aclCache + aclDeletionQueue + aclCreationQueue } func NewClient(config *Config) (*Client, error) { @@ -63,6 +88,12 @@ func NewClient(config *Config) (*Client, error) { client: c, config: config, kafkaConfig: kc, + aclDeletionQueue: aclDeletionQueue{ + after: time.Millisecond * 500, + }, + aclCreationQueue: aclCreationQueue{ + after: time.Millisecond * 500, + }, } err = client.populateAPIVersions() @@ -543,7 +574,7 @@ func (c *Client) versionForKey(apiKey, wantedMaxVersion int) int { return 0 } -//topicConfig retrives the non-default config map for a topic +// topicConfig retrives the non-default config map for a topic func (c *Client) topicConfig(topic string) (map[string]*string, error) { conf := map[string]*string{} request := &sarama.DescribeConfigsRequest{ diff --git a/kafka/kafka_acls.go b/kafka/kafka_acls.go index 4ca0b072..32a4269d 100644 --- a/kafka/kafka_acls.go +++ b/kafka/kafka_acls.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "strings" + "time" "github.com/IBM/sarama" ) @@ -115,47 +116,140 @@ func stringToACLPrefix(s string) sarama.AclResourcePatternType { return unknownConversion } +func (c *Client) enqueueDeleteACL(broker *sarama.Broker, filter *sarama.AclFilter) error { + c.aclDeletionQueue.mutex.Lock() + log.Printf("[DEBUG] Enqueueing ACL Deletion %v", *filter) + if c.aclDeletionQueue.timer != nil { + c.aclDeletionQueue.timer.Stop() + } + c.aclDeletionQueue.filters = append(c.aclDeletionQueue.filters, filter) + c.aclDeletionQueue.waitChans = append(c.aclDeletionQueue.waitChans, make(chan error)) + var waitChan = c.aclDeletionQueue.waitChans[len(c.aclDeletionQueue.waitChans)-1] + + c.aclDeletionQueue.timer = time.AfterFunc(c.aclDeletionQueue.after, func() { + c.aclDeletionQueue.mutex.Lock() + defer c.aclDeletionQueue.mutex.Unlock() + log.Printf("[INFO] Deleting ACLs %v", c.aclDeletionQueue.filters) + defer func() { + c.aclDeletionQueue.timer = nil + c.aclDeletionQueue.filters = nil + c.aclDeletionQueue.waitChans = nil + }() + req := &sarama.DeleteAclsRequest{ + Version: int(c.getDeleteAclsRequestAPIVersion()), + Filters: c.aclDeletionQueue.filters, + } + + res, err := broker.DeleteAcls(req) + if err != nil { + for _, wc := range c.aclDeletionQueue.waitChans { + wc <- err + } + return + } + + if len(res.FilterResponses) != len(c.aclDeletionQueue.waitChans) { + for _, ch := range c.aclDeletionQueue.waitChans { + ch <- fmt.Errorf("unexpectedly got a different length (%d) of FilterResponses compared to queued requests (%d) - this shouldn't be possible", len(res.FilterResponses), len(c.aclDeletionQueue.waitChans)) + } + return + } + + c.aclCache.mutex.Lock() + c.aclCache.valid = false + c.aclCache.acls = nil + c.aclCache.mutex.Unlock() + for i, r := range res.FilterResponses { + if r.Err != sarama.ErrNoError { + c.aclDeletionQueue.waitChans[i] <- r.Err + } else { + c.aclDeletionQueue.waitChans[i] <- nil + } + } + }) + + c.aclDeletionQueue.mutex.Unlock() + return <-waitChan +} + func (c *Client) DeleteACL(s StringlyTypedACL) error { - log.Printf("[INFO] Deleting ACL %v", s) broker, err := c.client.Controller() if err != nil { return err } - aclsBeforeDelete, err := c.ListACLs() - if err != nil { - return fmt.Errorf("Unable to list acls before deleting -- can't be sure we're doing the right thing: %s", err) - } - - log.Printf("[INFO] Acls before deletion: %d", len(aclsBeforeDelete)) - for _, acl := range aclsBeforeDelete { - log.Printf("[DEBUG] ACL: %v", acl) - } filter, err := tfToAclFilter(s) if err != nil { return err } - req := &sarama.DeleteAclsRequest{ - Version: int(c.getDeleteAclsRequestAPIVersion()), - Filters: []*sarama.AclFilter{&filter}, - } + err = c.enqueueDeleteACL(broker, &filter) - res, err := broker.DeleteAcls(req) if err != nil { return err } - for _, r := range res.FilterResponses { - if r.Err != sarama.ErrNoError { - return r.Err - } - } return nil } +func (c *Client) enqueueCreateACL(broker *sarama.Broker, create *sarama.AclCreation) error { + c.aclCreationQueue.mutex.Lock() + if c.aclCreationQueue.timer != nil { + c.aclCreationQueue.timer.Stop() + } + log.Printf("[DEBUG] Enqueueing ACL Creation %v", create.Acl) + c.aclCreationQueue.creations = append(c.aclCreationQueue.creations, create) + c.aclCreationQueue.waitChans = append(c.aclCreationQueue.waitChans, make(chan error)) + + var waitChan = c.aclCreationQueue.waitChans[len(c.aclCreationQueue.waitChans)-1] + c.aclCreationQueue.timer = time.AfterFunc(c.aclCreationQueue.after, func() { + c.aclCreationQueue.mutex.Lock() + defer c.aclCreationQueue.mutex.Unlock() + log.Printf("[INFO] Creating ACLs %v", c.aclCreationQueue.creations) + defer func() { + c.aclCreationQueue.timer = nil + c.aclCreationQueue.creations = nil + c.aclCreationQueue.waitChans = nil + }() + req := &sarama.CreateAclsRequest{ + Version: c.getCreateAclsRequestAPIVersion(), + AclCreations: c.aclCreationQueue.creations, + } + + res, err := broker.CreateAcls(req) + if err != nil { + for _, wc := range c.aclCreationQueue.waitChans { + wc <- err + } + return + } + if len(res.AclCreationResponses) != len(c.aclCreationQueue.waitChans) { + for _, ch := range c.aclCreationQueue.waitChans { + ch <- fmt.Errorf("unexpectedly got a different length (%d) of AclCreationResponses compared to queued requests (%d) - this shouldn't be possible", len(res.AclCreationResponses), len(c.aclCreationQueue.waitChans)) + } + return + } + + c.aclCache.mutex.Lock() + c.aclCache.valid = false + c.aclCache.acls = nil + c.aclCache.mutex.Unlock() + + for i, r := range res.AclCreationResponses { + if r.Err != sarama.ErrNoError { + c.aclCreationQueue.waitChans[i] <- r.Err + } else { + c.aclCreationQueue.waitChans[i] <- nil + } + } + + }) + + c.aclCreationQueue.mutex.Unlock() + return <-waitChan +} + func (c *Client) CreateACL(s StringlyTypedACL) error { - log.Printf("[DEBUG] Creating ACL %s", s) broker, err := c.client.Controller() if err != nil { return err @@ -165,21 +259,12 @@ func (c *Client) CreateACL(s StringlyTypedACL) error { if err != nil { return err } - req := &sarama.CreateAclsRequest{ - Version: c.getCreateAclsRequestAPIVersion(), - AclCreations: []*sarama.AclCreation{ac}, - } - res, err := broker.CreateAcls(req) + err = c.enqueueCreateACL(broker, ac) if err != nil { return err } - for _, r := range res.AclCreationResponses { - if r.Err != sarama.ErrNoError { - return r.Err - } - } log.Printf("[DEBUG] Created ACL %s", s) return nil @@ -350,6 +435,16 @@ func (c *Client) DescribeACLs(s StringlyTypedACL) ([]*sarama.ResourceAcls, error } func (c *Client) ListACLs() ([]*sarama.ResourceAcls, error) { + c.aclCache.mutex.RLock() + if c.aclCache.valid { + c.aclCache.mutex.RUnlock() + log.Printf("[INFO] Using cached ACL list") + return c.aclCache.acls, nil + } + c.aclCache.mutex.RUnlock() + + c.aclCache.mutex.Lock() + defer c.aclCache.mutex.Unlock() log.Printf("[INFO] Listing all ACLS") broker, err := c.client.Controller() if err != nil { @@ -414,6 +509,7 @@ func (c *Client) ListACLs() ([]*sarama.ResourceAcls, error) { res = append(res, aclsR.ResourceAcls...) } - + c.aclCache.valid = true + c.aclCache.acls = res return res, err }