Skip to content

Commit

Permalink
libct/cg/sd: retry on dbus disconnect
Browse files Browse the repository at this point in the history
Instead of reconnecting to dbus after some failed operations, and
returning an error (so a caller has to retry), reconnect AND retry
in place for all such operations.

This should fix issues caused by a stale dbus connection after e.g.
a dbus daemon restart.

Signed-off-by: Kir Kolyshkin <[email protected]>
  • Loading branch information
kolyshkin committed Apr 27, 2021
1 parent 6122bc8 commit 47ef9a1
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 74 deletions.
52 changes: 40 additions & 12 deletions libcontainer/cgroups/systemd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,13 @@ func isUnitExists(err error) bool {
return isDbusError(err, "org.freedesktop.systemd1.UnitExists")
}

func startUnit(dbusConnection *systemdDbus.Conn, unitName string, properties []systemdDbus.Property) error {
func startUnit(cm *dbusConnManager, unitName string, properties []systemdDbus.Property) error {
statusChan := make(chan string, 1)
if _, err := dbusConnection.StartTransientUnit(unitName, "replace", properties, statusChan); err == nil {
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
_, err := c.StartTransientUnit(unitName, "replace", properties, statusChan)
return err
})
if err == nil {
timeout := time.NewTimer(30 * time.Second)
defer timeout.Stop()

Expand All @@ -329,11 +333,11 @@ func startUnit(dbusConnection *systemdDbus.Conn, unitName string, properties []s
close(statusChan)
// Please refer to https://godoc.org/github.com/coreos/go-systemd/dbus#Conn.StartUnit
if s != "done" {
dbusConnection.ResetFailedUnit(unitName)
resetFailedUnit(cm, unitName)
return errors.Errorf("error creating systemd unit `%s`: got `%s`", unitName, s)
}
case <-timeout.C:
dbusConnection.ResetFailedUnit(unitName)
resetFailedUnit(cm, unitName)
return errors.New("Timeout waiting for systemd to create " + unitName)
}
} else if !isUnitExists(err) {
Expand All @@ -343,9 +347,13 @@ func startUnit(dbusConnection *systemdDbus.Conn, unitName string, properties []s
return nil
}

func stopUnit(dbusConnection *systemdDbus.Conn, unitName string) error {
func stopUnit(cm *dbusConnManager, unitName string) error {
statusChan := make(chan string, 1)
if _, err := dbusConnection.StopUnit(unitName, "replace", statusChan); err == nil {
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
_, err := c.StopUnit(unitName, "replace", statusChan)
return err
})
if err == nil {
select {
case s := <-statusChan:
close(statusChan)
Expand All @@ -360,10 +368,30 @@ func stopUnit(dbusConnection *systemdDbus.Conn, unitName string) error {
return nil
}

func systemdVersion(conn *systemdDbus.Conn) int {
func resetFailedUnit(cm *dbusConnManager, name string) {
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
return c.ResetFailedUnit(name)
})
if err != nil {
logrus.Warnf("unable to reset failed unit: %v", err)
}
}

func setUnitProperties(cm *dbusConnManager, name string, properties ...systemdDbus.Property) error {
return cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
return c.SetUnitProperties(name, true, properties...)
})
}

func systemdVersion(cm *dbusConnManager) int {
versionOnce.Do(func() {
version = -1
verStr, err := conn.GetManagerProperty("Version")
var verStr string
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
var err error
verStr, err = c.GetManagerProperty("Version")
return err
})
if err == nil {
version, err = systemdVersionAtoi(verStr)
}
Expand Down Expand Up @@ -391,10 +419,10 @@ func systemdVersionAtoi(verStr string) (int, error) {
return ver, errors.Wrapf(err, "can't parse version %s", verStr)
}

func addCpuQuota(conn *systemdDbus.Conn, properties *[]systemdDbus.Property, quota int64, period uint64) {
func addCpuQuota(cm *dbusConnManager, properties *[]systemdDbus.Property, quota int64, period uint64) {
if period != 0 {
// systemd only supports CPUQuotaPeriodUSec since v242
sdVer := systemdVersion(conn)
sdVer := systemdVersion(cm)
if sdVer >= 242 {
*properties = append(*properties,
newProp("CPUQuotaPeriodUSec", period))
Expand Down Expand Up @@ -425,13 +453,13 @@ func addCpuQuota(conn *systemdDbus.Conn, properties *[]systemdDbus.Property, quo
}
}

func addCpuset(conn *systemdDbus.Conn, props *[]systemdDbus.Property, cpus, mems string) error {
func addCpuset(cm *dbusConnManager, props *[]systemdDbus.Property, cpus, mems string) error {
if cpus == "" && mems == "" {
return nil
}

// systemd only supports AllowedCPUs/AllowedMemoryNodes since v244
sdVer := systemdVersion(conn)
sdVer := systemdVersion(cm)
if sdVer < 244 {
logrus.Debugf("systemd v%d is too old to support AllowedCPUs/AllowedMemoryNodes"+
" (settings will still be applied to cgroupfs)", sdVer)
Expand Down
27 changes: 14 additions & 13 deletions libcontainer/cgroups/systemd/dbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

systemdDbus "github.com/coreos/go-systemd/v22/dbus"
dbus "github.com/godbus/dbus/v5"
"github.com/sirupsen/logrus"
)

type dbusConnManager struct {
Expand Down Expand Up @@ -73,17 +72,19 @@ func (d *dbusConnManager) resetConnection(conn *systemdDbus.Conn) {

var errDbusConnClosed = dbus.ErrClosed.Error()

// checkAndReconnect checks if the connection is disconnected,
// and tries reconnect if it is.
func (d *dbusConnManager) checkAndReconnect(conn *systemdDbus.Conn, err error) {
if !isDbusError(err, errDbusConnClosed) {
return
}
d.resetConnection(conn)

// Try to reconnect
_, err = d.getConnection()
if err != nil {
logrus.Warnf("Dbus disconnected and failed to reconnect: %s", err)
// retryOnDisconnect calls op, and if the error it returns is about closed dbus
// connection, the connection is re-established and the op is retried. This helps
// with the situation when dbus is restarted and we have a stale connection.
func (d *dbusConnManager) retryOnDisconnect(op func(*systemdDbus.Conn) error) error {
for {
conn, err := d.getConnection()
if err != nil {
return err
}
err = op(conn)
if !isDbusError(err, errDbusConnClosed) {
return err
}
d.resetConnection(conn)
}
}
29 changes: 7 additions & 22 deletions libcontainer/cgroups/systemd/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var legacySubsystems = []subsystem{
&fs.NameGroup{GroupName: "name=systemd"},
}

func genV1ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]systemdDbus.Property, error) {
func genV1ResourcesProperties(c *configs.Cgroup, cm *dbusConnManager) ([]systemdDbus.Property, error) {
var properties []systemdDbus.Property
r := c.Resources

Expand All @@ -78,7 +78,7 @@ func genV1ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]syst
newProp("CPUShares", r.CpuShares))
}

addCpuQuota(conn, &properties, r.CpuQuota, r.CpuPeriod)
addCpuQuota(cm, &properties, r.CpuQuota, r.CpuPeriod)

if r.BlkioWeight != 0 {
properties = append(properties,
Expand All @@ -90,7 +90,7 @@ func genV1ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]syst
newProp("TasksMax", uint64(r.PidsLimit)))
}

err = addCpuset(conn, &properties, r.CpusetCpus, r.CpusetMems)
err = addCpuset(cm, &properties, r.CpusetCpus, r.CpusetMems)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -166,14 +166,9 @@ func (m *legacyManager) Apply(pid int) error {
properties = append(properties,
newProp("DefaultDependencies", false))

dbusConnection, err := m.dbus.getConnection()
if err != nil {
return err
}
properties = append(properties, c.SystemdProps...)

if err := startUnit(dbusConnection, unitName, properties); err != nil {
m.dbus.checkAndReconnect(dbusConnection, err)
if err := startUnit(m.dbus, unitName, properties); err != nil {
return err
}

Expand Down Expand Up @@ -211,13 +206,8 @@ func (m *legacyManager) Destroy() error {
m.mu.Lock()
defer m.mu.Unlock()

dbusConnection, err := m.dbus.getConnection()
if err != nil {
return err
}
unitName := getUnitName(m.cgroups)
stopErr := stopUnit(m.dbus, getUnitName(m.cgroups))

stopErr := stopUnit(dbusConnection, unitName)
// Both on success and on error, cleanup all the cgroups we are aware of.
// Some of them were created directly by Apply() and are not managed by systemd.
if err := cgroups.RemovePaths(m.paths); err != nil {
Expand Down Expand Up @@ -344,11 +334,7 @@ func (m *legacyManager) Set(container *configs.Config) error {
if container.Cgroups.Resources.Unified != nil {
return cgroups.ErrV1NoUnified
}
dbusConnection, err := m.dbus.getConnection()
if err != nil {
return err
}
properties, err := genV1ResourcesProperties(container.Cgroups, dbusConnection)
properties, err := genV1ResourcesProperties(container.Cgroups, m.dbus)
if err != nil {
return err
}
Expand Down Expand Up @@ -376,8 +362,7 @@ func (m *legacyManager) Set(container *configs.Config) error {
}
}

if err := dbusConnection.SetUnitProperties(getUnitName(container.Cgroups), true, properties...); err != nil {
m.dbus.checkAndReconnect(dbusConnection, err)
if err := setUnitProperties(m.dbus, getUnitName(container.Cgroups), properties...); err != nil {
_ = m.Freeze(targetFreezerState)
return err
}
Expand Down
40 changes: 13 additions & 27 deletions libcontainer/cgroups/systemd/v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewUnifiedManager(config *configs.Cgroup, path string, rootless bool) cgrou
// For the list of keys, see https://www.kernel.org/doc/Documentation/cgroup-v2.txt
//
// For the list of systemd unit properties, see systemd.resource-control(5).
func unifiedResToSystemdProps(conn *systemdDbus.Conn, res map[string]string) (props []systemdDbus.Property, _ error) {
func unifiedResToSystemdProps(cm *dbusConnManager, res map[string]string) (props []systemdDbus.Property, _ error) {
var err error

for k, v := range res {
Expand Down Expand Up @@ -85,7 +85,7 @@ func unifiedResToSystemdProps(conn *systemdDbus.Conn, res map[string]string) (pr
return nil, fmt.Errorf("unified resource %q quota value conversion error: %w", k, err)
}
}
addCpuQuota(conn, &props, quota, period)
addCpuQuota(cm, &props, quota, period)

case "cpu.weight":
num, err := strconv.ParseUint(v, 10, 64)
Expand All @@ -105,7 +105,7 @@ func unifiedResToSystemdProps(conn *systemdDbus.Conn, res map[string]string) (pr
"cpuset.mems": "AllowedMemoryNodes",
}
// systemd only supports these properties since v244
sdVer := systemdVersion(conn)
sdVer := systemdVersion(cm)
if sdVer >= 244 {
props = append(props,
newProp(m[k], bits))
Expand Down Expand Up @@ -164,7 +164,7 @@ func unifiedResToSystemdProps(conn *systemdDbus.Conn, res map[string]string) (pr
return props, nil
}

func genV2ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]systemdDbus.Property, error) {
func genV2ResourcesProperties(c *configs.Cgroup, cm *dbusConnManager) ([]systemdDbus.Property, error) {
var properties []systemdDbus.Property
r := c.Resources

Expand Down Expand Up @@ -202,14 +202,14 @@ func genV2ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]syst
newProp("CPUWeight", r.CpuWeight))
}

addCpuQuota(conn, &properties, r.CpuQuota, r.CpuPeriod)
addCpuQuota(cm, &properties, r.CpuQuota, r.CpuPeriod)

if r.PidsLimit > 0 || r.PidsLimit == -1 {
properties = append(properties,
newProp("TasksMax", uint64(r.PidsLimit)))
}

err = addCpuset(conn, &properties, r.CpusetCpus, r.CpusetMems)
err = addCpuset(cm, &properties, r.CpusetCpus, r.CpusetMems)
if err != nil {
return nil, err
}
Expand All @@ -218,7 +218,7 @@ func genV2ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]syst

// convert Resources.Unified map to systemd properties
if r.Unified != nil {
unifiedProps, err := unifiedResToSystemdProps(conn, r.Unified)
unifiedProps, err := unifiedResToSystemdProps(cm, r.Unified)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -281,18 +281,13 @@ func (m *unifiedManager) Apply(pid int) error {
properties = append(properties,
newProp("DefaultDependencies", false))

dbusConnection, err := m.dbus.getConnection()
if err != nil {
return err
}
properties = append(properties, c.SystemdProps...)

if err := startUnit(dbusConnection, unitName, properties); err != nil {
m.dbus.checkAndReconnect(dbusConnection, err)
if err := startUnit(m.dbus, unitName, properties); err != nil {
return errors.Wrapf(err, "error while starting unit %q with properties %+v", unitName, properties)
}

if err = m.initPath(); err != nil {
if err := m.initPath(); err != nil {
return err
}
if err := fs2.CreateCgroupPath(m.path, m.cgroups); err != nil {
Expand All @@ -308,17 +303,13 @@ func (m *unifiedManager) Destroy() error {
m.mu.Lock()
defer m.mu.Unlock()

dbusConnection, err := m.dbus.getConnection()
if err != nil {
return err
}
unitName := getUnitName(m.cgroups)
if err := stopUnit(dbusConnection, unitName); err != nil {
if err := stopUnit(m.dbus, unitName); err != nil {
return err
}

// XXX this is probably not needed, systemd should handle it
err = os.Remove(m.path)
err := os.Remove(m.path)
if err != nil && !os.IsNotExist(err) {
return err
}
Expand Down Expand Up @@ -430,11 +421,7 @@ func (m *unifiedManager) GetStats() (*cgroups.Stats, error) {
}

func (m *unifiedManager) Set(container *configs.Config) error {
dbusConnection, err := m.dbus.getConnection()
if err != nil {
return err
}
properties, err := genV2ResourcesProperties(m.cgroups, dbusConnection)
properties, err := genV2ResourcesProperties(m.cgroups, m.dbus)
if err != nil {
return err
}
Expand Down Expand Up @@ -462,8 +449,7 @@ func (m *unifiedManager) Set(container *configs.Config) error {
}
}

if err := dbusConnection.SetUnitProperties(getUnitName(m.cgroups), true, properties...); err != nil {
m.dbus.checkAndReconnect(dbusConnection, err)
if err := setUnitProperties(m.dbus, getUnitName(m.cgroups), properties...); err != nil {
_ = m.Freeze(targetFreezerState)
return errors.Wrap(err, "error while setting unit properties")
}
Expand Down

0 comments on commit 47ef9a1

Please sign in to comment.