Skip to content

Commit

Permalink
Improve managed transport observability
Browse files Browse the repository at this point in the history
Signed-off-by: Paulo Gomes <[email protected]>
  • Loading branch information
Paulo Gomes committed Mar 25, 2022
1 parent 656539e commit 2e87aa6
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 15 deletions.
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func main() {
}()

if managed.Enabled() {
managed.InitManagedTransport()
managed.InitManagedTransport(ctrl.Log.WithName("managed-transport"))
}

setupLog.Info("starting manager")
Expand Down
26 changes: 17 additions & 9 deletions pkg/git/libgit2/managed/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import (
"net/url"
"sync"

"github.com/fluxcd/source-controller/internal/transport"
pool "github.com/fluxcd/source-controller/internal/transport"
git2go "github.com/libgit2/git2go/v33"
)

Expand All @@ -72,8 +72,10 @@ func registerManagedHTTP() error {
}

func httpSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Transport) (git2go.SmartSubtransport, error) {
traceLog.Info("[http]: httpSmartSubtransportFactory")
sst := &httpSmartSubtransport{
transport: transport,
transport: transport,
httpTransport: pool.NewOrIdle(nil),
}

return sst, nil
Expand Down Expand Up @@ -104,9 +106,8 @@ func (t *httpSmartSubtransport) Action(targetUrl string, action git2go.SmartServ
proxyFn = http.ProxyURL(parsedUrl)
}

// reuses the http transport from a pool, or create new one on demand.
t.httpTransport = transport.NewOrIdle(nil)
t.httpTransport.Proxy = proxyFn
t.httpTransport.DisableCompression = false

client, req, err := createClientRequest(targetUrl, action, t.httpTransport)
if err != nil {
Expand Down Expand Up @@ -209,10 +210,18 @@ func createClientRequest(targetUrl string, action git2go.SmartServiceAction, t *
}

func (t *httpSmartSubtransport) Close() error {
traceLog.Info("[http]: httpSmartSubtransport.Close()")
return nil
}

func (t *httpSmartSubtransport) Free() {
traceLog.Info("[http]: httpSmartSubtransport.Free()")

if t.httpTransport != nil {
traceLog.Info("[http]: release http transport back to pool")
pool.Release(t.httpTransport)
t.httpTransport = nil
}
}

type httpSmartSubtransportStream struct {
Expand Down Expand Up @@ -277,6 +286,8 @@ func (self *httpSmartSubtransportStream) Write(buf []byte) (int, error) {

func (self *httpSmartSubtransportStream) Free() {
if self.resp != nil {
traceLog.Info("[http]: httpSmartSubtransportStream.Free()")

// ensure body is fully processed and closed
// for increased likelihood of transport reuse in HTTP/1.x.
// it should not be a problem to do this more than once.
Expand Down Expand Up @@ -344,6 +355,7 @@ func (self *httpSmartSubtransportStream) sendRequest() error {
}

req.SetBasicAuth(userName, password)
traceLog.Info("[http]: new request", "method", req.Method, "URL", req.URL)
resp, err = self.client.Do(req)
if err != nil {
return err
Expand All @@ -363,6 +375,7 @@ func (self *httpSmartSubtransportStream) sendRequest() error {
return err
}

traceLog.Info("[http]: POST redirect", "URL", self.req.URL)
continue
}

Expand All @@ -379,11 +392,6 @@ func (self *httpSmartSubtransportStream) sendRequest() error {
return fmt.Errorf("Unhandled HTTP error %s", resp.Status)
}

if self.owner.httpTransport != nil {
transport.Release(self.owner.httpTransport)
self.owner.httpTransport = nil
}

self.resp = resp
self.sentRequest = true
return nil
Expand Down
13 changes: 12 additions & 1 deletion pkg/git/libgit2/managed/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package managed
import (
"sync"
"time"

"github.com/fluxcd/pkg/runtime/logger"
"github.com/go-logr/logr"
)

var (
Expand All @@ -34,6 +37,9 @@ var (
// regardless of the current operation (i.e. connection,
// handshake, put/get).
fullHttpClientTimeOut time.Duration = 10 * time.Minute

debugLog logr.Logger
traceLog logr.Logger
)

// InitManagedTransport initialises HTTP(S) and SSH managed transport
Expand All @@ -47,9 +53,14 @@ var (
//
// This function will only register managed transports once, subsequent calls
// leads to no-op.
func InitManagedTransport() error {
func InitManagedTransport(log logr.Logger) error {
var err error

once.Do(func() {
log.Info("Enabling experimental managed transport")
debugLog = log.V(logger.DebugLevel)
traceLog = log.V(logger.TraceLevel)

if err = registerManagedHTTP(); err != nil {
return
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/git/libgit2/managed/managed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/fluxcd/pkg/gittestserver"
"github.com/fluxcd/pkg/ssh"
"github.com/fluxcd/source-controller/pkg/git"
"github.com/go-logr/logr"

git2go "github.com/libgit2/git2go/v33"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -247,7 +248,7 @@ func TestManagedTransport_E2E(t *testing.T) {
defer server.StopSSH()

// Force managed transport to be enabled
InitManagedTransport()
InitManagedTransport(logr.Discard())

repoPath := "test.git"
err = server.InitRepo("../testdata/git/repo", git.DefaultBranch, repoPath)
Expand Down Expand Up @@ -312,7 +313,7 @@ func TestManagedTransport_HandleRedirect(t *testing.T) {
defer os.RemoveAll(tmpDir)

// Force managed transport to be enabled
InitManagedTransport()
InitManagedTransport(logr.Discard())

// GitHub will cause a 301 and redirect to https
repo, err := git2go.Clone("http://github.com/stefanprodan/podinfo", tmpDir, &git2go.CloneOptions{
Expand Down
7 changes: 5 additions & 2 deletions pkg/git/libgit2/managed/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,7 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
return nil, err
}

t.session, err = t.client.NewSession()
if err != nil {
traceLog.Info("[ssh]: creating new ssh session")
return nil, err
}

Expand All @@ -201,6 +200,7 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
return nil, err
}

traceLog.Info("[ssh]: run on remote", "cmd", cmd)
if err := t.session.Start(cmd); err != nil {
return nil, err
}
Expand All @@ -214,6 +214,7 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
}

func (t *sshSmartSubtransport) Close() error {
traceLog.Info("[ssh]: sshSmartSubtransport.Close()")
t.currentStream = nil
if t.client != nil {
t.stdin.Close()
Expand All @@ -225,6 +226,7 @@ func (t *sshSmartSubtransport) Close() error {
}

func (t *sshSmartSubtransport) Free() {
traceLog.Info("[ssh]: sshSmartSubtransport.Free()")
}

type sshSmartSubtransportStream struct {
Expand All @@ -240,6 +242,7 @@ func (stream *sshSmartSubtransportStream) Write(buf []byte) (int, error) {
}

func (stream *sshSmartSubtransportStream) Free() {
traceLog.Info("[ssh]: sshSmartSubtransportStream.Free()")
}

func getSSHConfigFromCredential(cred *git2go.Credential) (*ssh.ClientConfig, error) {
Expand Down

0 comments on commit 2e87aa6

Please sign in to comment.