From 51a11797d83c84c2e2b05b1fd3687129aa2cf287 Mon Sep 17 00:00:00 2001 From: hzxuzhonghu Date: Tue, 31 Oct 2017 19:08:29 +0800 Subject: [PATCH 1/2] cache admission webhook restClient Kubernetes-commit: 42d9153a03e971453ccf8e46e149a20a9ff3d656 --- pkg/admission/plugin/webhook/admission.go | 51 +++- .../plugin/webhook/admission_test.go | 274 +++++++++++++----- 2 files changed, 248 insertions(+), 77 deletions(-) diff --git a/pkg/admission/plugin/webhook/admission.go b/pkg/admission/plugin/webhook/admission.go index 1238f4a5c..af4c3a9c2 100644 --- a/pkg/admission/plugin/webhook/admission.go +++ b/pkg/admission/plugin/webhook/admission.go @@ -19,13 +19,15 @@ package webhook import ( "context" + "encoding/json" "fmt" "io" + "net" "net/url" - "path" "sync" "github.com/golang/glog" + lru "github.com/hashicorp/golang-lru" admissionv1alpha1 "k8s.io/api/admission/v1alpha1" "k8s.io/api/admissionregistration/v1alpha1" @@ -45,7 +47,8 @@ import ( const ( // Name of admission plug-in - PluginName = "GenericAdmissionWebhook" + PluginName = "GenericAdmissionWebhook" + defaultCacheSize = 200 ) type ErrCallingWebhook struct { @@ -96,6 +99,11 @@ func NewGenericAdmissionWebhook(configFile io.Reader) (*GenericAdmissionWebhook, return nil, err } + cache, err := lru.New(defaultCacheSize) + if err != nil { + return nil, err + } + return &GenericAdmissionWebhook{ Handler: admission.NewHandler( admission.Connect, @@ -105,6 +113,7 @@ func NewGenericAdmissionWebhook(configFile io.Reader) (*GenericAdmissionWebhook, ), authInfoResolver: authInfoResolver, serviceResolver: defaultServiceResolver{}, + cache: cache, }, nil } @@ -116,6 +125,7 @@ type GenericAdmissionWebhook struct { negotiatedSerializer runtime.NegotiatedSerializer authInfoResolver AuthenticationInfoResolver + cache *lru.Cache } // serviceResolver knows how to convert a service reference into an actual location. @@ -300,23 +310,48 @@ func toStatusErr(name string, result *metav1.Status) *apierrors.StatusError { } func (a *GenericAdmissionWebhook) hookClient(h *v1alpha1.Webhook) (*rest.RESTClient, error) { - serverName := h.ClientConfig.Service.Name + "." + h.ClientConfig.Service.Namespace + ".svc" - u, err := a.serviceResolver.ResolveEndpoint(h.ClientConfig.Service.Namespace, h.ClientConfig.Service.Name) + cacheKey, err := json.Marshal(h.ClientConfig) if err != nil { return nil, err } + if client, ok := a.cache.Get(string(cacheKey)); ok { + return client.(*rest.RESTClient), nil + } - // TODO: cache these instead of constructing one each time + serverName := h.ClientConfig.Service.Name + "." + h.ClientConfig.Service.Namespace + ".svc" restConfig, err := a.authInfoResolver.ClientConfigFor(serverName) if err != nil { return nil, err } + cfg := rest.CopyConfig(restConfig) - cfg.Host = u.Host - cfg.APIPath = path.Join(u.Path, h.ClientConfig.URLPath) + host := serverName + ":443" + cfg.Host = "https://" + host + cfg.APIPath = h.ClientConfig.URLPath cfg.TLSClientConfig.ServerName = serverName cfg.TLSClientConfig.CAData = h.ClientConfig.CABundle cfg.ContentConfig.NegotiatedSerializer = a.negotiatedSerializer cfg.ContentConfig.ContentType = runtime.ContentTypeJSON - return rest.UnversionedRESTClientFor(cfg) + + delegateDialer := cfg.Dial + if delegateDialer == nil { + delegateDialer = net.Dial + } + + cfg.Dial = func(network, addr string) (net.Conn, error) { + if addr == host { + u, err := a.serviceResolver.ResolveEndpoint(h.ClientConfig.Service.Namespace, h.ClientConfig.Service.Name) + if err != nil { + return nil, err + } + addr = u.Host + } + return delegateDialer(network, addr) + } + + client, err := rest.UnversionedRESTClientFor(cfg) + if err == nil { + a.cache.Add(string(cacheKey), client) + } + return client, err } diff --git a/pkg/admission/plugin/webhook/admission_test.go b/pkg/admission/plugin/webhook/admission_test.go index ae4205bb7..39ad0c34f 100644 --- a/pkg/admission/plugin/webhook/admission_test.go +++ b/pkg/admission/plugin/webhook/admission_test.go @@ -25,6 +25,7 @@ import ( "net/http/httptest" "net/url" "strings" + "sync/atomic" "testing" "k8s.io/api/admission/v1alpha1" @@ -70,19 +71,7 @@ func TestAdmit(t *testing.T) { v1alpha1.AddToScheme(scheme) api.AddToScheme(scheme) - // Create the test webhook server - sCert, err := tls.X509KeyPair(serverCert, serverKey) - if err != nil { - t.Fatal(err) - } - rootCAs := x509.NewCertPool() - rootCAs.AppendCertsFromPEM(caCert) - testServer := httptest.NewUnstartedServer(http.HandlerFunc(webhookHandler)) - testServer.TLS = &tls.Config{ - Certificates: []tls.Certificate{sCert}, - ClientCAs: rootCAs, - ClientAuth: tls.RequireAndVerifyClientCert, - } + testServer := newTestServer(t) testServer.StartTLS() defer testServer.Close() serverURL, err := url.ParseRequestURI(testServer.URL) @@ -93,15 +82,9 @@ func TestAdmit(t *testing.T) { if err != nil { t.Fatal(err) } - wh.authInfoResolver = &fakeAuthenticationInfoResolver{ - restConfig: &rest.Config{ - TLSClientConfig: rest.TLSClientConfig{ - CAData: caCert, - CertData: clientCert, - KeyData: clientKey, - }, - }, - } + wh.authInfoResolver = newFakeAuthenticationInfoResolver() + wh.serviceResolver = fakeServiceResolver{base: *serverURL} + wh.SetScheme(scheme) // Set up a test object for the call kind := api.SchemeGroupVersion.WithKind("Pod") @@ -137,25 +120,6 @@ func TestAdmit(t *testing.T) { expectAllow bool errorContains string } - ccfg := func(urlPath string) registrationv1alpha1.WebhookClientConfig { - return registrationv1alpha1.WebhookClientConfig{ - Service: registrationv1alpha1.ServiceReference{ - Name: "webhook-test", - Namespace: "default", - }, - URLPath: urlPath, - CABundle: caCert, - } - } - - matchEverythingRules := []registrationv1alpha1.RuleWithOperations{{ - Operations: []registrationv1alpha1.OperationType{registrationv1alpha1.OperationAll}, - Rule: registrationv1alpha1.Rule{ - APIGroups: []string{"*"}, - APIVersions: []string{"*"}, - Resources: []string{"*/*"}, - }, - }} policyFail := registrationv1alpha1.Fail policyIgnore := registrationv1alpha1.Ignore @@ -165,7 +129,7 @@ func TestAdmit(t *testing.T) { hookSource: fakeHookSource{ hooks: []registrationv1alpha1.Webhook{{ Name: "nomatch", - ClientConfig: ccfg("disallow"), + ClientConfig: newFakeHookClientConfig("disallow"), Rules: []registrationv1alpha1.RuleWithOperations{{ Operations: []registrationv1alpha1.OperationType{registrationv1alpha1.Create}, }}, @@ -177,8 +141,8 @@ func TestAdmit(t *testing.T) { hookSource: fakeHookSource{ hooks: []registrationv1alpha1.Webhook{{ Name: "allow", - ClientConfig: ccfg("allow"), - Rules: matchEverythingRules, + ClientConfig: newFakeHookClientConfig("allow"), + Rules: newMatchEverythingRules(), }}, }, expectAllow: true, @@ -187,8 +151,8 @@ func TestAdmit(t *testing.T) { hookSource: fakeHookSource{ hooks: []registrationv1alpha1.Webhook{{ Name: "disallow", - ClientConfig: ccfg("disallow"), - Rules: matchEverythingRules, + ClientConfig: newFakeHookClientConfig("disallow"), + Rules: newMatchEverythingRules(), }}, }, errorContains: "without explanation", @@ -197,8 +161,8 @@ func TestAdmit(t *testing.T) { hookSource: fakeHookSource{ hooks: []registrationv1alpha1.Webhook{{ Name: "disallowReason", - ClientConfig: ccfg("disallowReason"), - Rules: matchEverythingRules, + ClientConfig: newFakeHookClientConfig("disallowReason"), + Rules: newMatchEverythingRules(), }}, }, errorContains: "you shall not pass", @@ -207,18 +171,18 @@ func TestAdmit(t *testing.T) { hookSource: fakeHookSource{ hooks: []registrationv1alpha1.Webhook{{ Name: "internalErr A", - ClientConfig: ccfg("internalErr"), - Rules: matchEverythingRules, + ClientConfig: newFakeHookClientConfig("internalErr"), + Rules: newMatchEverythingRules(), FailurePolicy: &policyIgnore, }, { Name: "internalErr B", - ClientConfig: ccfg("internalErr"), - Rules: matchEverythingRules, + ClientConfig: newFakeHookClientConfig("internalErr"), + Rules: newMatchEverythingRules(), FailurePolicy: &policyIgnore, }, { Name: "internalErr C", - ClientConfig: ccfg("internalErr"), - Rules: matchEverythingRules, + ClientConfig: newFakeHookClientConfig("internalErr"), + Rules: newMatchEverythingRules(), FailurePolicy: &policyIgnore, }}, }, @@ -228,16 +192,16 @@ func TestAdmit(t *testing.T) { hookSource: fakeHookSource{ hooks: []registrationv1alpha1.Webhook{{ Name: "internalErr A", - ClientConfig: ccfg("internalErr"), - Rules: matchEverythingRules, + ClientConfig: newFakeHookClientConfig("internalErr"), + Rules: newMatchEverythingRules(), }, { Name: "internalErr B", - ClientConfig: ccfg("internalErr"), - Rules: matchEverythingRules, + ClientConfig: newFakeHookClientConfig("internalErr"), + Rules: newMatchEverythingRules(), }, { Name: "internalErr C", - ClientConfig: ccfg("internalErr"), - Rules: matchEverythingRules, + ClientConfig: newFakeHookClientConfig("internalErr"), + Rules: newMatchEverythingRules(), }}, }, expectAllow: false, @@ -246,18 +210,18 @@ func TestAdmit(t *testing.T) { hookSource: fakeHookSource{ hooks: []registrationv1alpha1.Webhook{{ Name: "internalErr A", - ClientConfig: ccfg("internalErr"), - Rules: matchEverythingRules, + ClientConfig: newFakeHookClientConfig("internalErr"), + Rules: newMatchEverythingRules(), FailurePolicy: &policyFail, }, { Name: "internalErr B", - ClientConfig: ccfg("internalErr"), - Rules: matchEverythingRules, + ClientConfig: newFakeHookClientConfig("internalErr"), + Rules: newMatchEverythingRules(), FailurePolicy: &policyFail, }, { Name: "internalErr C", - ClientConfig: ccfg("internalErr"), - Rules: matchEverythingRules, + ClientConfig: newFakeHookClientConfig("internalErr"), + Rules: newMatchEverythingRules(), FailurePolicy: &policyFail, }}, }, @@ -268,8 +232,6 @@ func TestAdmit(t *testing.T) { for name, tt := range table { t.Run(name, func(t *testing.T) { wh.hookSource = &tt.hookSource - wh.serviceResolver = fakeServiceResolver{base: *serverURL} - wh.SetScheme(scheme) err = wh.Admit(admission.NewAttributesRecord(&object, &oldObject, kind, namespace, name, resource, subResource, operation, &userInfo)) if tt.expectAllow != (err == nil) { @@ -288,6 +250,144 @@ func TestAdmit(t *testing.T) { } } +// TestAdmitCachedClient tests that GenericAdmissionWebhook#Admit should cache restClient +func TestAdmitCachedClient(t *testing.T) { + scheme := runtime.NewScheme() + v1alpha1.AddToScheme(scheme) + api.AddToScheme(scheme) + + testServer := newTestServer(t) + testServer.StartTLS() + defer testServer.Close() + serverURL, err := url.ParseRequestURI(testServer.URL) + if err != nil { + t.Fatalf("this should never happen? %v", err) + } + wh, err := NewGenericAdmissionWebhook(nil) + if err != nil { + t.Fatal(err) + } + wh.authInfoResolver = newFakeAuthenticationInfoResolver() + wh.serviceResolver = fakeServiceResolver{base: *serverURL} + wh.SetScheme(scheme) + + // Set up a test object for the call + kind := api.SchemeGroupVersion.WithKind("Pod") + name := "my-pod" + namespace := "webhook-test" + object := api.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "pod.name": name, + }, + Name: name, + Namespace: namespace, + }, + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + } + oldObject := api.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}, + } + operation := admission.Update + resource := api.Resource("pods").WithVersion("v1") + subResource := "" + userInfo := user.DefaultInfo{ + Name: "webhook-test", + UID: "webhook-test", + } + + type test struct { + name string + hookSource fakeHookSource + expectAllow bool + expectCache bool + } + + policyIgnore := registrationv1alpha1.Ignore + cases := []test{ + { + name: "cache 1", + hookSource: fakeHookSource{ + hooks: []registrationv1alpha1.Webhook{{ + Name: "cache1", + ClientConfig: newFakeHookClientConfig("allow"), + Rules: newMatchEverythingRules(), + FailurePolicy: &policyIgnore, + }}, + }, + expectAllow: true, + expectCache: true, + }, + { + name: "cache 2", + hookSource: fakeHookSource{ + hooks: []registrationv1alpha1.Webhook{{ + Name: "cache2", + ClientConfig: newFakeHookClientConfig("internalErr"), + Rules: newMatchEverythingRules(), + FailurePolicy: &policyIgnore, + }}, + }, + expectAllow: true, + expectCache: true, + }, + { + name: "cache 3", + hookSource: fakeHookSource{ + hooks: []registrationv1alpha1.Webhook{{ + Name: "cache3", + ClientConfig: newFakeHookClientConfig("allow"), + Rules: newMatchEverythingRules(), + FailurePolicy: &policyIgnore, + }}, + }, + expectAllow: true, + expectCache: false, + }, + } + + for _, testcase := range cases { + t.Run(testcase.name, func(t *testing.T) { + wh.hookSource = &testcase.hookSource + wh.authInfoResolver.(*fakeAuthenticationInfoResolver).cachedCount = 0 + + err = wh.Admit(admission.NewAttributesRecord(&object, &oldObject, kind, namespace, testcase.name, resource, subResource, operation, &userInfo)) + if testcase.expectAllow != (err == nil) { + t.Errorf("expected allowed=%v, but got err=%v", testcase.expectAllow, err) + } + + if testcase.expectCache && wh.authInfoResolver.(*fakeAuthenticationInfoResolver).cachedCount != 1 { + t.Errorf("expected cacheclient, but got none") + } + + if !testcase.expectCache && wh.authInfoResolver.(*fakeAuthenticationInfoResolver).cachedCount != 0 { + t.Errorf("expected not cacheclient, but got cache") + } + }) + } + +} + +func newTestServer(t *testing.T) *httptest.Server { + // Create the test webhook server + sCert, err := tls.X509KeyPair(serverCert, serverKey) + if err != nil { + t.Fatal(err) + } + rootCAs := x509.NewCertPool() + rootCAs.AppendCertsFromPEM(caCert) + testServer := httptest.NewUnstartedServer(http.HandlerFunc(webhookHandler)) + testServer.TLS = &tls.Config{ + Certificates: []tls.Certificate{sCert}, + ClientCAs: rootCAs, + ClientAuth: tls.RequireAndVerifyClientCert, + } + return testServer +} + func webhookHandler(w http.ResponseWriter, r *http.Request) { fmt.Printf("got req: %v\n", r.URL.Path) switch r.URL.Path { @@ -330,11 +430,25 @@ func webhookHandler(w http.ResponseWriter, r *http.Request) { } } +func newFakeAuthenticationInfoResolver() *fakeAuthenticationInfoResolver { + return &fakeAuthenticationInfoResolver{ + restConfig: &rest.Config{ + TLSClientConfig: rest.TLSClientConfig{ + CAData: caCert, + CertData: clientCert, + KeyData: clientKey, + }, + }, + } +} + type fakeAuthenticationInfoResolver struct { - restConfig *rest.Config + restConfig *rest.Config + cachedCount int32 } func (c *fakeAuthenticationInfoResolver) ClientConfigFor(server string) (*rest.Config, error) { + atomic.AddInt32(&c.cachedCount, 1) return c.restConfig, nil } @@ -386,3 +500,25 @@ func TestToStatusErr(t *testing.T) { } } } + +func newFakeHookClientConfig(urlPath string) registrationv1alpha1.WebhookClientConfig { + return registrationv1alpha1.WebhookClientConfig{ + Service: registrationv1alpha1.ServiceReference{ + Name: "webhook-test", + Namespace: "default", + }, + URLPath: urlPath, + CABundle: caCert, + } +} + +func newMatchEverythingRules() []registrationv1alpha1.RuleWithOperations { + return []registrationv1alpha1.RuleWithOperations{{ + Operations: []registrationv1alpha1.OperationType{registrationv1alpha1.OperationAll}, + Rule: registrationv1alpha1.Rule{ + APIGroups: []string{"*"}, + APIVersions: []string{"*"}, + Resources: []string{"*/*"}, + }, + }} +} From af817771108cd99596437246dcdd3f80f9ad813e Mon Sep 17 00:00:00 2001 From: hzxuzhonghu Date: Fri, 3 Nov 2017 10:06:07 +0800 Subject: [PATCH 2/2] update bazel Kubernetes-commit: b845e26983741c5d12e621604b5e064e03d4fed1 --- pkg/admission/plugin/webhook/BUILD | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/admission/plugin/webhook/BUILD b/pkg/admission/plugin/webhook/BUILD index a0ec3b055..13a80fd3f 100644 --- a/pkg/admission/plugin/webhook/BUILD +++ b/pkg/admission/plugin/webhook/BUILD @@ -15,6 +15,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//vendor/github.com/golang/glog:go_default_library", + "//vendor/github.com/hashicorp/golang-lru:go_default_library", "//vendor/k8s.io/api/admission/v1alpha1:go_default_library", "//vendor/k8s.io/api/admissionregistration/v1alpha1:go_default_library", "//vendor/k8s.io/api/authentication/v1:go_default_library",