From 61f8a76fda0a995e84effaa83eb092010b287080 Mon Sep 17 00:00:00 2001 From: Yifei Zhang Date: Sun, 12 Dec 2021 22:09:20 +0800 Subject: [PATCH] enable yurthub to handle upgrade request (#673) --- go.sum | 1 + pkg/yurthub/proxy/remote/remote.go | 46 +++++++++++++++++++++++++----- pkg/yurthub/proxy/util/util.go | 7 +++++ 3 files changed, 47 insertions(+), 7 deletions(-) diff --git a/go.sum b/go.sum index e2d0e65b301..904d54e06ad 100644 --- a/go.sum +++ b/go.sum @@ -449,6 +449,7 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mvdan/xurls v1.1.0/go.mod h1:tQlNn3BED8bE/15hnSL2HLkDeLWpNPAwtw7wkEq44oU= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0= github.com/naoina/toml v0.1.1/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E= diff --git a/pkg/yurthub/proxy/remote/remote.go b/pkg/yurthub/proxy/remote/remote.go index e6d3aec7d8e..0cb33d598bb 100644 --- a/pkg/yurthub/proxy/remote/remote.go +++ b/pkg/yurthub/proxy/remote/remote.go @@ -26,6 +26,8 @@ import ( "strings" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/httpstream" + "k8s.io/apimachinery/pkg/util/proxy" apirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/klog/v2" @@ -45,9 +47,17 @@ type RemoteProxy struct { filterChain filter.Interface currentTransport http.RoundTripper bearerTransport http.RoundTripper + upgradeHandler *proxy.UpgradeAwareHandler stopCh <-chan struct{} } +type responder struct{} + +func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) { + klog.Errorf("failed while proxying request %s, %v", req.URL, err) + http.Error(w, err.Error(), http.StatusInternalServerError) +} + // NewRemoteProxy creates an *RemoteProxy object, and will be used by LoadBalancer func NewRemoteProxy(remoteServer *url.URL, cacheMgr cachemanager.CacheManager, @@ -64,6 +74,9 @@ func NewRemoteProxy(remoteServer *url.URL, return nil, fmt.Errorf("could not get bearer transport when init proxy backend(%s)", remoteServer.String()) } + upgradeAwareHandler := proxy.NewUpgradeAwareHandler(remoteServer, nil, false, true, &responder{}) + upgradeAwareHandler.UseRequestLocation = true + proxyBackend := &RemoteProxy{ checker: healthChecker, reverseProxy: httputil.NewSingleHostReverseProxy(remoteServer), @@ -72,6 +85,7 @@ func NewRemoteProxy(remoteServer *url.URL, filterChain: filterChain, currentTransport: currentTransport, bearerTransport: bearerTransport, + upgradeHandler: upgradeAwareHandler, stopCh: stopCh, } @@ -89,17 +103,23 @@ func (rp *RemoteProxy) Name() string { } func (rp *RemoteProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + if httpstream.IsUpgradeRequest(req) { + klog.V(5).Infof("get upgrade request %s", req.URL) + if isBearerRequest(req) { + rp.upgradeHandler.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(rp.bearerTransport, proxy.MirrorRequest) + } else { + rp.upgradeHandler.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(rp.currentTransport, proxy.MirrorRequest) + } + rp.upgradeHandler.ServeHTTP(rw, req) + return + } + rp.reverseProxy.Transport = rp.currentTransport // when edge client(like kube-proxy, flannel, etc) use service account(default InClusterConfig) to access yurthub, // Authorization header will be set in request. and when edge client(like kubelet) use x509 certificate to access // yurthub, Authorization header in request will be empty. - auth := strings.TrimSpace(req.Header.Get("Authorization")) - if auth != "" { - parts := strings.Split(auth, " ") - if len(parts) == 2 && strings.ToLower(parts[0]) == "bearer" { - klog.V(5).Infof("request: %s with bearer token: %s", util.ReqString(req), parts[1]) - rp.reverseProxy.Transport = rp.bearerTransport - } + if isBearerRequest(req) { + rp.reverseProxy.Transport = rp.bearerTransport } rp.reverseProxy.ServeHTTP(rw, req) } @@ -202,3 +222,15 @@ func (rp *RemoteProxy) errorHandler(rw http.ResponseWriter, req *http.Request, e } rw.WriteHeader(http.StatusBadGateway) } + +func isBearerRequest(req *http.Request) bool { + auth := strings.TrimSpace(req.Header.Get("Authorization")) + if auth != "" { + parts := strings.Split(auth, " ") + if len(parts) == 2 && strings.ToLower(parts[0]) == "bearer" { + klog.V(5).Infof("request: %s with bearer token: %s", util.ReqString(req), parts[1]) + return true + } + } + return false +} diff --git a/pkg/yurthub/proxy/util/util.go b/pkg/yurthub/proxy/util/util.go index a74ca5915ee..aa9c208c2a7 100644 --- a/pkg/yurthub/proxy/util/util.go +++ b/pkg/yurthub/proxy/util/util.go @@ -170,6 +170,7 @@ type wrapperResponseWriter struct { http.ResponseWriter http.Flusher http.CloseNotifier + http.Hijacker statusCode int } @@ -184,10 +185,16 @@ func newWrapperResponseWriter(w http.ResponseWriter) *wrapperResponseWriter { klog.Error("can not get http.Flusher") } + hijacker, ok := w.(http.Hijacker) + if !ok { + klog.Error("can not get http.Hijacker") + } + return &wrapperResponseWriter{ ResponseWriter: w, Flusher: flusher, CloseNotifier: cn, + Hijacker: hijacker, } }