Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update plugin ordering to latest Sonobuoy version #1753

Merged
merged 1 commit into from
Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 47 additions & 3 deletions pkg/plugin/aggregation/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"net"
"net/http"
"path/filepath"
"sort"
"sync"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -190,9 +192,27 @@ func Run(client kubernetes.Interface, plugins []plugin.Interface, cfg plugin.Agg
return errors.Wrapf(err, "couldn't get aggregator pod")
}

for _, p := range plugins {
logrus.WithField("plugin", p.GetName()).Info("Running plugin")
go aggr.RunAndMonitorPlugin(context.Background(), time.Duration(cfg.TimeoutSeconds)*time.Second, p, client, nodes.Items, cfg.AdvertiseAddress, certs[p.GetName()], aggregatorPod, progressPort, pluginResultsDir)
orderedPlugins := getOrderedPlugins(plugins)

logrus.Info("Received plugin launch order:")
for _, pluginGroup := range orderedPlugins {
for _, plugin := range pluginGroup {
logrus.Infof("%v: %v", plugin.GetName(), plugin.GetOrder())
}
}

for _, pluginGroup := range orderedPlugins {
var waitGroup sync.WaitGroup
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 to using a WaitGroup. As someone new to Go I'm really happy to see you explored the options and found this. It really simplifies the logic.

waitGroup.Add(len(pluginGroup))
for _, plug := range pluginGroup {
logrus.WithField("plugin", plug.GetName()).Info("Running plugin")
go func(plugin plugin.Interface) {
logrus.Infof("Launching plugin %v with order %v", plugin.GetName(), plugin.GetOrder())
aggr.RunAndMonitorPlugin(context.Background(), time.Duration(cfg.TimeoutSeconds)*time.Second, plugin, client, nodes.Items, cfg.AdvertiseAddress, certs[plugin.GetName()], aggregatorPod, progressPort, pluginResultsDir)
waitGroup.Done()
}(plug)
}
waitGroup.Wait()
}

// 6. Wait for aggr to show that all results are accounted for
Expand All @@ -212,6 +232,30 @@ func Run(client kubernetes.Interface, plugins []plugin.Interface, cfg plugin.Agg
}
}

func getOrderedPlugins(plugins []plugin.Interface) [][]plugin.Interface {
andrewyunt marked this conversation as resolved.
Show resolved Hide resolved
pluginGroups := map[int][]plugin.Interface{}
for _, p := range plugins {
pluginGroups[p.GetOrder()] = append(pluginGroups[p.GetOrder()], p)
}

orderedPlugins := [][]plugin.Interface{}
for _, pluginGroup := range pluginGroups {
orderedPlugins = append(orderedPlugins, pluginGroup)
}

sort.Slice(orderedPlugins, func(i, j int) bool {
return orderedPlugins[i][0].GetOrder() < orderedPlugins[j][0].GetOrder()
})

for _, plugins := range orderedPlugins {
sort.Slice(plugins, func(i, j int) bool {
return plugins[i].GetName() < plugins[j].GetName()
})
}

return orderedPlugins
}

// Cleanup calls cleanup on all plugins
func Cleanup(client kubernetes.Interface, plugins []plugin.Interface) {
for _, p := range plugins {
Expand Down
192 changes: 151 additions & 41 deletions pkg/plugin/aggregation/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io/ioutil"
"math/big"
"os"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -182,95 +183,204 @@ func TestRunAndMonitorPlugin(t *testing.T) {
}
}

func getTestCert() (*tls.Certificate, error) {
privKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
return nil, errors.Wrap(err, "couldn't generate private key")
}
tmpl := &x509.Certificate{
SerialNumber: big.NewInt(0),
}
certDER, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &privKey.PublicKey, privKey)
if err != nil {
return nil, errors.Wrap(err, "couldn't create certificate")
}

return &tls.Certificate{
Certificate: [][]byte{certDER},
PrivateKey: privKey,
}, nil
}

