Skip to content

Commit

Permalink
feat: wip on plugin system using hasicorp go-plugin
Browse files Browse the repository at this point in the history
Signed-off-by: zachaller <[email protected]>
  • Loading branch information
zachaller committed Jan 16, 2023
1 parent 72ab46d commit 2b1e125
Show file tree
Hide file tree
Showing 26 changed files with 1,929 additions and 550 deletions.
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -279,3 +279,10 @@ trivy:
.PHONY: checksums
checksums:
shasum -a 256 ./dist/kubectl-argo-rollouts-* | awk -F './dist/' '{print $$1 $$2}' > ./dist/argo-rollouts-checksums.txt

# Build sample plugin with debug info
# https://www.jetbrains.com/help/go/attach-to-running-go-processes-with-debugger.html
.PHONY: build-sample-plugin-debug
build-sample-plugin-debug:
go build -gcflags="all=-N -l" -o metrics-plugin cmd/sample-metrics-plugin/main.go

15 changes: 13 additions & 2 deletions cmd/rollouts-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"strings"
"time"

"github.com/argoproj/argo-rollouts/utils/plugin"

"github.com/argoproj/pkg/kubeclientmetrics"
smiclientset "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand All @@ -20,8 +23,6 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
"k8s.io/client-go/tools/clientcmd"

"github.com/argoproj/pkg/kubeclientmetrics"

"github.com/argoproj/argo-rollouts/controller"
"github.com/argoproj/argo-rollouts/controller/metrics"
jobprovider "github.com/argoproj/argo-rollouts/metricproviders/job"
Expand Down Expand Up @@ -70,6 +71,8 @@ func newCommand() *cobra.Command {
awsVerifyTargetGroup bool
namespaced bool
printVersion bool
metricPluginLocation string
metricPluginSha256 string
)
electOpts := controller.NewLeaderElectionOptions()
var command = cobra.Command{
Expand Down Expand Up @@ -199,6 +202,12 @@ func newCommand() *cobra.Command {
controllerNamespaceInformerFactory,
jobInformerFactory)

defaults.SetMetricPluginLocation(metricPluginLocation)
err = plugin.InitMetricsPlugin(metricPluginLocation, plugin.FileDownloaderImpl{}, metricPluginSha256)
if err != nil {
log.Fatalf("Failed to init metric plugin: %v", err)
}

if err = cm.Run(ctx, rolloutThreads, serviceThreads, ingressThreads, experimentThreads, analysisThreads, electOpts); err != nil {
log.Fatalf("Error running controller: %s", err.Error())
}
Expand Down Expand Up @@ -240,6 +249,8 @@ func newCommand() *cobra.Command {
command.Flags().DurationVar(&electOpts.LeaderElectionLeaseDuration, "leader-election-lease-duration", controller.DefaultLeaderElectionLeaseDuration, "The duration that non-leader candidates will wait after observing a leadership renewal until attempting to acquire leadership of a led but unrenewed leader slot. This is effectively the maximum duration that a leader can be stopped before it is replaced by another candidate. This is only applicable if leader election is enabled.")
command.Flags().DurationVar(&electOpts.LeaderElectionRenewDeadline, "leader-election-renew-deadline", controller.DefaultLeaderElectionRenewDeadline, "The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to the lease duration. This is only applicable if leader election is enabled.")
command.Flags().DurationVar(&electOpts.LeaderElectionRetryPeriod, "leader-election-retry-period", controller.DefaultLeaderElectionRetryPeriod, "The duration the clients should wait between attempting acquisition and renewal of a leadership. This is only applicable if leader election is enabled.")
command.Flags().StringVar(&metricPluginLocation, "metric-plugin-location", defaults.DefaultMetricsPluginLocation, "The file path to the location of the metric plugin binary")
command.Flags().StringVar(&metricPluginSha256, "metric-plugin-sha256", "", "The expected sha256 of the metric plugin binary")
return &command
}

Expand Down
183 changes: 183 additions & 0 deletions cmd/sample-metrics-plugin/internal/plugin/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package plugin

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"time"

"github.com/argoproj/argo-rollouts/metricproviders/plugin"
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/utils/evaluate"
metricutil "github.com/argoproj/argo-rollouts/utils/metric"
timeutil "github.com/argoproj/argo-rollouts/utils/time"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
log "github.com/sirupsen/logrus"
)

const EnvVarArgoRolloutsPrometheusAddress string = "ARGO_ROLLOUTS_PROMETHEUS_ADDRESS"

// Here is a real implementation of MetricsPlugin
type RpcPlugin struct {
LogCtx log.Entry
api v1.API
}

type Config struct {
// Address is the HTTP address and port of the prometheus server
Address string `json:"address,omitempty" protobuf:"bytes,1,opt,name=address"`
// Query is a raw prometheus query to perform
Query string `json:"query,omitempty" protobuf:"bytes,2,opt,name=query"`
}

