Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libgit2: remove deadlock #785

Merged
merged 2 commits into from
Jun 21, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 32 additions & 39 deletions pkg/git/libgit2/managed/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"time"

"golang.org/x/crypto/ssh"
Expand All @@ -80,10 +81,12 @@ func registerManagedSSH() error {
}

func sshSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Transport) (git2go.SmartSubtransport, error) {
var closed int32 = 0
return &sshSmartSubtransport{
transport: transport,
ctx: context.Background(),
logger: logr.Discard(),
transport: transport,
ctx: context.Background(),
logger: logr.Discard(),
closedSessions: &closed,
}, nil
}

Expand All @@ -109,15 +112,12 @@ type sshSmartSubtransport struct {
stdin io.WriteCloser
stdout io.Reader

con connection
}
closedSessions *int32

type connection struct {
client *ssh.Client
session *ssh.Session
currentStream *sshSmartSubtransportStream
connected bool
m sync.RWMutex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this approach, no more locking and safer.
This whole connection type was created in order to group these variables and lock them together. Since we no longer lock with this change, we can put them back into sshSmartSubtransport struct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed a commit with this change, as Paulo is away at the moment.

}

func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
Expand Down Expand Up @@ -151,17 +151,17 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
var cmd string
switch action {
case git2go.SmartServiceActionUploadpackLs, git2go.SmartServiceActionUploadpack:
if t.con.currentStream != nil {
if t.currentStream != nil {
if t.lastAction == git2go.SmartServiceActionUploadpackLs {
return t.con.currentStream, nil
return t.currentStream, nil
}
}
cmd = fmt.Sprintf("git-upload-pack '%s'", uPath)

case git2go.SmartServiceActionReceivepackLs, git2go.SmartServiceActionReceivepack:
if t.con.currentStream != nil {
if t.currentStream != nil {
if t.lastAction == git2go.SmartServiceActionReceivepackLs {
return t.con.currentStream, nil
return t.currentStream, nil
}
}
cmd = fmt.Sprintf("git-receive-pack '%s'", uPath)
Expand Down Expand Up @@ -208,32 +208,30 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
return nil
}

t.con.m.RLock()
if t.con.connected == true {
if t.connected {
// The connection is no longer shared across actions, so ensures
// all has been released before starting a new connection.
_ = t.Close()
}
t.con.m.RUnlock()

err = t.createConn(addr, sshConfig)
if err != nil {
return nil, err
}

t.logger.V(logger.TraceLevel).Info("creating new ssh session")
if t.con.session, err = t.con.client.NewSession(); err != nil {
if t.session, err = t.client.NewSession(); err != nil {
return nil, err
}

if t.stdin, err = t.con.session.StdinPipe(); err != nil {
if t.stdin, err = t.session.StdinPipe(); err != nil {
return nil, err
}

var w *io.PipeWriter
var reader io.Reader
t.stdout, w = io.Pipe()
if reader, err = t.con.session.StdoutPipe(); err != nil {
if reader, err = t.session.StdoutPipe(); err != nil {
return nil, err
}

Expand All @@ -251,7 +249,6 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
"recovered from libgit2 ssh smart subtransport panic")
}
}()

var cancel context.CancelFunc
ctx := t.ctx

Expand All @@ -261,19 +258,17 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
defer cancel()
}

closedAlready := atomic.LoadInt32(t.closedSessions)
for {
select {
case <-ctx.Done():
t.Close()
return nil

default:
t.con.m.RLock()
if !t.con.connected {
t.con.m.RUnlock()
if atomic.LoadInt32(t.closedSessions) > closedAlready {
return nil
}
t.con.m.RUnlock()

_, err := io.Copy(w, reader)
if err != nil {
Expand All @@ -285,16 +280,16 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
}()

t.logger.V(logger.TraceLevel).Info("run on remote", "cmd", cmd)
if err := t.con.session.Start(cmd); err != nil {
if err := t.session.Start(cmd); err != nil {
return nil, err
}

t.lastAction = action
t.con.currentStream = &sshSmartSubtransportStream{
t.currentStream = &sshSmartSubtransportStream{
owner: t,
}

return t.con.currentStream, nil
return t.currentStream, nil
}

func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConfig) error {
Expand All @@ -311,10 +306,8 @@ func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConf
return err
}

t.con.m.Lock()
t.con.connected = true
t.con.client = ssh.NewClient(c, chans, reqs)
t.con.m.Unlock()
t.connected = true
t.client = ssh.NewClient(c, chans, reqs)

return nil
}
Expand All @@ -330,27 +323,27 @@ func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConf
// SmartSubTransport (i.e. unreleased resources, staled connections).
func (t *sshSmartSubtransport) Close() error {
t.logger.V(logger.TraceLevel).Info("sshSmartSubtransport.Close()")
t.con.m.Lock()
defer t.con.m.Unlock()

t.con.currentStream = nil
if t.con.client != nil && t.stdin != nil {
t.currentStream = nil
if t.client != nil && t.stdin != nil {
_ = t.stdin.Close()
}
t.stdin = nil

if t.con.session != nil {
if t.session != nil {
t.logger.V(logger.TraceLevel).Info("session.Close()")
_ = t.con.session.Close()
_ = t.session.Close()
}
t.con.session = nil
t.session = nil

if t.con.client != nil {
_ = t.con.client.Close()
if t.client != nil {
_ = t.client.Close()
t.logger.V(logger.TraceLevel).Info("close client")
}
t.client = nil

t.con.connected = false
t.connected = false
atomic.AddInt32(t.closedSessions, 1)

return nil
}
Expand Down