type MockCleanupPlugin struct {
skipCleanup bool
cleanedUp bool
}

func (cp *MockCleanupPlugin) Run(_ kubernetes.Interface, _ string, _ *tls.Certificate, _ *corev1.Pod, _, _ string) error {
func (cp *MockRunPlugin) Run(_ kubernetes.Interface, _ string, _ *tls.Certificate, _ *corev1.Pod, _, _ string) error {
return nil
}

func (cp *MockCleanupPlugin) Cleanup(_ kubernetes.Interface) {
func (cp *MockRunPlugin) Cleanup(_ kubernetes.Interface) {
cp.cleanedUp = true
}

func (cp *MockCleanupPlugin) Monitor(_ context.Context, _ kubernetes.Interface, _ []corev1.Node, _ chan<- *plugin.Result) {
func (cp *MockRunPlugin) Monitor(_ context.Context, _ kubernetes.Interface, _ []corev1.Node, _ chan<- *plugin.Result) {
return
}

func (cp *MockCleanupPlugin) ExpectedResults(_ []corev1.Node) []plugin.ExpectedResult {
func (cp *MockRunPlugin) ExpectedResults(_ []corev1.Node) []plugin.ExpectedResult {
return []plugin.ExpectedResult{}
}

func (cp *MockCleanupPlugin) FillTemplate(_ string, _ *tls.Certificate) ([]byte, error) {
func (cp *MockRunPlugin) FillTemplate(_ string, _ *tls.Certificate) ([]byte, error) {
return []byte{}, nil
}

func (cp *MockCleanupPlugin) GetName() string {
return "mock-cleanup-plugin"
func (cp *MockRunPlugin) GetName() string {
return cp.name
}

func (cp *MockCleanupPlugin) SkipCleanup() bool {
func (cp *MockRunPlugin) SkipCleanup() bool {
return cp.skipCleanup
}

func (cp *MockCleanupPlugin) GetResultFormat() string {
func (cp *MockRunPlugin) GetResultFormat() string {
return ""
}

func (cp *MockCleanupPlugin) GetResultFiles() []string {
func (cp *MockRunPlugin) GetResultFiles() []string {
return []string{}
}

func (cp *MockCleanupPlugin) GetDescription() string {
func (cp *MockRunPlugin) GetDescription() string {
return "A mock plugin used for testing purposes"
}

func (cp *MockCleanupPlugin) GetSourceURL() string {
func (cp *MockRunPlugin) GetSourceURL() string {
return ""
}

func (cp *MockRunPlugin) GetOrder() int {
return cp.order
}

type MockRunPlugin struct {
skipCleanup bool
cleanedUp bool
order int
name string
}

func TestGetOrderedPlugins(t *testing.T) {
testCases := []struct {
desc string
input []plugin.Interface
expected [][]plugin.Interface
}{
{
desc: "Ensure lists of plugin.Interface are ordered by plugin order",
andrewyunt marked this conversation as resolved.
Show resolved Hide resolved
input: []plugin.Interface{
&MockRunPlugin{
order: 1,
},
&MockRunPlugin{
order: 0,
},
&MockRunPlugin{
order: 1,
},
&MockRunPlugin{
order: 1,
},
&MockRunPlugin{
order: 2,
},
},
expected: [][]plugin.Interface{
{
&MockRunPlugin{
order: 0,
},
},
{
&MockRunPlugin{
order: 1,
},
&MockRunPlugin{
order: 1,
},
&MockRunPlugin{
order: 1,
},
},
{
&MockRunPlugin{
order: 2,
},
},
},
},
{
desc: "Ensure lists of plugin.Interface are sorted by plugin name",
input: []plugin.Interface{
&MockRunPlugin{
order: 0,
name: "Test plugin",
},
&MockRunPlugin{
order: 0,
name: "Another ordering test plugin",
},
&MockRunPlugin{
order: 0,
name: "Ordering test plugin",
},
},
expected: [][]plugin.Interface{
{
&MockRunPlugin{
order: 0,
name: "Another ordering test plugin",
},
&MockRunPlugin{
order: 0,
name: "Ordering test plugin",
},
&MockRunPlugin{
order: 0,
name: "Test plugin",
},
},
},
},
{
desc: "Ensure empty slice of plugin.Interface returns correct result",
input: []plugin.Interface{},
expected: [][]plugin.Interface{},
},
{
desc: "Ensure null input returns correct result",
input: nil,
expected: [][]plugin.Interface{},
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
if !reflect.DeepEqual(getOrderedPlugins(tc.input), tc.expected) {
t.Errorf("Expected output: %v", tc.expected)
}
})
}
}

func getTestCert() (*tls.Certificate, error) {
andrewyunt marked this conversation as resolved.
Show resolved Hide resolved
privKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
return nil, errors.Wrap(err, "couldn't generate private key")
}
tmpl := &x509.Certificate{
SerialNumber: big.NewInt(0),
}
certDER, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &privKey.PublicKey, privKey)
if err != nil {
return nil, errors.Wrap(err, "couldn't create certificate")
}

return &tls.Certificate{
Certificate: [][]byte{certDER},
PrivateKey: privKey,
}, nil
}

func TestCleanup(t *testing.T) {
createPlugin := func(skipCleanup bool) *MockCleanupPlugin {
return &MockCleanupPlugin{
createPlugin := func(skipCleanup bool) *MockRunPlugin {
return &MockRunPlugin{
skipCleanup: skipCleanup,
cleanedUp: false,
}
}

testCases := []struct {
desc string
plugins []*MockCleanupPlugin
plugins []*MockRunPlugin
expectedCleanedUpValues []bool
}{
{
desc: "plugins without skip cleanup are all cleaned up",
plugins: []*MockCleanupPlugin{createPlugin(false), createPlugin(false)},
plugins: []*MockRunPlugin{createPlugin(false), createPlugin(false)},
expectedCleanedUpValues: []bool{true, true},
},
{
desc: "plugins with skip cleanup are not cleaned up",
plugins: []*MockCleanupPlugin{createPlugin(true), createPlugin(false), createPlugin(true)},
plugins: []*MockRunPlugin{createPlugin(true), createPlugin(false), createPlugin(true)},
expectedCleanedUpValues: []bool{false, true, false},
},
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/plugin/driver/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func (b *Base) GetName() string {
return b.Definition.SonobuoyConfig.PluginName
}

// GetOrder returns the order of this Job plugin.
func (b *Base) GetOrder() int {
return b.Definition.SonobuoyConfig.Order
}

// GetSecretName gets a name for a secret based on the plugin name and session ID.
func (b *Base) GetSecretName() string {
return fmt.Sprintf("sonobuoy-plugin-%s-%s", b.GetName(), b.GetSessionID())
Expand Down
1 change: 1 addition & 0 deletions pkg/plugin/driver/daemonset/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (p *Plugin) ExpectedResults(nodes []v1.Node) []plugin.ExpectedResult {
ret = append(ret, plugin.ExpectedResult{
NodeName: node.Name,
ResultType: p.GetName(),
Order: p.Definition.SonobuoyConfig.Order,
})
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/plugin/driver/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ func NewPlugin(dfn manifest.Manifest, namespace, sonobuoyImage, imagePullPolicy,
// a Job only launches one pod, only one result type is expected.
func (p *Plugin) ExpectedResults(nodes []v1.Node) []plugin.ExpectedResult {
return []plugin.ExpectedResult{
{ResultType: p.GetName(), NodeName: plugin.GlobalResult},
{
ResultType: p.GetName(),
NodeName: plugin.GlobalResult,
Order: p.Definition.SonobuoyConfig.Order,
},
}
}

Expand Down
Loading