func (g *RpcPlugin) NewMetricsPlugin(metric v1alpha1.Metric) error {
config := Config{}
err := json.Unmarshal(metric.Provider.Plugin.Config, &config)
if err != nil {
return err
}

api, err := newPrometheusAPI(config.Address)
g.api = api

return err
}

func (g *RpcPlugin) Run(anaysisRun *v1alpha1.AnalysisRun, metric v1alpha1.Metric) v1alpha1.Measurement {
startTime := timeutil.MetaNow()
newMeasurement := v1alpha1.Measurement{
StartedAt: &startTime,
}

config := Config{}
json.Unmarshal(metric.Provider.Plugin.Config, &config)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

response, warnings, err := g.api.Query(ctx, config.Query, time.Now())
if err != nil {
return metricutil.MarkMeasurementError(newMeasurement, err)
}

newValue, newStatus, err := g.processResponse(metric, response)
if err != nil {
return metricutil.MarkMeasurementError(newMeasurement, err)

}
newMeasurement.Value = newValue
if len(warnings) > 0 {
warningMetadata := ""
for _, warning := range warnings {
warningMetadata = fmt.Sprintf(`%s"%s", `, warningMetadata, warning)
}
warningMetadata = warningMetadata[:len(warningMetadata)-2]
if warningMetadata != "" {
newMeasurement.Metadata = map[string]string{"warnings": warningMetadata}
g.LogCtx.Warnf("Prometheus returned the following warnings: %s", warningMetadata)
}
}

newMeasurement.Phase = newStatus
finishedTime := timeutil.MetaNow()
newMeasurement.FinishedAt = &finishedTime
return newMeasurement
}

func (g *RpcPlugin) Resume(analysisRun *v1alpha1.AnalysisRun, metric v1alpha1.Metric, measurement v1alpha1.Measurement) v1alpha1.Measurement {
return measurement
}

func (g *RpcPlugin) Terminate(analysisRun *v1alpha1.AnalysisRun, metric v1alpha1.Metric, measurement v1alpha1.Measurement) v1alpha1.Measurement {
return measurement
}

func (g *RpcPlugin) GarbageCollect(*v1alpha1.AnalysisRun, v1alpha1.Metric, int) error {
return nil
}

func (g *RpcPlugin) Type() string {
return plugin.ProviderType
}

func (g *RpcPlugin) GetMetadata(metric v1alpha1.Metric) map[string]string {
metricsMetadata := make(map[string]string)

config := Config{}
json.Unmarshal(metric.Provider.Plugin.Config, &config)
if config.Query != "" {
metricsMetadata["ResolvedPrometheusQuery"] = config.Query
}
return metricsMetadata
}

func (g *RpcPlugin) processResponse(metric v1alpha1.Metric, response model.Value) (string, v1alpha1.AnalysisPhase, error) {
switch value := response.(type) {
case *model.Scalar:
valueStr := value.Value.String()
result := float64(value.Value)
newStatus, err := evaluate.EvaluateResult(result, metric, g.LogCtx)
return valueStr, newStatus, err
case model.Vector:
results := make([]float64, 0, len(value))
valueStr := "["
for _, s := range value {
if s != nil {
valueStr = valueStr + s.Value.String() + ","
results = append(results, float64(s.Value))
}
}
// if we appended to the string, we should remove the last comma on the string
if len(valueStr) > 1 {
valueStr = valueStr[:len(valueStr)-1]
}
valueStr = valueStr + "]"
newStatus, err := evaluate.EvaluateResult(results, metric, g.LogCtx)
return valueStr, newStatus, err
default:
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("Prometheus metric type not supported")
}
}

func newPrometheusAPI(address string) (v1.API, error) {
envValuesByKey := make(map[string]string)
if value, ok := os.LookupEnv(fmt.Sprintf("%s", EnvVarArgoRolloutsPrometheusAddress)); ok {
envValuesByKey[EnvVarArgoRolloutsPrometheusAddress] = value
log.Debugf("ARGO_ROLLOUTS_PROMETHEUS_ADDRESS: %v", envValuesByKey[EnvVarArgoRolloutsPrometheusAddress])
}
if len(address) != 0 {
if !isUrl(address) {
return nil, errors.New("prometheus address is not is url format")
}
} else if envValuesByKey[EnvVarArgoRolloutsPrometheusAddress] != "" {
if isUrl(envValuesByKey[EnvVarArgoRolloutsPrometheusAddress]) {
address = envValuesByKey[EnvVarArgoRolloutsPrometheusAddress]
} else {
return nil, errors.New("prometheus address is not is url format")
}
} else {
return nil, errors.New("prometheus address is not configured")
}
client, err := api.NewClient(api.Config{
Address: address,
})
if err != nil {
log.Errorf("Error in getting prometheus client: %v", err)
return nil, err
}
return v1.NewAPI(client), nil
}

