-
Notifications
You must be signed in to change notification settings - Fork 907
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: metrics plugin system based on hashicorp go-plugin
Signed-off-by: zachaller <[email protected]>
- Loading branch information
Showing
31 changed files
with
2,091 additions
and
552 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
package plugin | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"net/url" | ||
"os" | ||
"time" | ||
|
||
"github.com/argoproj/argo-rollouts/metricproviders/plugin" | ||
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" | ||
"github.com/argoproj/argo-rollouts/utils/evaluate" | ||
metricutil "github.com/argoproj/argo-rollouts/utils/metric" | ||
timeutil "github.com/argoproj/argo-rollouts/utils/time" | ||
"github.com/prometheus/client_golang/api" | ||
v1 "github.com/prometheus/client_golang/api/prometheus/v1" | ||
"github.com/prometheus/common/model" | ||
log "github.com/sirupsen/logrus" | ||
) | ||
|
||
const EnvVarArgoRolloutsPrometheusAddress string = "ARGO_ROLLOUTS_PROMETHEUS_ADDRESS" | ||
|
||
// Here is a real implementation of MetricsPlugin | ||
type RpcPlugin struct { | ||
LogCtx log.Entry | ||
api v1.API | ||
} | ||
|
||
type Config struct { | ||
// Address is the HTTP address and port of the prometheus server | ||
Address string `json:"address,omitempty" protobuf:"bytes,1,opt,name=address"` | ||
// Query is a raw prometheus query to perform | ||
Query string `json:"query,omitempty" protobuf:"bytes,2,opt,name=query"` | ||
} | ||
|
||
func (g *RpcPlugin) NewMetricsPlugin(metric v1alpha1.Metric) error { | ||
config := Config{} | ||
err := json.Unmarshal(metric.Provider.Plugin.Config, &config) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
api, err := newPrometheusAPI(config.Address) | ||
g.api = api | ||
|
||
return err | ||
} | ||
|
||
func (g *RpcPlugin) Run(anaysisRun *v1alpha1.AnalysisRun, metric v1alpha1.Metric) v1alpha1.Measurement { | ||
startTime := timeutil.MetaNow() | ||
newMeasurement := v1alpha1.Measurement{ | ||
StartedAt: &startTime, | ||
} | ||
|
||
config := Config{} | ||
json.Unmarshal(metric.Provider.Plugin.Config, &config) | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | ||
defer cancel() | ||
|
||
response, warnings, err := g.api.Query(ctx, config.Query, time.Now()) | ||
if err != nil { | ||
return metricutil.MarkMeasurementError(newMeasurement, err) | ||
} | ||
|
||
newValue, newStatus, err := g.processResponse(metric, response) | ||
if err != nil { | ||
return metricutil.MarkMeasurementError(newMeasurement, err) | ||
|
||
} | ||
newMeasurement.Value = newValue | ||
if len(warnings) > 0 { | ||
warningMetadata := "" | ||
for _, warning := range warnings { | ||
warningMetadata = fmt.Sprintf(`%s"%s", `, warningMetadata, warning) | ||
} | ||
warningMetadata = warningMetadata[:len(warningMetadata)-2] | ||
if warningMetadata != "" { | ||
newMeasurement.Metadata = map[string]string{"warnings": warningMetadata} | ||
g.LogCtx.Warnf("Prometheus returned the following warnings: %s", warningMetadata) | ||
} | ||
} | ||
|
||
newMeasurement.Phase = newStatus | ||
finishedTime := timeutil.MetaNow() | ||
newMeasurement.FinishedAt = &finishedTime | ||
return newMeasurement | ||
} | ||
|
||
func (g *RpcPlugin) Resume(analysisRun *v1alpha1.AnalysisRun, metric v1alpha1.Metric, measurement v1alpha1.Measurement) v1alpha1.Measurement { | ||
return measurement | ||
} | ||
|
||
func (g *RpcPlugin) Terminate(analysisRun *v1alpha1.AnalysisRun, metric v1alpha1.Metric, measurement v1alpha1.Measurement) v1alpha1.Measurement { | ||
return measurement | ||
} | ||
|
||
func (g *RpcPlugin) GarbageCollect(*v1alpha1.AnalysisRun, v1alpha1.Metric, int) error { | ||
return nil | ||
} | ||
|
||
func (g *RpcPlugin) Type() string { | ||
return plugin.ProviderType | ||
} | ||
|
||
func (g *RpcPlugin) GetMetadata(metric v1alpha1.Metric) map[string]string { | ||
metricsMetadata := make(map[string]string) | ||
|
||
config := Config{} | ||
json.Unmarshal(metric.Provider.Plugin.Config, &config) | ||
if config.Query != "" { | ||
metricsMetadata["ResolvedPrometheusQuery"] = config.Query | ||
} | ||
return metricsMetadata | ||
} | ||
|
||
func (g *RpcPlugin) processResponse(metric v1alpha1.Metric, response model.Value) (string, v1alpha1.AnalysisPhase, error) { | ||
switch value := response.(type) { | ||
case *model.Scalar: | ||
valueStr := value.Value.String() | ||
result := float64(value.Value) | ||
newStatus, err := evaluate.EvaluateResult(result, metric, g.LogCtx) | ||
return valueStr, newStatus, err | ||
case model.Vector: | ||
results := make([]float64, 0, len(value)) | ||
valueStr := "[" | ||
for _, s := range value { | ||
if s != nil { | ||
valueStr = valueStr + s.Value.String() + "," | ||
results = append(results, float64(s.Value)) | ||
} | ||
} | ||
// if we appended to the string, we should remove the last comma on the string | ||
if len(valueStr) > 1 { | ||
valueStr = valueStr[:len(valueStr)-1] | ||
} | ||
valueStr = valueStr + "]" | ||
newStatus, err := evaluate.EvaluateResult(results, metric, g.LogCtx) | ||
return valueStr, newStatus, err | ||
default: | ||
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("Prometheus metric type not supported") | ||
} | ||
} | ||
|
||
func newPrometheusAPI(address string) (v1.API, error) { | ||
envValuesByKey := make(map[string]string) | ||
if value, ok := os.LookupEnv(fmt.Sprintf("%s", EnvVarArgoRolloutsPrometheusAddress)); ok { | ||
envValuesByKey[EnvVarArgoRolloutsPrometheusAddress] = value | ||
log.Debugf("ARGO_ROLLOUTS_PROMETHEUS_ADDRESS: %v", envValuesByKey[EnvVarArgoRolloutsPrometheusAddress]) | ||
} | ||
if len(address) != 0 { | ||
if !isUrl(address) { | ||
return nil, errors.New("prometheus address is not is url format") | ||
} | ||
} else if envValuesByKey[EnvVarArgoRolloutsPrometheusAddress] != "" { | ||
if isUrl(envValuesByKey[EnvVarArgoRolloutsPrometheusAddress]) { | ||
address = envValuesByKey[EnvVarArgoRolloutsPrometheusAddress] | ||
} else { | ||
return nil, errors.New("prometheus address is not is url format") | ||
} | ||
} else { | ||
return nil, errors.New("prometheus address is not configured") | ||
} | ||
client, err := api.NewClient(api.Config{ | ||
Address: address, | ||
}) | ||
if err != nil { | ||
log.Errorf("Error in getting prometheus client: %v", err) | ||
return nil, err | ||
} | ||
return v1.NewAPI(client), nil | ||
} | ||
|
||
func isUrl(str string) bool { | ||
u, err := url.Parse(str) | ||
if err != nil { | ||
log.Errorf("Error in parsing url: %v", err) | ||
} | ||
log.Debugf("Parsed url: %v", u) | ||
return err == nil && u.Scheme != "" && u.Host != "" | ||
} |
107 changes: 107 additions & 0 deletions
107
cmd/sample-metrics-plugin/internal/plugin/plugin_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
package plugin | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"testing" | ||
"time" | ||
|
||
rolloutsPlugin "github.com/argoproj/argo-rollouts/metricproviders/plugin" | ||
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" | ||
log "github.com/sirupsen/logrus" | ||
|
||
goPlugin "github.com/hashicorp/go-plugin" | ||
) | ||
|
||
var testHandshake = goPlugin.HandshakeConfig{ | ||
ProtocolVersion: 1, | ||
MagicCookieKey: "ARGO_ROLLOUTS_RPC_PLUGIN", | ||
MagicCookieValue: "metrics", | ||
} | ||
|
||
// This is just an example of how to test a plugin. | ||
func TestRunSuccessfully(t *testing.T) { | ||
//Skip test because this is just an example of how to test a plugin. | ||
t.Skip("Skipping test because it requires a running prometheus server") | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
logCtx := *log.WithFields(log.Fields{"plugin-test": "prometheus"}) | ||
|
||
rpcPluginImp := &RpcPlugin{ | ||
LogCtx: logCtx, | ||
} | ||
|
||
// pluginMap is the map of plugins we can dispense. | ||
var pluginMap = map[string]goPlugin.Plugin{ | ||
"RpcMetricsPlugin": &rolloutsPlugin.RpcMetricsPlugin{Impl: rpcPluginImp}, | ||
} | ||
|
||
ch := make(chan *goPlugin.ReattachConfig, 1) | ||
closeCh := make(chan struct{}) | ||
go goPlugin.Serve(&goPlugin.ServeConfig{ | ||
HandshakeConfig: testHandshake, | ||
Plugins: pluginMap, | ||
Test: &goPlugin.ServeTestConfig{ | ||
Context: ctx, | ||
ReattachConfigCh: ch, | ||
CloseCh: closeCh, | ||
}, | ||
}) | ||
|
||
// We should get a config | ||
var config *goPlugin.ReattachConfig | ||
select { | ||
case config = <-ch: | ||
case <-time.After(2000 * time.Millisecond): | ||
t.Fatal("should've received reattach") | ||
} | ||
if config == nil { | ||
t.Fatal("config should not be nil") | ||
} | ||
|
||
// Connect! | ||
c := goPlugin.NewClient(&goPlugin.ClientConfig{ | ||
Cmd: nil, | ||
HandshakeConfig: testHandshake, | ||
Plugins: pluginMap, | ||
Reattach: config, | ||
}) | ||
client, err := c.Client() | ||
if err != nil { | ||
t.Fatalf("err: %s", err) | ||
} | ||
|
||
// Pinging should work | ||
if err := client.Ping(); err != nil { | ||
t.Fatalf("should not err: %s", err) | ||
} | ||
|
||
// Kill which should do nothing | ||
c.Kill() | ||
if err := client.Ping(); err != nil { | ||
t.Fatalf("should not err: %s", err) | ||
} | ||
|
||
// Request the plugin | ||
raw, err := client.Dispense("RpcMetricsPlugin") | ||
if err != nil { | ||
t.Fail() | ||
} | ||
|
||
plugin := raw.(rolloutsPlugin.MetricsPlugin) | ||
|
||
err = plugin.NewMetricsPlugin(v1alpha1.Metric{ | ||
Provider: v1alpha1.MetricProvider{ | ||
Plugin: &v1alpha1.PluginMetric{Config: json.RawMessage(`{"address":"http://prometheus.local", "query":"machine_cpu_cores"}`)}, | ||
}, | ||
}) | ||
if err != nil { | ||
t.Fail() | ||
} | ||
|
||
// Canceling should cause an exit | ||
cancel() | ||
<-closeCh | ||
} |
Oops, something went wrong.