Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race conditions when creating watches #1113

Merged
merged 9 commits into from
Nov 25, 2020
Merged
4 changes: 4 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
|===
| | Description | PR

| 🐛
| Fix a race condition when using Kubernetes watches
| https://github.com/knative/client/pull/1113[#1113]

| 🐛
| Embed the namespace in request body while creating channels
| https://github.com/knative/client/pull/1117[#1117]
Expand Down
11 changes: 8 additions & 3 deletions pkg/eventing/v1beta1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,17 @@ func (c *knEventingClient) DeleteBroker(name string, timeout time.Duration) erro
return c.deleteBroker(name, apis_v1.DeletePropagationBackground)
}
waitC := make(chan error)
watcher, err := c.WatchBroker(name, timeout)
if err != nil {
return nil
}
defer watcher.Stop()
go func() {
waitForEvent := wait.NewWaitForEvent("broker", c.WatchBroker, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitForEvent := wait.NewWaitForEvent("broker", func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(watcher, name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
err := c.deleteBroker(name, apis_v1.DeletePropagationForeground)
err = c.deleteBroker(name, apis_v1.DeletePropagationForeground)
if err != nil {
return err
}
Expand Down
29 changes: 22 additions & 7 deletions pkg/serving/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,17 @@ func (cl *knServingClient) DeleteService(serviceName string, timeout time.Durati
return cl.deleteService(serviceName, v1.DeletePropagationBackground)
}
waitC := make(chan error)
watcher, err := cl.WatchService(serviceName, timeout)
if err != nil {
return nil
}
defer watcher.Stop()
go func() {
waitForEvent := wait.NewWaitForEvent("service", cl.WatchService, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(serviceName, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitForEvent := wait.NewWaitForEvent("service", func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(watcher, serviceName, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This closure works as long as watcher is in scope. Not sure how it works in go, but if this variable would go out of scope (e.g. when this function where it is allocated is left), wouldn't it be garbage collected and the go routine (in case it is still running) would then access a zombie watcher object ?

I know in this case this is a theoretical question, as we block on the channel before leaving the function, and only the goroutine can send to the channel.

So it's more a theoretical question, I guess :)

waitC <- err
}()
err := cl.deleteService(serviceName, v1.DeletePropagationForeground)
err = cl.deleteService(serviceName, v1.DeletePropagationForeground)
if err != nil {
return err
}
Expand All @@ -346,8 +351,13 @@ func (cl *knServingClient) deleteService(serviceName string, propagationPolicy v

// Wait for a service to become ready, but not longer than provided timeout
func (cl *knServingClient) WaitForService(name string, timeout time.Duration, msgCallback wait.MessageCallback) (error, time.Duration) {
waitForReady := wait.NewWaitForReady("service", cl.WatchService, serviceConditionExtractor)
return waitForReady.Wait(name, wait.Options{Timeout: &timeout}, msgCallback)
watcher, err := cl.WatchService(name, timeout)
if err != nil {
return err, timeout
}
defer watcher.Stop()
waitForReady := wait.NewWaitForReady("service", serviceConditionExtractor)
return waitForReady.Wait(watcher, name, wait.Options{Timeout: &timeout}, msgCallback)
}

// Get the configuration for a service
Expand Down Expand Up @@ -460,9 +470,14 @@ func (cl *knServingClient) DeleteRevision(name string, timeout time.Duration) er
return cl.deleteRevision(name)
}
waitC := make(chan error)
watcher, err := cl.WatchRevision(name, timeout)
if err != nil {
return err
}
defer watcher.Stop()
go func() {
waitForEvent := wait.NewWaitForEvent("revision", cl.WatchRevision, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitForEvent := wait.NewWaitForEvent("revision", func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(watcher, name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
err = cl.deleteRevision(name)
Expand Down
39 changes: 12 additions & 27 deletions pkg/wait/wait_for_ready.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,14 @@ import (

// Callbacks and configuration used while waiting
type waitForReadyConfig struct {
watchMaker WatchMaker
conditionsExtractor ConditionsExtractor
kind string
}

// Callbacks and configuration used while waiting for event
type waitForEvent struct {
watchMaker WatchMaker
eventDone EventDone
kind string
eventDone EventDone
kind string
}

// EventDone is a marker to stop actual waiting on given event state
Expand All @@ -49,7 +47,7 @@ type Wait interface {
// Wait on resource the resource with this name
// and write event messages for unknown event to the status writer.
// Returns an error (if any) and the overall time it took to wait
Wait(name string, options Options, msgCallback MessageCallback) (error, time.Duration)
Wait(watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration)
}

type Options struct {
Expand All @@ -71,21 +69,19 @@ type ConditionsExtractor func(obj runtime.Object) (apis.Conditions, error)
type MessageCallback func(durationSinceState time.Duration, message string)

// NewWaitForReady waits until the condition is set to Ready == True
func NewWaitForReady(kind string, watchMaker WatchMaker, extractor ConditionsExtractor) Wait {
func NewWaitForReady(kind string, extractor ConditionsExtractor) Wait {
return &waitForReadyConfig{
kind: kind,
watchMaker: watchMaker,
conditionsExtractor: extractor,
}
}

// NewWaitForEvent creates a Wait object which waits until a specific event (i.e. when
// the EventDone function returns true)
func NewWaitForEvent(kind string, watchMaker WatchMaker, eventDone EventDone) Wait {
func NewWaitForEvent(kind string, eventDone EventDone) Wait {
return &waitForEvent{
kind: kind,
watchMaker: watchMaker,
eventDone: eventDone,
kind: kind,
eventDone: eventDone,
}
}

Expand All @@ -112,13 +108,13 @@ func NoopMessageCallback() MessageCallback {
// (e.g. "service"), `timeout` is a timeout after which the watch should be cancelled if no
// target state has been entered yet and `out` is used for printing out status messages
// msgCallback gets called for every event with an 'Ready' condition == UNKNOWN with the event's message.
func (w *waitForReadyConfig) Wait(name string, options Options, msgCallback MessageCallback) (error, time.Duration) {
func (w *waitForReadyConfig) Wait(watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration) {
akerekes marked this conversation as resolved.
Show resolved Hide resolved

timeout := options.timeoutWithDefault()
floatingTimeout := timeout
for {
start := time.Now()
retry, timeoutReached, err := w.waitForReadyCondition(start, name, floatingTimeout, options.errorWindowWithDefault(), msgCallback)
retry, timeoutReached, err := w.waitForReadyCondition(watcher, start, name, floatingTimeout, options.errorWindowWithDefault(), msgCallback)
if err != nil {
return err, time.Since(start)
}
Expand All @@ -141,13 +137,7 @@ func (w *waitForReadyConfig) Wait(name string, options Options, msgCallback Mess
// An errorWindow can be specified which takes into account of intermediate "false" ready conditions. So before returning
// an error, this methods waits for the errorWindow duration and if an "True" or "Unknown" event arrives in the meantime
// for the "Ready" condition, then the method continues to wait.
func (w *waitForReadyConfig) waitForReadyCondition(start time.Time, name string, timeout time.Duration, errorWindow time.Duration, msgCallback MessageCallback) (retry bool, timeoutReached bool, err error) {

watcher, err := w.watchMaker(name, timeout)
if err != nil {
return false, false, err
}
defer watcher.Stop()
func (w *waitForReadyConfig) waitForReadyCondition(watcher watch.Interface, start time.Time, name string, timeout time.Duration, errorWindow time.Duration, msgCallback MessageCallback) (retry bool, timeoutReached bool, err error) {

// channel used to transport the error that has been received
errChan := make(chan error)
Expand Down Expand Up @@ -239,13 +229,8 @@ func (w *waitForReadyConfig) waitForReadyCondition(start time.Time, name string,
}

// Wait until the expected EventDone is satisfied
func (w *waitForEvent) Wait(name string, options Options, msgCallback MessageCallback) (error, time.Duration) {
func (w *waitForEvent) Wait(watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration) {
akerekes marked this conversation as resolved.
Show resolved Hide resolved
timeout := options.timeoutWithDefault()
watcher, err := w.watchMaker(name, timeout)
if err != nil {
return err, 0
}
defer watcher.Stop()
start := time.Now()
// channel used to transport the error
errChan := make(chan error)
Expand All @@ -255,7 +240,7 @@ func (w *waitForEvent) Wait(name string, options Options, msgCallback MessageCal
select {
case <-timer.C:
return fmt.Errorf("timeout: %s '%s' not ready after %d seconds", w.kind, name, int(timeout/time.Second)), time.Since(start)
case err = <-errChan:
case err := <-errChan:
return err, time.Since(start)
case event := <-watcher.ResultChan():
if w.eventDone(&event) {
Expand Down
18 changes: 6 additions & 12 deletions pkg/wait/wait_for_ready_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,12 @@ func TestAddWaitForReady(t *testing.T) {

waitForReady := NewWaitForReady(
"blub",
func(name string, timeout time.Duration) (watch.Interface, error) {
return fakeWatchApi, nil
},
func(obj runtime.Object) (apis.Conditions, error) {
return apis.Conditions(obj.(*servingv1.Service).Status.Conditions), nil
})
fakeWatchApi.Start()
var msgs []string
err, _ := waitForReady.Wait("foobar", Options{Timeout: &tc.timeout}, func(_ time.Duration, msg string) {
err, _ := waitForReady.Wait(fakeWatchApi, "foobar", Options{Timeout: &tc.timeout}, func(_ time.Duration, msg string) {
msgs = append(msgs, msg)
})
close(fakeWatchApi.eventChan)
Expand All @@ -69,8 +66,8 @@ func TestAddWaitForReady(t *testing.T) {
// check messages
assert.Assert(t, cmp.DeepEqual(tc.messagesExpected, msgs), "%d: Messages expected to be equal", i)

if fakeWatchApi.StopCalled != 1 {
t.Errorf("%d: Exactly one 'stop' should be called, but got %d", i, fakeWatchApi.StopCalled)
if fakeWatchApi.StopCalled != 0 {
t.Errorf("%d: Exactly zero 'stop' should be called, but got %d", i, fakeWatchApi.StopCalled)
}

}
Expand All @@ -82,13 +79,10 @@ func TestAddWaitForDelete(t *testing.T) {

waitForEvent := NewWaitForEvent(
"blub",
func(name string, timeout time.Duration) (watch.Interface, error) {
return fakeWatchAPI, nil
},
func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
fakeWatchAPI.Start()

err, _ := waitForEvent.Wait("foobar", Options{Timeout: &tc.timeout}, NoopMessageCallback())
err, _ := waitForEvent.Wait(fakeWatchAPI, "foobar", Options{Timeout: &tc.timeout}, NoopMessageCallback())
close(fakeWatchAPI.eventChan)

if tc.errorText == "" && err != nil {
Expand All @@ -103,8 +97,8 @@ func TestAddWaitForDelete(t *testing.T) {
}
}

if fakeWatchAPI.StopCalled != 1 {
t.Errorf("%d: Exactly one 'stop' should be called, but got %d", i, fakeWatchAPI.StopCalled)
if fakeWatchAPI.StopCalled != 0 {
t.Errorf("%d: Exactly zero 'stop' should be called, but got %d", i, fakeWatchAPI.StopCalled)
}
}
}
Expand Down