func isUrl(str string) bool {
u, err := url.Parse(str)
if err != nil {
log.Errorf("Error in parsing url: %v", err)
}
log.Debugf("Parsed url: %v", u)
return err == nil && u.Scheme != "" && u.Host != ""
}
107 changes: 107 additions & 0 deletions cmd/sample-metrics-plugin/internal/plugin/plugin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package plugin

import (
"context"
"encoding/json"
"testing"
"time"

rolloutsPlugin "github.com/argoproj/argo-rollouts/metricproviders/plugin"
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
log "github.com/sirupsen/logrus"

goPlugin "github.com/hashicorp/go-plugin"
)

var testHandshake = goPlugin.HandshakeConfig{
ProtocolVersion: 1,
MagicCookieKey: "ARGO_ROLLOUTS_RPC_PLUGIN",
MagicCookieValue: "metrics",
}

// This is just an example of how to test a plugin.
func TestRunSuccessfully(t *testing.T) {
//Skip test because this is just an example of how to test a plugin.
t.Skip("Skipping test because it requires a running prometheus server")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

logCtx := *log.WithFields(log.Fields{"plugin-test": "prometheus"})

rpcPluginImp := &RpcPlugin{
LogCtx: logCtx,
}

// pluginMap is the map of plugins we can dispense.
var pluginMap = map[string]goPlugin.Plugin{
"RpcMetricsPlugin": &rolloutsPlugin.RpcMetricsPlugin{Impl: rpcPluginImp},
}

ch := make(chan *goPlugin.ReattachConfig, 1)
closeCh := make(chan struct{})
go goPlugin.Serve(&goPlugin.ServeConfig{
HandshakeConfig: testHandshake,
Plugins: pluginMap,
Test: &goPlugin.ServeTestConfig{
Context: ctx,
ReattachConfigCh: ch,
CloseCh: closeCh,
},
})

// We should get a config
var config *goPlugin.ReattachConfig
select {
case config = <-ch:
case <-time.After(2000 * time.Millisecond):
t.Fatal("should've received reattach")
}
if config == nil {
t.Fatal("config should not be nil")
}

// Connect!
c := goPlugin.NewClient(&goPlugin.ClientConfig{
Cmd: nil,
HandshakeConfig: testHandshake,
Plugins: pluginMap,
Reattach: config,
})
client, err := c.Client()
if err != nil {
t.Fatalf("err: %s", err)
}

// Pinging should work
if err := client.Ping(); err != nil {
t.Fatalf("should not err: %s", err)
}

// Kill which should do nothing
c.Kill()
if err := client.Ping(); err != nil {
t.Fatalf("should not err: %s", err)
}

// Request the plugin
raw, err := client.Dispense("RpcMetricsPlugin")
if err != nil {
t.Fail()
}

plugin := raw.(rolloutsPlugin.MetricsPlugin)

err = plugin.NewMetricsPlugin(v1alpha1.Metric{
Provider: v1alpha1.MetricProvider{
Plugin: &v1alpha1.PluginMetric{Config: json.RawMessage(`{"address":"http://prometheus.local", "query":"machine_cpu_cores"}`)},
},
})
if err != nil {
t.Fail()
}

// Canceling should cause an exit
cancel()
<-closeCh
}
37 changes: 37 additions & 0 deletions cmd/sample-metrics-plugin/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"github.com/argoproj/argo-rollouts/cmd/sample-metrics-plugin/internal/plugin"
rolloutsPlugin "github.com/argoproj/argo-rollouts/metricproviders/plugin"
goPlugin "github.com/hashicorp/go-plugin"
log "github.com/sirupsen/logrus"
)

// handshakeConfigs are used to just do a basic handshake between
// a plugin and host. If the handshake fails, a user friendly error is shown.
// This prevents users from executing bad plugins or executing a plugin
// directory. It is a UX feature, not a security feature.
var handshakeConfig = goPlugin.HandshakeConfig{
ProtocolVersion: 1,
MagicCookieKey: "ARGO_ROLLOUTS_RPC_PLUGIN",
MagicCookieValue: "metrics",
}

func main() {
logCtx := *log.WithFields(log.Fields{"plugin": "prometheus"})

rpcPluginImp := &plugin.RpcPlugin{
LogCtx: logCtx,
}
// pluginMap is the map of plugins we can dispense.
var pluginMap = map[string]goPlugin.Plugin{
"RpcMetricsPlugin": &rolloutsPlugin.RpcMetricsPlugin{Impl: rpcPluginImp},
}

logCtx.Debug("message from plugin", "foo", "bar")

goPlugin.Serve(&goPlugin.ServeConfig{
HandshakeConfig: handshakeConfig,
Plugins: pluginMap,
})
}
Loading

0 comments on commit 2b1e125

Please sign in to comment.