Skip to content

Commit

Permalink
Merge pull request #16118 from vrothberg/proxy-mcproxface
Browse files Browse the repository at this point in the history
play kube: notifyproxy: listen before starting the pod
  • Loading branch information
rhatdan authored Oct 12, 2022
2 parents a344928 + 7b84a3a commit c1d832d
Showing 1 changed file with 74 additions and 47 deletions.
121 changes: 74 additions & 47 deletions pkg/systemd/notifyproxy/notifyproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,16 @@ type NotifyProxy struct {
connection *net.UnixConn
socketPath string
container Container // optional

// Channels for synchronizing the goroutine waiting for the READY
// message and the one checking if the optional container is still
// running.
errorChan chan error
readyChan chan bool
}

// New creates a NotifyProxy. The specified temp directory can be left empty.
// New creates a NotifyProxy that starts listening immediately. The specified
// temp directory can be left empty.
func New(tmpDir string) (*NotifyProxy, error) {
tempFile, err := os.CreateTemp(tmpDir, "-podman-notify-proxy.sock")
if err != nil {
Expand All @@ -69,7 +76,60 @@ func New(tmpDir string) (*NotifyProxy, error) {
return nil, err
}

return &NotifyProxy{connection: conn, socketPath: socketPath}, nil
errorChan := make(chan error, 1)
readyChan := make(chan bool, 1)

proxy := &NotifyProxy{
connection: conn,
socketPath: socketPath,
errorChan: errorChan,
readyChan: readyChan,
}

// Start waiting for the READY message in the background. This way,
// the proxy can be created prior to starting the container and
// circumvents a race condition on writing/reading on the socket.
proxy.waitForReady()

return proxy, nil
}

// waitForReady waits for the READY message in the background. The goroutine
// returns on receiving READY or when the socket is closed.
func (p *NotifyProxy) waitForReady() {
go func() {
// Read until the `READY` message is received or the connection
// is closed.
const bufferSize = 1024
sBuilder := strings.Builder{}
for {
for {
buffer := make([]byte, bufferSize)
num, err := p.connection.Read(buffer)
if err != nil {
if !errors.Is(err, io.EOF) {
p.errorChan <- err
return
}
}
sBuilder.Write(buffer[:num])
if num != bufferSize || buffer[num-1] == '\n' {
// Break as we read an entire line that
// we can inspect for the `READY`
// message.
break
}
}

for _, line := range strings.Split(sBuilder.String(), "\n") {
if line == daemon.SdNotifyReady {
p.readyChan <- true
return
}
}
sBuilder.Reset()
}
}()
}

// SocketPath returns the path of the socket the proxy is listening on.
Expand Down Expand Up @@ -105,54 +165,21 @@ type Container interface {
// the waiting gets canceled and ErrNoReadyMessage is returned.
func (p *NotifyProxy) WaitAndClose() error {
defer func() {
// Closing the socket/connection makes sure that the other
// goroutine reading/waiting for the READY message returns.
if err := p.close(); err != nil {
logrus.Errorf("Closing notify proxy: %v", err)
}
}()

// Since reading from the connection is blocking, we need to spin up two
// goroutines. One waiting for the `READY` message, the other waiting
// for the container to stop running.
errorChan := make(chan error, 1)
readyChan := make(chan bool, 1)

go func() {
// Read until the `READY` message is received or the connection
// is closed.
const bufferSize = 1024
sBuilder := strings.Builder{}
for {
for {
buffer := make([]byte, bufferSize)
num, err := p.connection.Read(buffer)
if err != nil {
if !errors.Is(err, io.EOF) {
errorChan <- err
return
}
}
sBuilder.Write(buffer[:num])
if num != bufferSize || buffer[num-1] == '\n' {
// Break as we read an entire line that
// we can inspect for the `READY`
// message.
break
}
}

for _, line := range strings.Split(sBuilder.String(), "\n") {
if line == daemon.SdNotifyReady {
readyChan <- true
return
}
}
sBuilder.Reset()
}
}()

// If the proxy has a container we need to watch it as it may exit
// without sending a READY message. The goroutine below returns when
// the container exits OR when the function returns (see deferred the
// cancel()) in which case we either we've either received the READY
// message or encountered an error reading from the socket.
if p.container != nil {
// Create a cancellable context to make sure the goroutine
// below terminates.
// below terminates on function return.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
Expand All @@ -162,11 +189,11 @@ func (p *NotifyProxy) WaitAndClose() error {
default:
state, err := p.container.State()
if err != nil {
errorChan <- err
p.errorChan <- err
return
}
if state != define.ContainerStateRunning {
errorChan <- fmt.Errorf("%w: %s", ErrNoReadyMessage, p.container.ID())
p.errorChan <- fmt.Errorf("%w: %s", ErrNoReadyMessage, p.container.ID())
return
}
time.Sleep(time.Second)
Expand All @@ -176,9 +203,9 @@ func (p *NotifyProxy) WaitAndClose() error {

// Wait for the ready/error channel.
select {
case <-readyChan:
case <-p.readyChan:
return nil
case err := <-errorChan:
case err := <-p.errorChan:
return err
}
}

0 comments on commit c1d832d

Please sign in to comment.