Skip to content

Commit

Permalink
enable yurthub to handle upgrade request (#673)
Browse files Browse the repository at this point in the history
  • Loading branch information
Congrool authored Dec 12, 2021
1 parent 630d295 commit 61f8a76
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 7 deletions.
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mvdan/xurls v1.1.0/go.mod h1:tQlNn3BED8bE/15hnSL2HLkDeLWpNPAwtw7wkEq44oU=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0=
github.com/naoina/toml v0.1.1/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E=
Expand Down
46 changes: 39 additions & 7 deletions pkg/yurthub/proxy/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"strings"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/proxy"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/klog/v2"

Expand All @@ -45,9 +47,17 @@ type RemoteProxy struct {
filterChain filter.Interface
currentTransport http.RoundTripper
bearerTransport http.RoundTripper
upgradeHandler *proxy.UpgradeAwareHandler
stopCh <-chan struct{}
}

type responder struct{}

func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
klog.Errorf("failed while proxying request %s, %v", req.URL, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}

// NewRemoteProxy creates an *RemoteProxy object, and will be used by LoadBalancer
func NewRemoteProxy(remoteServer *url.URL,
cacheMgr cachemanager.CacheManager,
Expand All @@ -64,6 +74,9 @@ func NewRemoteProxy(remoteServer *url.URL,
return nil, fmt.Errorf("could not get bearer transport when init proxy backend(%s)", remoteServer.String())
}

upgradeAwareHandler := proxy.NewUpgradeAwareHandler(remoteServer, nil, false, true, &responder{})
upgradeAwareHandler.UseRequestLocation = true

proxyBackend := &RemoteProxy{
checker: healthChecker,
reverseProxy: httputil.NewSingleHostReverseProxy(remoteServer),
Expand All @@ -72,6 +85,7 @@ func NewRemoteProxy(remoteServer *url.URL,
filterChain: filterChain,
currentTransport: currentTransport,
bearerTransport: bearerTransport,
upgradeHandler: upgradeAwareHandler,
stopCh: stopCh,
}

Expand All @@ -89,17 +103,23 @@ func (rp *RemoteProxy) Name() string {
}

func (rp *RemoteProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if httpstream.IsUpgradeRequest(req) {
klog.V(5).Infof("get upgrade request %s", req.URL)
if isBearerRequest(req) {
rp.upgradeHandler.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(rp.bearerTransport, proxy.MirrorRequest)
} else {
rp.upgradeHandler.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(rp.currentTransport, proxy.MirrorRequest)
}
rp.upgradeHandler.ServeHTTP(rw, req)
return
}

rp.reverseProxy.Transport = rp.currentTransport
// when edge client(like kube-proxy, flannel, etc) use service account(default InClusterConfig) to access yurthub,
// Authorization header will be set in request. and when edge client(like kubelet) use x509 certificate to access
// yurthub, Authorization header in request will be empty.
auth := strings.TrimSpace(req.Header.Get("Authorization"))
if auth != "" {
parts := strings.Split(auth, " ")
if len(parts) == 2 && strings.ToLower(parts[0]) == "bearer" {
klog.V(5).Infof("request: %s with bearer token: %s", util.ReqString(req), parts[1])
rp.reverseProxy.Transport = rp.bearerTransport
}
if isBearerRequest(req) {
rp.reverseProxy.Transport = rp.bearerTransport
}
rp.reverseProxy.ServeHTTP(rw, req)
}
Expand Down Expand Up @@ -202,3 +222,15 @@ func (rp *RemoteProxy) errorHandler(rw http.ResponseWriter, req *http.Request, e
}
rw.WriteHeader(http.StatusBadGateway)
}

func isBearerRequest(req *http.Request) bool {
auth := strings.TrimSpace(req.Header.Get("Authorization"))
if auth != "" {
parts := strings.Split(auth, " ")
if len(parts) == 2 && strings.ToLower(parts[0]) == "bearer" {
klog.V(5).Infof("request: %s with bearer token: %s", util.ReqString(req), parts[1])
return true
}
}
return false
}
7 changes: 7 additions & 0 deletions pkg/yurthub/proxy/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ type wrapperResponseWriter struct {
http.ResponseWriter
http.Flusher
http.CloseNotifier
http.Hijacker
statusCode int
}

Expand All @@ -184,10 +185,16 @@ func newWrapperResponseWriter(w http.ResponseWriter) *wrapperResponseWriter {
klog.Error("can not get http.Flusher")
}

hijacker, ok := w.(http.Hijacker)
if !ok {
klog.Error("can not get http.Hijacker")
}

return &wrapperResponseWriter{
ResponseWriter: w,
Flusher: flusher,
CloseNotifier: cn,
Hijacker: hijacker,
}
}

Expand Down

0 comments on commit 61f8a76

Please sign in to comment.