Skip to content

Commit

Permalink
merge branch 'pr-2923'
Browse files Browse the repository at this point in the history
Shiming Zhang (3):
  libct/cg/sd: add dbus manager
  libct/cg/sd: add renew dbus connection
  Privatize NewUserSystemDbus

Kir Kolyshkin (2):
  libct/cg/sd: add isDbusError
  libct/cg/sd: retry on dbus disconnect

LGTMs: mrunalp cyphar
Closes #2923
  • Loading branch information
cyphar committed Apr 28, 2021
2 parents b2eb908 + 47ef9a1 commit fac8492
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 82 deletions.
84 changes: 50 additions & 34 deletions libcontainer/cgroups/systemd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package systemd

import (
"bufio"
"context"
"fmt"
"math"
"os"
Expand All @@ -29,10 +28,6 @@ const (
)

var (
connOnce sync.Once
connDbus *systemdDbus.Conn
connErr error

versionOnce sync.Once
version int

Expand Down Expand Up @@ -292,19 +287,6 @@ func generateDeviceProperties(rules []*devices.Rule) ([]systemdDbus.Property, er
return properties, nil
}

// getDbusConnection lazy initializes systemd dbus connection
// and returns it
func getDbusConnection(rootless bool) (*systemdDbus.Conn, error) {
connOnce.Do(func() {
if rootless {
connDbus, connErr = NewUserSystemdDbus()
} else {
connDbus, connErr = systemdDbus.NewWithContext(context.TODO())
}
})
return connDbus, connErr
}

func newProp(name string, units interface{}) systemdDbus.Property {
return systemdDbus.Property{
Name: name,
Expand All @@ -320,19 +302,29 @@ func getUnitName(c *configs.Cgroup) string {
return c.Name
}

// isUnitExists returns true if the error is that a systemd unit already exists.
func isUnitExists(err error) bool {
// isDbusError returns true if the error is a specific dbus error.
func isDbusError(err error, name string) bool {
if err != nil {
if dbusError, ok := err.(dbus.Error); ok {
return strings.Contains(dbusError.Name, "org.freedesktop.systemd1.UnitExists")
var derr *dbus.Error
if errors.As(err, &derr) {
return strings.Contains(derr.Name, name)
}
}
return false
}

func startUnit(dbusConnection *systemdDbus.Conn, unitName string, properties []systemdDbus.Property) error {
// isUnitExists returns true if the error is that a systemd unit already exists.
func isUnitExists(err error) bool {
return isDbusError(err, "org.freedesktop.systemd1.UnitExists")
}

func startUnit(cm *dbusConnManager, unitName string, properties []systemdDbus.Property) error {
statusChan := make(chan string, 1)
if _, err := dbusConnection.StartTransientUnit(unitName, "replace", properties, statusChan); err == nil {
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
_, err := c.StartTransientUnit(unitName, "replace", properties, statusChan)
return err
})
if err == nil {
timeout := time.NewTimer(30 * time.Second)
defer timeout.Stop()

Expand All @@ -341,11 +333,11 @@ func startUnit(dbusConnection *systemdDbus.Conn, unitName string, properties []s
close(statusChan)
// Please refer to https://godoc.org/github.com/coreos/go-systemd/dbus#Conn.StartUnit
if s != "done" {
dbusConnection.ResetFailedUnit(unitName)
resetFailedUnit(cm, unitName)
return errors.Errorf("error creating systemd unit `%s`: got `%s`", unitName, s)
}
case <-timeout.C:
dbusConnection.ResetFailedUnit(unitName)
resetFailedUnit(cm, unitName)
return errors.New("Timeout waiting for systemd to create " + unitName)
}
} else if !isUnitExists(err) {
Expand All @@ -355,9 +347,13 @@ func startUnit(dbusConnection *systemdDbus.Conn, unitName string, properties []s
return nil
}

func stopUnit(dbusConnection *systemdDbus.Conn, unitName string) error {
func stopUnit(cm *dbusConnManager, unitName string) error {
statusChan := make(chan string, 1)
if _, err := dbusConnection.StopUnit(unitName, "replace", statusChan); err == nil {
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
_, err := c.StopUnit(unitName, "replace", statusChan)
return err
})
if err == nil {
select {
case s := <-statusChan:
close(statusChan)
Expand All @@ -372,10 +368,30 @@ func stopUnit(dbusConnection *systemdDbus.Conn, unitName string) error {
return nil
}

func systemdVersion(conn *systemdDbus.Conn) int {
func resetFailedUnit(cm *dbusConnManager, name string) {
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
return c.ResetFailedUnit(name)
})
if err != nil {
logrus.Warnf("unable to reset failed unit: %v", err)
}
}

func setUnitProperties(cm *dbusConnManager, name string, properties ...systemdDbus.Property) error {
return cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
return c.SetUnitProperties(name, true, properties...)
})
}

func systemdVersion(cm *dbusConnManager) int {
versionOnce.Do(func() {
version = -1
verStr, err := conn.GetManagerProperty("Version")
var verStr string
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
var err error
verStr, err = c.GetManagerProperty("Version")
return err
})
if err == nil {
version, err = systemdVersionAtoi(verStr)
}
Expand Down Expand Up @@ -403,10 +419,10 @@ func systemdVersionAtoi(verStr string) (int, error) {
return ver, errors.Wrapf(err, "can't parse version %s", verStr)
}

func addCpuQuota(conn *systemdDbus.Conn, properties *[]systemdDbus.Property, quota int64, period uint64) {
func addCpuQuota(cm *dbusConnManager, properties *[]systemdDbus.Property, quota int64, period uint64) {
if period != 0 {
// systemd only supports CPUQuotaPeriodUSec since v242
sdVer := systemdVersion(conn)
sdVer := systemdVersion(cm)
if sdVer >= 242 {
*properties = append(*properties,
newProp("CPUQuotaPeriodUSec", period))
Expand Down Expand Up @@ -437,13 +453,13 @@ func addCpuQuota(conn *systemdDbus.Conn, properties *[]systemdDbus.Property, quo
}
}

func addCpuset(conn *systemdDbus.Conn, props *[]systemdDbus.Property, cpus, mems string) error {
func addCpuset(cm *dbusConnManager, props *[]systemdDbus.Property, cpus, mems string) error {
if cpus == "" && mems == "" {
return nil
}

// systemd only supports AllowedCPUs/AllowedMemoryNodes since v244
sdVer := systemdVersion(conn)
sdVer := systemdVersion(cm)
if sdVer < 244 {
logrus.Debugf("systemd v%d is too old to support AllowedCPUs/AllowedMemoryNodes"+
" (settings will still be applied to cgroupfs)", sdVer)
Expand Down
90 changes: 90 additions & 0 deletions libcontainer/cgroups/systemd/dbus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// +build linux

package systemd

import (
"context"
"sync"

systemdDbus "github.com/coreos/go-systemd/v22/dbus"
dbus "github.com/godbus/dbus/v5"
)

type dbusConnManager struct {
conn *systemdDbus.Conn
rootless bool
sync.RWMutex
}

// newDbusConnManager initializes systemd dbus connection manager.
func newDbusConnManager(rootless bool) *dbusConnManager {
return &dbusConnManager{
rootless: rootless,
}
}

// getConnection lazily initializes and returns systemd dbus connection.
func (d *dbusConnManager) getConnection() (*systemdDbus.Conn, error) {
// In the case where d.conn != nil
// Use the read lock the first time to ensure
// that Conn can be acquired at the same time.
d.RLock()
if conn := d.conn; conn != nil {
d.RUnlock()
return conn, nil
}
d.RUnlock()

// In the case where d.conn == nil
// Use write lock to ensure that only one
// will be created
d.Lock()
defer d.Unlock()
if conn := d.conn; conn != nil {
return conn, nil
}

conn, err := d.newConnection()
if err != nil {
return nil, err
}
d.conn = conn
return conn, nil
}

func (d *dbusConnManager) newConnection() (*systemdDbus.Conn, error) {
if d.rootless {
return newUserSystemdDbus()
}
return systemdDbus.NewWithContext(context.TODO())
}

// resetConnection resets the connection to its initial state
// (so it can be reconnected if necessary).
func (d *dbusConnManager) resetConnection(conn *systemdDbus.Conn) {
d.Lock()
defer d.Unlock()
if d.conn != nil && d.conn == conn {
d.conn.Close()
d.conn = nil
}
}

var errDbusConnClosed = dbus.ErrClosed.Error()

// retryOnDisconnect calls op, and if the error it returns is about closed dbus
// connection, the connection is re-established and the op is retried. This helps
// with the situation when dbus is restarted and we have a stale connection.
func (d *dbusConnManager) retryOnDisconnect(op func(*systemdDbus.Conn) error) error {
for {
conn, err := d.getConnection()
if err != nil {
return err
}
err = op(conn)
if !isDbusError(err, errDbusConnClosed) {
return err
}
d.resetConnection(conn)
}
}
4 changes: 2 additions & 2 deletions libcontainer/cgroups/systemd/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/pkg/errors"
)

// NewUserSystemdDbus creates a connection for systemd user-instance.
func NewUserSystemdDbus() (*systemdDbus.Conn, error) {
// newUserSystemdDbus creates a connection for systemd user-instance.
func newUserSystemdDbus() (*systemdDbus.Conn, error) {
addr, err := DetectUserDbusSessionBusAddress()
if err != nil {
return nil, err
Expand Down
29 changes: 9 additions & 20 deletions libcontainer/cgroups/systemd/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ type legacyManager struct {
mu sync.Mutex
cgroups *configs.Cgroup
paths map[string]string
dbus *dbusConnManager
}

func NewLegacyManager(cg *configs.Cgroup, paths map[string]string) cgroups.Manager {
return &legacyManager{
cgroups: cg,
paths: paths,
dbus: newDbusConnManager(false),
}
}

Expand Down Expand Up @@ -56,7 +58,7 @@ var legacySubsystems = []subsystem{
&fs.NameGroup{GroupName: "name=systemd"},
}

func genV1ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]systemdDbus.Property, error) {
func genV1ResourcesProperties(c *configs.Cgroup, cm *dbusConnManager) ([]systemdDbus.Property, error) {
var properties []systemdDbus.Property
r := c.Resources

Expand All @@ -76,7 +78,7 @@ func genV1ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]syst
newProp("CPUShares", r.CpuShares))
}

addCpuQuota(conn, &properties, r.CpuQuota, r.CpuPeriod)
addCpuQuota(cm, &properties, r.CpuQuota, r.CpuPeriod)

if r.BlkioWeight != 0 {
properties = append(properties,
Expand All @@ -88,7 +90,7 @@ func genV1ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]syst
newProp("TasksMax", uint64(r.PidsLimit)))
}

err = addCpuset(conn, &properties, r.CpusetCpus, r.CpusetMems)
err = addCpuset(cm, &properties, r.CpusetCpus, r.CpusetMems)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -164,13 +166,9 @@ func (m *legacyManager) Apply(pid int) error {
properties = append(properties,
newProp("DefaultDependencies", false))

dbusConnection, err := getDbusConnection(false)
if err != nil {
return err
}
properties = append(properties, c.SystemdProps...)

if err := startUnit(dbusConnection, unitName, properties); err != nil {
if err := startUnit(m.dbus, unitName, properties); err != nil {
return err
}

Expand Down Expand Up @@ -208,13 +206,8 @@ func (m *legacyManager) Destroy() error {
m.mu.Lock()
defer m.mu.Unlock()

dbusConnection, err := getDbusConnection(false)
if err != nil {
return err
}
unitName := getUnitName(m.cgroups)
stopErr := stopUnit(m.dbus, getUnitName(m.cgroups))

stopErr := stopUnit(dbusConnection, unitName)
// Both on success and on error, cleanup all the cgroups we are aware of.
// Some of them were created directly by Apply() and are not managed by systemd.
if err := cgroups.RemovePaths(m.paths); err != nil {
Expand Down Expand Up @@ -341,11 +334,7 @@ func (m *legacyManager) Set(container *configs.Config) error {
if container.Cgroups.Resources.Unified != nil {
return cgroups.ErrV1NoUnified
}
dbusConnection, err := getDbusConnection(false)
if err != nil {
return err
}
properties, err := genV1ResourcesProperties(container.Cgroups, dbusConnection)
properties, err := genV1ResourcesProperties(container.Cgroups, m.dbus)
if err != nil {
return err
}
Expand Down Expand Up @@ -373,7 +362,7 @@ func (m *legacyManager) Set(container *configs.Config) error {
}
}

if err := dbusConnection.SetUnitProperties(getUnitName(container.Cgroups), true, properties...); err != nil {
if err := setUnitProperties(m.dbus, getUnitName(container.Cgroups), properties...); err != nil {
_ = m.Freeze(targetFreezerState)
return err
}
Expand Down
Loading

0 comments on commit fac8492

Please sign in to comment.