Skip to content

Commit

Permalink
Fix race conditions when creating watches (knative#1113)
Browse files Browse the repository at this point in the history
* Fix a race condition between creating a watch and initiating the action that emits the event it is watching for

* update changelog

* add PR ID to changelog entry

* Fix merge in Changelog

* Fix table format in Changelog
  • Loading branch information
navidshaikh committed Nov 25, 2020
1 parent ceeb35b commit 76f8233
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 49 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
|===
| | Description | PR

| 🐛
| Fix a race condition when using Kubernetes watches
| https://github.com/knative/client/pull/1113[#1113]

| 🐛
| Embed the namespace in request body while creating channels
| https://github.com/knative/client/pull/1117[#1117]
Expand Down
11 changes: 8 additions & 3 deletions pkg/eventing/v1beta1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,17 @@ func (c *knEventingClient) DeleteBroker(name string, timeout time.Duration) erro
return c.deleteBroker(name, apis_v1.DeletePropagationBackground)
}
waitC := make(chan error)
watcher, err := c.WatchBroker(name, timeout)
if err != nil {
return nil
}
defer watcher.Stop()
go func() {
waitForEvent := wait.NewWaitForEvent("broker", c.WatchBroker, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitForEvent := wait.NewWaitForEvent("broker", func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(watcher, name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
err := c.deleteBroker(name, apis_v1.DeletePropagationForeground)
err = c.deleteBroker(name, apis_v1.DeletePropagationForeground)
if err != nil {
return err
}
Expand Down
29 changes: 22 additions & 7 deletions pkg/serving/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,17 @@ func (cl *knServingClient) DeleteService(serviceName string, timeout time.Durati
return cl.deleteService(serviceName, v1.DeletePropagationBackground)
}
waitC := make(chan error)
watcher, err := cl.WatchService(serviceName, timeout)
if err != nil {
return nil
}
defer watcher.Stop()
go func() {
waitForEvent := wait.NewWaitForEvent("service", cl.WatchService, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(serviceName, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitForEvent := wait.NewWaitForEvent("service", func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(watcher, serviceName, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
err := cl.deleteService(serviceName, v1.DeletePropagationForeground)
err = cl.deleteService(serviceName, v1.DeletePropagationForeground)
if err != nil {
return err
}
Expand All @@ -346,8 +351,13 @@ func (cl *knServingClient) deleteService(serviceName string, propagationPolicy v

// Wait for a service to become ready, but not longer than provided timeout
func (cl *knServingClient) WaitForService(name string, timeout time.Duration, msgCallback wait.MessageCallback) (error, time.Duration) {
waitForReady := wait.NewWaitForReady("service", cl.WatchService, serviceConditionExtractor)
return waitForReady.Wait(name, wait.Options{Timeout: &timeout}, msgCallback)
watcher, err := cl.WatchService(name, timeout)
if err != nil {
return err, timeout
}
defer watcher.Stop()
waitForReady := wait.NewWaitForReady("service", serviceConditionExtractor)
return waitForReady.Wait(watcher, name, wait.Options{Timeout: &timeout}, msgCallback)
}

// Get the configuration for a service
Expand Down Expand Up @@ -460,9 +470,14 @@ func (cl *knServingClient) DeleteRevision(name string, timeout time.Duration) er
return cl.deleteRevision(name)
}
waitC := make(chan error)
watcher, err := cl.WatchRevision(name, timeout)
if err != nil {
return err
}
defer watcher.Stop()
go func() {
waitForEvent := wait.NewWaitForEvent("revision", cl.WatchRevision, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitForEvent := wait.NewWaitForEvent("revision", func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(watcher, name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
err = cl.deleteRevision(name)
Expand Down
39 changes: 12 additions & 27 deletions pkg/wait/wait_for_ready.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,14 @@ import (

// Callbacks and configuration used while waiting
type waitForReadyConfig struct {
watchMaker WatchMaker
conditionsExtractor ConditionsExtractor
kind string
}

// Callbacks and configuration used while waiting for event
type waitForEvent struct {
watchMaker WatchMaker
eventDone EventDone
kind string
eventDone EventDone
kind string
}

// EventDone is a marker to stop actual waiting on given event state
Expand All @@ -49,7 +47,7 @@ type Wait interface {
// Wait on resource the resource with this name
// and write event messages for unknown event to the status writer.
// Returns an error (if any) and the overall time it took to wait
Wait(name string, options Options, msgCallback MessageCallback) (error, time.Duration)
Wait(watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration)
}

type Options struct {
Expand All @@ -71,21 +69,19 @@ type ConditionsExtractor func(obj runtime.Object) (apis.Conditions, error)
type MessageCallback func(durationSinceState time.Duration, message string)

// NewWaitForReady waits until the condition is set to Ready == True
func NewWaitForReady(kind string, watchMaker WatchMaker, extractor ConditionsExtractor) Wait {
func NewWaitForReady(kind string, extractor ConditionsExtractor) Wait {
return &waitForReadyConfig{
kind: kind,
watchMaker: watchMaker,
conditionsExtractor: extractor,
}
}

// NewWaitForEvent creates a Wait object which waits until a specific event (i.e. when
// the EventDone function returns true)
func NewWaitForEvent(kind string, watchMaker WatchMaker, eventDone EventDone) Wait {
func NewWaitForEvent(kind string, eventDone EventDone) Wait {
return &waitForEvent{
kind: kind,
watchMaker: watchMaker,
eventDone: eventDone,
kind: kind,
eventDone: eventDone,
}
}

Expand All @@ -112,13 +108,13 @@ func NoopMessageCallback() MessageCallback {
// (e.g. "service"), `timeout` is a timeout after which the watch should be cancelled if no
// target state has been entered yet and `out` is used for printing out status messages
// msgCallback gets called for every event with an 'Ready' condition == UNKNOWN with the event's message.
func (w *waitForReadyConfig) Wait(name string, options Options, msgCallback MessageCallback) (error, time.Duration) {
func (w *waitForReadyConfig) Wait(watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration) {

timeout := options.timeoutWithDefault()
floatingTimeout := timeout
for {
start := time.Now()
retry, timeoutReached, err := w.waitForReadyCondition(start, name, floatingTimeout, options.errorWindowWithDefault(), msgCallback)
retry, timeoutReached, err := w.waitForReadyCondition(watcher, start, name, floatingTimeout, options.errorWindowWithDefault(), msgCallback)
if err != nil {
return err, time.Since(start)
}
Expand All @@ -141,13 +137,7 @@ func (w *waitForReadyConfig) Wait(name string, options Options, msgCallback Mess
// An errorWindow can be specified which takes into account of intermediate "false" ready conditions. So before returning
// an error, this methods waits for the errorWindow duration and if an "True" or "Unknown" event arrives in the meantime
// for the "Ready" condition, then the method continues to wait.
func (w *waitForReadyConfig) waitForReadyCondition(start time.Time, name string, timeout time.Duration, errorWindow time.Duration, msgCallback MessageCallback) (retry bool, timeoutReached bool, err error) {

watcher, err := w.watchMaker(name, timeout)
if err != nil {
return false, false, err
}
defer watcher.Stop()
func (w *waitForReadyConfig) waitForReadyCondition(watcher watch.Interface, start time.Time, name string, timeout time.Duration, errorWindow time.Duration, msgCallback MessageCallback) (retry bool, timeoutReached bool, err error) {

// channel used to transport the error that has been received
errChan := make(chan error)
Expand Down Expand Up @@ -239,13 +229,8 @@ func (w *waitForReadyConfig) waitForReadyCondition(start time.Time, name string,
}

// Wait until the expected EventDone is satisfied
func (w *waitForEvent) Wait(name string, options Options, msgCallback MessageCallback) (error, time.Duration) {
func (w *waitForEvent) Wait(watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration) {
timeout := options.timeoutWithDefault()
watcher, err := w.watchMaker(name, timeout)
if err != nil {
return err, 0
}
defer watcher.Stop()
start := time.Now()
// channel used to transport the error
errChan := make(chan error)
Expand All @@ -255,7 +240,7 @@ func (w *waitForEvent) Wait(name string, options Options, msgCallback MessageCal
select {
case <-timer.C:
return fmt.Errorf("timeout: %s '%s' not ready after %d seconds", w.kind, name, int(timeout/time.Second)), time.Since(start)
case err = <-errChan:
case err := <-errChan:
return err, time.Since(start)
case event := <-watcher.ResultChan():
if w.eventDone(&event) {
Expand Down
18 changes: 6 additions & 12 deletions pkg/wait/wait_for_ready_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,12 @@ func TestAddWaitForReady(t *testing.T) {

waitForReady := NewWaitForReady(
"blub",
func(name string, timeout time.Duration) (watch.Interface, error) {
return fakeWatchApi, nil
},
func(obj runtime.Object) (apis.Conditions, error) {
return apis.Conditions(obj.(*servingv1.Service).Status.Conditions), nil
})
fakeWatchApi.Start()
var msgs []string
err, _ := waitForReady.Wait("foobar", Options{Timeout: &tc.timeout}, func(_ time.Duration, msg string) {
err, _ := waitForReady.Wait(fakeWatchApi, "foobar", Options{Timeout: &tc.timeout}, func(_ time.Duration, msg string) {
msgs = append(msgs, msg)
})
close(fakeWatchApi.eventChan)
Expand All @@ -69,8 +66,8 @@ func TestAddWaitForReady(t *testing.T) {
// check messages
assert.Assert(t, cmp.DeepEqual(tc.messagesExpected, msgs), "%d: Messages expected to be equal", i)

if fakeWatchApi.StopCalled != 1 {
t.Errorf("%d: Exactly one 'stop' should be called, but got %d", i, fakeWatchApi.StopCalled)
if fakeWatchApi.StopCalled != 0 {
t.Errorf("%d: Exactly zero 'stop' should be called, but got %d", i, fakeWatchApi.StopCalled)
}

}
Expand All @@ -82,13 +79,10 @@ func TestAddWaitForDelete(t *testing.T) {

waitForEvent := NewWaitForEvent(
"blub",
func(name string, timeout time.Duration) (watch.Interface, error) {
return fakeWatchAPI, nil
},
func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
fakeWatchAPI.Start()

err, _ := waitForEvent.Wait("foobar", Options{Timeout: &tc.timeout}, NoopMessageCallback())
err, _ := waitForEvent.Wait(fakeWatchAPI, "foobar", Options{Timeout: &tc.timeout}, NoopMessageCallback())
close(fakeWatchAPI.eventChan)

if tc.errorText == "" && err != nil {
Expand All @@ -103,8 +97,8 @@ func TestAddWaitForDelete(t *testing.T) {
}
}

if fakeWatchAPI.StopCalled != 1 {
t.Errorf("%d: Exactly one 'stop' should be called, but got %d", i, fakeWatchAPI.StopCalled)
if fakeWatchAPI.StopCalled != 0 {
t.Errorf("%d: Exactly zero 'stop' should be called, but got %d", i, fakeWatchAPI.StopCalled)
}
}
}
Expand Down

0 comments on commit 76f8233

Please sign in to comment.