Skip to content

Commit

Permalink
ext: lock all drivers
Browse files Browse the repository at this point in the history
  • Loading branch information
jzelinskie committed Jan 23, 2017
1 parent 78cef02 commit f9b3190
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 43 deletions.
38 changes: 32 additions & 6 deletions ext/notification/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package notification

import (
"sync"

"github.com/coreos/pkg/capnslog"

"github.com/coreos/clair/config"
Expand All @@ -30,8 +32,8 @@ import (
var (
log = capnslog.NewPackageLogger("github.com/coreos/clair", "ext/notification")

// Senders is the list of registered Senders.
Senders = make(map[string]Sender)
sendersM sync.RWMutex
senders = make(map[string]Sender)
)

// Sender represents anything that can transmit notifications.
Expand All @@ -46,8 +48,8 @@ type Sender interface {

// RegisterSender makes a Sender available by the provided name.
//
// If RegisterSender is called twice with the same name, the name is blank, or
// if the provided Sender is nil, this function panics.
// If called twice with the same name, the name is blank, or if the provided
// Sender is nil, this function panics.
func RegisterSender(name string, s Sender) {
if name == "" {
panic("notification: could not register a Sender with an empty name")
Expand All @@ -57,9 +59,33 @@ func RegisterSender(name string, s Sender) {
panic("notification: could not register a nil Sender")
}

if _, dup := Senders[name]; dup {
sendersM.Lock()
defer sendersM.Unlock()

if _, dup := senders[name]; dup {
panic("notification: RegisterSender called twice for " + name)
}

Senders[name] = s
senders[name] = s
}

// Senders returns the list of the registered Senders.
func Senders() map[string]Sender {
sendersM.RLock()
defer sendersM.RUnlock()

ret := make(map[string]Sender)
for k, v := range senders {
ret[k] = v
}

return ret
}

// UnregisterSender removes a Sender with a particular name from the list.
func UnregisterSender(name string) {
sendersM.Lock()
defer sendersM.Unlock()

delete(senders, name)
}
10 changes: 6 additions & 4 deletions ext/versionfmt/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,20 @@ type Parser interface {
// if the provided Parser is nil, this function panics.
func RegisterParser(name string, p Parser) {
if name == "" {
panic("Could not register a Parser with an empty name")
panic("versionfmt: could not register a Parser with an empty name")
}

if p == nil {
panic("Could not register a nil Parser")
panic("versionfmt: could not register a nil Parser")
}

parsersM.Lock()
defer parsersM.Unlock()

if _, alreadyExists := parsers[name]; alreadyExists {
panic("Parser '" + name + "' is already registered")
if _, dup := parsers[name]; dup {
panic("versionfmt: RegisterParser called twice for " + name)
}

parsers[name] = p
}

Expand Down
35 changes: 28 additions & 7 deletions ext/vulnmdsrc/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@
package vulnmdsrc

import (
"sync"

"github.com/coreos/clair/database"
"github.com/coreos/clair/utils/types"
)

// Appenders is the list of registered Appenders.
var Appenders = make(map[string]Appender)
var (
appendersM sync.RWMutex
appenders = make(map[string]Appender)
)

// AppendFunc is the type of a callback provided to an Appender.
type AppendFunc func(metadataKey string, metadata interface{}, severity types.Priority)
Expand All @@ -49,20 +53,37 @@ type Appender interface {
}

// RegisterAppender makes an Appender available by the provided name.
// If Register is called twice with the same name or if driver is nil,
// it panics.
//
// If called twice with the same name, the name is blank, or if the provided
// Appender is nil, this function panics.
func RegisterAppender(name string, a Appender) {
if name == "" {
panic("updater: could not register an Appender with an empty name")
panic("vulnmdsrc: could not register an Appender with an empty name")
}

if a == nil {
panic("vulnmdsrc: could not register a nil Appender")
}

if _, dup := Appenders[name]; dup {
appendersM.Lock()
defer appendersM.Unlock()

if _, dup := appenders[name]; dup {
panic("vulnmdsrc: RegisterAppender called twice for " + name)
}

Appenders[name] = a
appenders[name] = a
}

// Appenders returns the list of the registered Appenders.
func Appenders() map[string]Appender {
appendersM.RLock()
defer appendersM.RUnlock()

ret := make(map[string]Appender)
for k, v := range appenders {
ret[k] = v
}

return ret
}
47 changes: 32 additions & 15 deletions ext/vulnsrc/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,30 @@ package vulnsrc

import (
"errors"
"sync"

"github.com/coreos/clair/database"
)

var (
// Updaters is the list of registered Updaters.
Updaters = make(map[string]Updater)

// ErrFilesystem is returned when a fetcher fails to interact with the local filesystem.
ErrFilesystem = errors.New("vulnsrc: something went wrong when interacting with the fs")

// ErrGitFailure is returned when a fetcher fails to interact with git.
ErrGitFailure = errors.New("vulnsrc: something went wrong when interacting with git")

updatersM sync.RWMutex
updaters = make(map[string]Updater)
)

// UpdateResponse represents the sum of results of an update.
type UpdateResponse struct {
FlagName string
FlagValue string
Notes []string
Vulnerabilities []database.Vulnerability
}

// Updater represents anything that can fetch vulnerabilities and insert them
// into a Clair datastore.
type Updater interface {
Expand All @@ -44,18 +53,10 @@ type Updater interface {
Clean()
}

// UpdateResponse represents the sum of results of an update.
type UpdateResponse struct {
FlagName string
FlagValue string
Notes []string
Vulnerabilities []database.Vulnerability
}

// RegisterUpdater makes an Updater available by the provided name.
//
// If RegisterUpdater is called twice with the same name, the name is blank, or
// if the provided Updater is nil, this function panics.
// If called twice with the same name, the name is blank, or if the provided
// Updater is nil, this function panics.
func RegisterUpdater(name string, u Updater) {
if name == "" {
panic("vulnsrc: could not register an Updater with an empty name")
Expand All @@ -65,9 +66,25 @@ func RegisterUpdater(name string, u Updater) {
panic("vulnsrc: could not register a nil Updater")
}

if _, dup := Updaters[name]; dup {
updatersM.Lock()
defer updatersM.Unlock()

if _, dup := updaters[name]; dup {
panic("vulnsrc: RegisterUpdater called twice for " + name)
}

Updaters[name] = u
updaters[name] = u
}

// Updaters returns the list of the registered Updaters.
func Updaters() map[string]Updater {
updatersM.RLock()
defer updatersM.RUnlock()

ret := make(map[string]Updater)
for k, v := range updaters {
ret[k] = v
}

return ret
}
8 changes: 4 additions & 4 deletions notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,19 @@ func Run(config *config.NotifierConfig, datastore database.Datastore, stopper *u
defer stopper.End()

// Configure registered notifiers.
for senderName, sender := range notification.Senders {
for senderName, sender := range notification.Senders() {
if configured, err := sender.Configure(config); configured {
log.Infof("sender '%s' configured\n", senderName)
} else {
delete(notification.Senders, senderName)
notification.UnregisterSender(senderName)
if err != nil {
log.Errorf("could not configure notifier '%s': %s", senderName, err)
}
}
}

// Do not run the updater if there is no notifier enabled.
if len(notification.Senders) == 0 {
if len(notification.Senders()) == 0 {
log.Infof("notifier service is disabled")
return
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func findTask(datastore database.Datastore, renotifyInterval time.Duration, whoA

func handleTask(n database.VulnerabilityNotification, st *utils.Stopper, maxAttempts int) (bool, bool) {
// Send notification.
for senderName, sender := range notification.Senders {
for senderName, sender := range notification.Senders() {
var attempts int
var backOff time.Duration
for {
Expand Down
14 changes: 7 additions & 7 deletions updater/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,10 @@ func Run(config *config.UpdaterConfig, datastore database.Datastore, st *utils.S
}

// Clean resources.
for _, appenders := range vulnmdsrc.Appenders {
for _, appenders := range vulnmdsrc.Appenders() {
appenders.Clean()
}
for _, updaters := range vulnsrc.Updaters {
for _, updaters := range vulnsrc.Updaters() {
updaters.Clean()
}

Expand Down Expand Up @@ -215,7 +215,7 @@ func fetch(datastore database.Datastore) (bool, []database.Vulnerability, map[st
// Fetch updates in parallel.
log.Info("fetching vulnerability updates")
var responseC = make(chan *vulnsrc.UpdateResponse, 0)
for n, u := range vulnsrc.Updaters {
for n, u := range vulnsrc.Updaters() {
go func(name string, u vulnsrc.Updater) {
response, err := u.Update(datastore)
if err != nil {
Expand All @@ -231,7 +231,7 @@ func fetch(datastore database.Datastore) (bool, []database.Vulnerability, map[st
}

// Collect results of updates.
for i := 0; i < len(vulnsrc.Updaters); i++ {
for i := 0; i < len(vulnsrc.Updaters()); i++ {
resp := <-responseC
if resp != nil {
vulnerabilities = append(vulnerabilities, doVulnerabilitiesNamespacing(resp.Vulnerabilities)...)
Expand All @@ -248,7 +248,7 @@ func fetch(datastore database.Datastore) (bool, []database.Vulnerability, map[st

// Add metadata to the specified vulnerabilities using the registered MetadataFetchers, in parallel.
func addMetadata(datastore database.Datastore, vulnerabilities []database.Vulnerability) []database.Vulnerability {
if len(vulnmdsrc.Appenders) == 0 {
if len(vulnmdsrc.Appenders()) == 0 {
return vulnerabilities
}

Expand All @@ -264,9 +264,9 @@ func addMetadata(datastore database.Datastore, vulnerabilities []database.Vulner
}

var wg sync.WaitGroup
wg.Add(len(vulnmdsrc.Appenders))
wg.Add(len(vulnmdsrc.Appenders()))

for n, a := range vulnmdsrc.Appenders {
for n, a := range vulnmdsrc.Appenders() {
go func(name string, appender vulnmdsrc.Appender) {
defer wg.Done()

Expand Down

0 comments on commit f9b3190

Please sign in to comment.