diff --git a/pkg/systemd/notifyproxy/notifyproxy.go b/pkg/systemd/notifyproxy/notifyproxy.go index 4b92d9e6c4..e9dd48c51b 100644 --- a/pkg/systemd/notifyproxy/notifyproxy.go +++ b/pkg/systemd/notifyproxy/notifyproxy.go @@ -45,9 +45,16 @@ type NotifyProxy struct { connection *net.UnixConn socketPath string container Container // optional + + // Channels for synchronizing the goroutine waiting for the READY + // message and the one checking if the optional container is still + // running. + errorChan chan error + readyChan chan bool } -// New creates a NotifyProxy. The specified temp directory can be left empty. +// New creates a NotifyProxy that starts listening immediately. The specified +// temp directory can be left empty. func New(tmpDir string) (*NotifyProxy, error) { tempFile, err := os.CreateTemp(tmpDir, "-podman-notify-proxy.sock") if err != nil { @@ -69,7 +76,60 @@ func New(tmpDir string) (*NotifyProxy, error) { return nil, err } - return &NotifyProxy{connection: conn, socketPath: socketPath}, nil + errorChan := make(chan error, 1) + readyChan := make(chan bool, 1) + + proxy := &NotifyProxy{ + connection: conn, + socketPath: socketPath, + errorChan: errorChan, + readyChan: readyChan, + } + + // Start waiting for the READY message in the background. This way, + // the proxy can be created prior to starting the container and + // circumvents a race condition on writing/reading on the socket. + proxy.waitForReady() + + return proxy, nil +} + +// waitForReady waits for the READY message in the background. The goroutine +// returns on receiving READY or when the socket is closed. +func (p *NotifyProxy) waitForReady() { + go func() { + // Read until the `READY` message is received or the connection + // is closed. + const bufferSize = 1024 + sBuilder := strings.Builder{} + for { + for { + buffer := make([]byte, bufferSize) + num, err := p.connection.Read(buffer) + if err != nil { + if !errors.Is(err, io.EOF) { + p.errorChan <- err + return + } + } + sBuilder.Write(buffer[:num]) + if num != bufferSize || buffer[num-1] == '\n' { + // Break as we read an entire line that + // we can inspect for the `READY` + // message. + break + } + } + + for _, line := range strings.Split(sBuilder.String(), "\n") { + if line == daemon.SdNotifyReady { + p.readyChan <- true + return + } + } + sBuilder.Reset() + } + }() } // SocketPath returns the path of the socket the proxy is listening on. @@ -105,54 +165,21 @@ type Container interface { // the waiting gets canceled and ErrNoReadyMessage is returned. func (p *NotifyProxy) WaitAndClose() error { defer func() { + // Closing the socket/connection makes sure that the other + // goroutine reading/waiting for the READY message returns. if err := p.close(); err != nil { logrus.Errorf("Closing notify proxy: %v", err) } }() - // Since reading from the connection is blocking, we need to spin up two - // goroutines. One waiting for the `READY` message, the other waiting - // for the container to stop running. - errorChan := make(chan error, 1) - readyChan := make(chan bool, 1) - - go func() { - // Read until the `READY` message is received or the connection - // is closed. - const bufferSize = 1024 - sBuilder := strings.Builder{} - for { - for { - buffer := make([]byte, bufferSize) - num, err := p.connection.Read(buffer) - if err != nil { - if !errors.Is(err, io.EOF) { - errorChan <- err - return - } - } - sBuilder.Write(buffer[:num]) - if num != bufferSize || buffer[num-1] == '\n' { - // Break as we read an entire line that - // we can inspect for the `READY` - // message. - break - } - } - - for _, line := range strings.Split(sBuilder.String(), "\n") { - if line == daemon.SdNotifyReady { - readyChan <- true - return - } - } - sBuilder.Reset() - } - }() - + // If the proxy has a container we need to watch it as it may exit + // without sending a READY message. The goroutine below returns when + // the container exits OR when the function returns (see deferred the + // cancel()) in which case we either we've either received the READY + // message or encountered an error reading from the socket. if p.container != nil { // Create a cancellable context to make sure the goroutine - // below terminates. + // below terminates on function return. ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { @@ -162,11 +189,11 @@ func (p *NotifyProxy) WaitAndClose() error { default: state, err := p.container.State() if err != nil { - errorChan <- err + p.errorChan <- err return } if state != define.ContainerStateRunning { - errorChan <- fmt.Errorf("%w: %s", ErrNoReadyMessage, p.container.ID()) + p.errorChan <- fmt.Errorf("%w: %s", ErrNoReadyMessage, p.container.ID()) return } time.Sleep(time.Second) @@ -176,9 +203,9 @@ func (p *NotifyProxy) WaitAndClose() error { // Wait for the ready/error channel. select { - case <-readyChan: + case <-p.readyChan: return nil - case err := <-errorChan: + case err := <-p.errorChan: return err } }