Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use atomic.Bool for rungroup actors that should not run interrupt routines more than once #2012

Merged
merged 1 commit into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions cmd/launcher/signal_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log/slog"
"os"
"os/signal"
"sync/atomic"
"syscall"
)

Expand All @@ -13,7 +14,7 @@ type signalListener struct {
sigChannel chan os.Signal
cancel context.CancelFunc
slogger *slog.Logger
interrupted bool
interrupted atomic.Bool
}

func newSignalListener(sigChannel chan os.Signal, cancel context.CancelFunc, slogger *slog.Logger) *signalListener {
Expand All @@ -36,10 +37,11 @@ func (s *signalListener) Execute() error {

func (s *signalListener) Interrupt(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if s.interrupted {
if s.interrupted.Load() {
return
}
s.interrupted = true

s.interrupted.Store(true)
s.cancel()
close(s.sigChannel)
}
7 changes: 4 additions & 3 deletions ee/agent/storage/bbolt/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log/slog"
"os"
"path/filepath"
"sync/atomic"
"time"

"github.com/kolide/launcher/ee/agent/types"
Expand All @@ -23,7 +24,7 @@ type databaseBackupSaver struct {
knapsack types.Knapsack
slogger *slog.Logger
interrupt chan struct{}
interrupted bool
interrupted atomic.Bool
}

func NewDatabaseBackupSaver(k types.Knapsack) *databaseBackupSaver {
Expand Down Expand Up @@ -71,10 +72,10 @@ func (d *databaseBackupSaver) Execute() error {

func (d *databaseBackupSaver) Interrupt(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if d.interrupted {
if d.interrupted.Load() {
return
}
d.interrupted = true
d.interrupted.Store(true)

d.interrupt <- struct{}{}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"log/slog"
"sync/atomic"
"time"

"github.com/kolide/launcher/ee/agent/types"
Expand All @@ -33,7 +34,7 @@ type RemoteRestartConsumer struct {
slogger *slog.Logger
signalRestart chan error
interrupt chan struct{}
interrupted bool
interrupted atomic.Bool
}

type remoteRestartAction struct {
Expand Down Expand Up @@ -122,10 +123,10 @@ func (r *RemoteRestartConsumer) Execute() (err error) {
// and be shut down when the rungroup shuts down.
func (r *RemoteRestartConsumer) Interrupt(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if r.interrupted {
if r.interrupted.Load() {
return
}
r.interrupted = true
r.interrupted.Store(true)

r.interrupt <- struct{}{}
}
7 changes: 4 additions & 3 deletions ee/debug/checkups/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"io"
"log/slog"
"sync/atomic"
"time"

"github.com/kolide/launcher/ee/agent/types"
Expand All @@ -14,7 +15,7 @@ type (
slogger *slog.Logger
knapsack types.Knapsack
interrupt chan struct{}
interrupted bool
interrupted atomic.Bool
}
)

Expand Down Expand Up @@ -49,11 +50,11 @@ func (c *logCheckPointer) Run() error {

func (c *logCheckPointer) Interrupt(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if c.interrupted {
if c.interrupted.Load() {
return
}

c.interrupted = true
c.interrupted.Store(true)

c.interrupt <- struct{}{}
}
Expand Down
7 changes: 4 additions & 3 deletions ee/desktop/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/kolide/kit/ulid"
Expand Down Expand Up @@ -103,7 +104,7 @@ type DesktopUsersProcessesRunner struct {
// menuRefreshInterval is the interval on which the desktop menu will be refreshed
menuRefreshInterval time.Duration
interrupt chan struct{}
interrupted bool
interrupted atomic.Bool
// uidProcs is a map of uid to desktop process
uidProcs map[string]processRecord
// procsWg is a WaitGroup to wait for all desktop processes to finish during an interrupt
Expand Down Expand Up @@ -251,11 +252,11 @@ func (r *DesktopUsersProcessesRunner) Execute() error {
// It also signals the execute loop to exit, so new desktop processes cease to spawn.
func (r *DesktopUsersProcessesRunner) Interrupt(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if r.interrupted {
if r.interrupted.Load() {
return
}

r.interrupted = true
r.interrupted.Store(true)

// Tell the execute loop to stop checking, and exit
r.interrupt <- struct{}{}
Expand Down
10 changes: 9 additions & 1 deletion ee/desktop/user/notify/notify_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"log/slog"
"os"
"strings"
"sync/atomic"
"unsafe"
)

type macNotifier struct {
interrupt chan struct{}
interrupt chan struct{}
interrupted atomic.Bool
}

func NewDesktopNotifier(_ *slog.Logger, _ string) *macNotifier {
Expand All @@ -39,6 +41,12 @@ func (m *macNotifier) Execute() error {
}

func (m *macNotifier) Interrupt(err error) {
if m.interrupted.Load() {
return
}

m.interrupted.Store(true)

m.interrupt <- struct{}{}
}

Expand Down
7 changes: 7 additions & 0 deletions ee/desktop/user/notify/notify_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"log/slog"
"sync"
"sync/atomic"
"time"

"github.com/godbus/dbus/v5"
Expand All @@ -21,6 +22,7 @@ type dbusNotifier struct {
conn *dbus.Conn
signal chan *dbus.Signal
interrupt chan struct{}
interrupted atomic.Bool
sentNotificationIds map[uint32]bool
lock sync.RWMutex
}
Expand Down Expand Up @@ -129,6 +131,11 @@ func (d *dbusNotifier) Execute() error {
}

func (d *dbusNotifier) Interrupt(err error) {
if d.interrupted.Load() {
return
}
d.interrupted.Store(true)

d.interrupt <- struct{}{}

d.conn.RemoveSignal(d.signal)
Expand Down
7 changes: 7 additions & 0 deletions ee/desktop/user/notify/notify_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ package notify

import (
"log/slog"
"sync/atomic"

"github.com/kolide/toast"
)

type windowsNotifier struct {
iconFilepath string
interrupt chan struct{}
interrupted atomic.Bool
}

func NewDesktopNotifier(_ *slog.Logger, iconFilepath string) *windowsNotifier {
Expand All @@ -32,6 +34,11 @@ func (w *windowsNotifier) Execute() error {
func (w *windowsNotifier) Listen() {}

func (w *windowsNotifier) Interrupt(err error) {
if w.interrupted.Load() {
return
}
w.interrupted.Store(true)

w.interrupt <- struct{}{}
}

Expand Down
7 changes: 4 additions & 3 deletions ee/desktop/user/universallink/handler_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/url"
"os"
"strings"
"sync/atomic"
"unsafe"
)

Expand All @@ -28,7 +29,7 @@ import (
type universalLinkHandler struct {
urlInput chan string
slogger *slog.Logger
interrupted bool
interrupted atomic.Bool
interrupt chan struct{}
}

Expand Down Expand Up @@ -73,10 +74,10 @@ func (u *universalLinkHandler) Interrupt(_ error) {
)

// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if u.interrupted {
if u.interrupted.Load() {
return
}
u.interrupted = true
u.interrupted.Store(true)

u.interrupt <- struct{}{}
close(u.urlInput)
Expand Down
7 changes: 4 additions & 3 deletions ee/desktop/user/universallink/handler_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ package universallink

import (
"log/slog"
"sync/atomic"
)

// On other OSes, universal link handling is a no-op.
type noopUniversalLinkHandler struct {
unusedInput chan string
interrupted bool
interrupted atomic.Bool
interrupt chan struct{}
}

Expand All @@ -29,10 +30,10 @@ func (n *noopUniversalLinkHandler) Execute() error {

func (n *noopUniversalLinkHandler) Interrupt(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if n.interrupted {
if n.interrupted.Load() {
return
}
n.interrupted = true
n.interrupted.Store(true)

n.interrupt <- struct{}{}
close(n.unusedInput)
Expand Down
7 changes: 4 additions & 3 deletions ee/powereventwatcher/power_event_watcher_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ package powereventwatcher
import (
"context"
"log/slog"
"sync/atomic"

"github.com/kolide/launcher/ee/agent/types"
"github.com/kolide/launcher/pkg/traces"
)

type noOpPowerEventWatcher struct {
interrupt chan struct{}
interrupted bool
interrupted atomic.Bool
}

type noOpKnapsackSleepStateUpdater struct{}
Expand All @@ -38,11 +39,11 @@ func (n *noOpPowerEventWatcher) Execute() error {

func (n *noOpPowerEventWatcher) Interrupt(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if n.interrupted {
if n.interrupted.Load() {
return
}

n.interrupted = true
n.interrupted.Store(true)

n.interrupt <- struct{}{}
}
7 changes: 4 additions & 3 deletions ee/powereventwatcher/power_event_watcher_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"log/slog"
"sync"
"sync/atomic"
"syscall"
"unsafe"

Expand All @@ -35,7 +36,7 @@ type (
unsubscribeProcedure *syscall.LazyProc
renderEventLogProcedure *syscall.LazyProc
interrupt chan struct{}
interrupted bool
interrupted atomic.Bool
}

// powerEventSubscriber is an interface to be implemented by anything utilizing the power event updates.
Expand Down Expand Up @@ -234,11 +235,11 @@ func (p *powerEventWatcher) Execute() error {

func (p *powerEventWatcher) Interrupt(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if p.interrupted {
if p.interrupted.Load() {
return
}

p.interrupted = true
p.interrupted.Store(true)

// EvtClose: https://learn.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtclose
ret, _, err := p.unsubscribeProcedure.Call(p.subscriptionHandle)
Expand Down
7 changes: 4 additions & 3 deletions ee/tuf/autoupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"slices"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/kolide/kit/version"
Expand Down Expand Up @@ -95,7 +96,7 @@ type TufAutoupdater struct {
initialDelayEnd time.Time
updateLock *sync.Mutex
interrupt chan struct{}
interrupted bool
interrupted atomic.Bool
signalRestart chan error
slogger *slog.Logger
restartFuncs map[autoupdatableBinary]func() error
Expand Down Expand Up @@ -273,10 +274,10 @@ func (ta *TufAutoupdater) Execute() (err error) {

func (ta *TufAutoupdater) Interrupt(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if ta.interrupted {
if ta.interrupted.Load() {
return
}
ta.interrupted = true
ta.interrupted.Store(true)

ta.interrupt <- struct{}{}
}
Expand Down
Loading
Loading