Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Watch bookmarks. #9593

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion libcalico-go/lib/backend/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ type Client interface {
}

type WatchOptions struct {
Revision string
Revision string
AllowWatchBookmarks bool
}

type Syncer interface {
Expand Down Expand Up @@ -200,6 +201,7 @@ const (
WatchModified WatchEventType = "MODIFIED"
WatchDeleted WatchEventType = "DELETED"
WatchError WatchEventType = "ERROR"
WatchBookmark WatchEventType = "BOOKMARK"
)

// Event represents a single event to a watched resource.
Expand Down
9 changes: 8 additions & 1 deletion libcalico-go/lib/backend/k8s/resources/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,13 @@ func (pw *profileWatcher) processProfileEvents() {
switch e.Type {
case api.WatchModified, api.WatchAdded:
value = e.New.Value
case api.WatchBookmark:
if isNsEvent {
pw.k8sNSRev = e.New.Revision
} else {
pw.k8sSARev = e.New.Revision
}
e.New.Revision = pw.JoinProfileRevisions(pw.k8sNSRev, pw.k8sSARev)
case api.WatchDeleted:
value = e.Old.Value
}
Expand All @@ -444,7 +451,7 @@ func (pw *profileWatcher) processProfileEvents() {
}
oma.GetObjectMeta().SetResourceVersion(pw.JoinProfileRevisions(pw.k8sNSRev, pw.k8sSARev))
}
} else if e.Error == nil {
} else if e.Error == nil && e.Type != api.WatchBookmark {
log.WithField("event", e).Warning("Event without error or value")
}

Expand Down
5 changes: 3 additions & 2 deletions libcalico-go/lib/backend/k8s/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ func ConvertK8sResourceToCalicoResource(res Resource) error {

func watchOptionsToK8sListOptions(wo api.WatchOptions) metav1.ListOptions {
return metav1.ListOptions{
ResourceVersion: wo.Revision,
Watch: true,
ResourceVersion: wo.Revision,
Watch: true,
AllowWatchBookmarks: wo.AllowWatchBookmarks,
}
}
12 changes: 11 additions & 1 deletion libcalico-go/lib/backend/k8s/resources/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,17 @@ func (crw *k8sWatcherConverter) convertEvent(kevent kwatch.Event) []*api.WatchEv
}

return crw.buildEventsFromKVPs(kvps, kevent.Type)

case kwatch.Bookmark:
// For bookmarks we send an empty KVPair with the current resource
// version only.
k8sRes := kevent.Object.(Resource)
revision := k8sRes.GetObjectMeta().GetResourceVersion()
return []*api.WatchEvent{{
Type: api.WatchBookmark,
New: &model.KVPair{
Revision: revision,
},
}}
default:
return []*api.WatchEvent{{
Type: api.WatchError,
Expand Down
2 changes: 1 addition & 1 deletion libcalico-go/lib/backend/k8s/resources/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var _ = Describe("Resources watcher ", func() {

It("should return error WatchEvent with unexpected kwatch event type", func() {
events := kwc.convertEvent(kwatch.Event{
Type: kwatch.Bookmark,
Type: "GARBAGE",
})
Expect(events).To(HaveLen(1))
Expect(events[0].Type).To(Equal(api.WatchError))
Expand Down
54 changes: 54 additions & 0 deletions libcalico-go/lib/backend/watchersyncer/reflector_ports.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Extracted from on Kubernetes reflector.go, Copyright 2014 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package watchersyncer

import (
"strings"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Copied from client-go's reflector.go
func isTooLargeResourceVersionError(err error) bool {
if apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge) {
return true
}
// In Kubernetes 1.17.0-1.18.5, the api server doesn't set the error status cause to
// metav1.CauseTypeResourceVersionTooLarge to indicate that the requested minimum resource
// version is larger than the largest currently available resource version. To ensure backward
// compatibility with these server versions we also need to detect the error based on the content
// of the error message field.
if !apierrors.IsTimeout(err) {
return false
}
apierr, ok := err.(apierrors.APIStatus)
if !ok || apierr == nil || apierr.Status().Details == nil {
return false
}
for _, cause := range apierr.Status().Details.Causes {
// Matches the message returned by api server 1.17.0-1.18.5 for this error condition
if cause.Message == "Too large resource version" {
return true
}
}

// Matches the message returned by api server before 1.17.0
if strings.Contains(apierr.Status().Message, "Too large resource version") {
return true
}

return false
}
100 changes: 76 additions & 24 deletions libcalico-go/lib/backend/watchersyncer/watchercache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package watchersyncer

import (
"context"
"errors"
"time"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/errors"
kerrors "k8s.io/apimachinery/pkg/api/errors"
utilnet "k8s.io/apimachinery/pkg/util/net"

"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"github.com/projectcalico/calico/libcalico-go/lib/backend/model"
Expand All @@ -36,18 +38,23 @@ import (
// - An api.Update
// - A api.SyncStatus (only for the very first InSync notification)
type watcherCache struct {
logger *logrus.Entry
client api.Client
watch api.WatchInterface
resources map[string]cacheEntry
oldResources map[string]cacheEntry
results chan<- interface{}
hasSynced bool
resourceType ResourceType
currentWatchRevision string
resyncBlockedUntil time.Time
logger *logrus.Entry
client api.Client
watch api.WatchInterface
resources map[string]cacheEntry
oldResources map[string]cacheEntry
results chan<- interface{}
hasSynced bool
resourceType ResourceType
currentWatchRevision string
errorCountAtCurrentRev int
resyncBlockedUntil time.Time
}

const (
maxErrorsPerRevision = 5
)

var (
MinResyncInterval = 500 * time.Millisecond
ListRetryInterval = 1000 * time.Millisecond
Expand Down Expand Up @@ -117,13 +124,29 @@ mainLoop:
}
kvp.Value = nil
wc.handleWatchListEvent(kvp)
case api.WatchBookmark:
wc.logger.WithField("newRevision", event.New.Revision).Debug("Watch bookmark received")
wc.currentWatchRevision = event.New.Revision
wc.errorCountAtCurrentRev = 0
case api.WatchError:
// Handle a WatchError. This error triggered from upstream, all type
// of WatchError are treated equally,log the Error and trigger a full resync. We only log at info
// because errors may occur due to compaction causing revisions to no longer be valid - in this case
// we simply need to do a full resync.
wc.logger.WithError(event.Error).Infof("Watch error received from Upstream")
wc.currentWatchRevision = "0"
if kerrors.IsResourceExpired(event.Error) {
// Our current watch revision is too old. We hit this path after the API server restarts
// (and presumably does an immediate compaction).
wc.logger.WithError(event.Error).Info("Watch has expired, triggering full resync.")
wc.resetWatchRevisionForFullResync()
} else {
wc.logger.WithError(event.Error).Warn("Unknown watch error event received, restarting the watch.")
wc.errorCountAtCurrentRev++
if wc.errorCountAtCurrentRev > maxErrorsPerRevision {
// Too many errors at the current revision, trigger a full resync.
wc.logger.Warn("Too many errors at current revision, triggering full resync")
wc.resetWatchRevisionForFullResync()
}
}
wc.resyncAndCreateWatcher(ctx)
default:
// Unknown event type - not much we can do other than log.
Expand Down Expand Up @@ -159,7 +182,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
wc.cleanExistingWatcher()

// If we don't have a currentWatchRevision then we need to perform a full resync.
performFullResync := wc.currentWatchRevision == "0"
var performFullResync bool
for {
select {
case <-ctx.Done():
Expand All @@ -177,6 +200,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
// watch immediately ends.
wc.resyncBlockedUntil = time.Now().Add(MinResyncInterval)

performFullResync = performFullResync || wc.currentWatchRevision == "0"
if performFullResync {
wc.logger.Info("Full resync is required")

Expand All @@ -192,10 +216,10 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
if err != nil {
// Failed to perform the list. Pause briefly (so we don't tight loop) and retry.
wc.logger.WithError(err).Info("Failed to perform list of current data during resync")
if errors.IsResourceExpired(err) {
if kerrors.IsResourceExpired(err) {
// Our current watch revision is too old. Start again without a revision.
wc.logger.Info("Clearing cached watch revision for next List call")
wc.currentWatchRevision = "0"
wc.resetWatchRevisionForFullResync()
}
wc.resyncBlockedUntil = time.Now().Add(ListRetryInterval)
continue
Expand All @@ -219,6 +243,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {

// Store the current watch revision. This gets updated on any new add/modified event.
wc.currentWatchRevision = l.Revision
wc.errorCountAtCurrentRev = 0

// Mark the resync as complete.
performFullResync = false
Expand All @@ -227,17 +252,34 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
// And now start watching from the revision returned by the List, or from a previous watch event
// (depending on whether we were performing a full resync).
w, err := wc.client.Watch(ctx, wc.resourceType.ListInterface, api.WatchOptions{
Revision: wc.currentWatchRevision,
Revision: wc.currentWatchRevision,
AllowWatchBookmarks: true,
})
if err != nil {
// Failed to create the watcher - we'll need to retry.
switch err.(type) {
case cerrors.ErrorOperationNotSupported, cerrors.ErrorResourceDoesNotExist:
if kerrors.IsResourceExpired(err) || kerrors.IsGone(err) || isTooLargeResourceVersionError(err) {
// Our current watch revision is too old (or too new!). Start again
// without a revision. Condition cribbed from client-go's reflector.
wc.logger.Info("Watch has expired, queueing full resync.")
wc.resetWatchRevisionForFullResync()
continue
}

if utilnet.IsConnectionRefused(err) || kerrors.IsTooManyRequests(err) {
// Connection-related error, we can just retry without resetting
// the watch. Condition cribbed from client-go's reflector.
wc.logger.WithError(err).Warn("API server refused connection, will retry.")
continue
}

var errNotSupp cerrors.ErrorOperationNotSupported
var errNotExist cerrors.ErrorResourceDoesNotExist
if errors.As(err, &errNotSupp) ||
errors.As(err, &errNotExist) {
// Watch is not supported on this resource type, either because the type fundamentally
// doesn't support it, or because there are no resources to watch yet (and Kubernetes won't
// let us watch if there are no resources yet). Pause for the watch poll interval.
// This loop effectively becomes a poll loop for this resource type.
wc.logger.Debug("Watch operation not supported")
wc.logger.Debug("Watch operation not supported; reverting to poll.")
wc.resyncBlockedUntil = time.Now().Add(WatchPollInterval)

// Make sure we force a re-list of the resource even if the watch previously succeeded
Expand All @@ -246,9 +288,13 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
continue
}

// We hit an error creating the Watch. Trigger a full resync.
wc.logger.WithError(err).WithField("performFullResync", performFullResync).Info("Failed to create watcher")
performFullResync = true
wc.logger.WithError(err).WithField("performFullResync", performFullResync).Warn(
"Failed to create watcher; will retry.")
wc.errorCountAtCurrentRev++
if wc.errorCountAtCurrentRev > maxErrorsPerRevision {
// Hitting repeated errors, try a full resync next time.
performFullResync = true
}
continue
}

Expand All @@ -259,6 +305,11 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
}
}

func (wc *watcherCache) resetWatchRevisionForFullResync() {
wc.currentWatchRevision = "0"
wc.errorCountAtCurrentRev = 0
}

var closedTimeC = make(chan time.Time)

func init() {
Expand Down Expand Up @@ -324,6 +375,7 @@ func (wc *watcherCache) finishResync() {
func (wc *watcherCache) handleWatchListEvent(kvp *model.KVPair) {
// Track the resource version from this watch/list event.
wc.currentWatchRevision = kvp.Revision
wc.errorCountAtCurrentRev = 0

if wc.resourceType.UpdateProcessor == nil {
// No update processor - handle immediately.
Expand Down
20 changes: 20 additions & 0 deletions libcalico-go/lib/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ type ErrorDatastoreError struct {
Identifier interface{}
}

func (e ErrorDatastoreError) Unwrap() error {
return e.Err
}

func (e ErrorDatastoreError) Error() string {
return e.Err.Error()
}
Expand Down Expand Up @@ -61,6 +65,10 @@ func (e ErrorResourceDoesNotExist) Error() string {
return fmt.Sprintf("resource does not exist: %v with error: %v", e.Identifier, e.Err)
}

func (e ErrorResourceDoesNotExist) Unwrap() error {
return e.Err
}

// Error indicating an operation is not supported.
type ErrorOperationNotSupported struct {
Operation string
Expand All @@ -83,6 +91,10 @@ type ErrorResourceAlreadyExists struct {
Identifier interface{}
}

func (e ErrorResourceAlreadyExists) Unwrap() error {
return e.Err
}

func (e ErrorResourceAlreadyExists) Error() string {
return fmt.Sprintf("resource already exists: %v", e.Identifier)
}
Expand All @@ -92,6 +104,10 @@ type ErrorConnectionUnauthorized struct {
Err error
}

func (e ErrorConnectionUnauthorized) Unwrap() error {
return e.Err
}

func (e ErrorConnectionUnauthorized) Error() string {
return fmt.Sprintf("connection is unauthorized: %v", e.Err)
}
Expand Down Expand Up @@ -151,6 +167,10 @@ type ErrorResourceUpdateConflict struct {
Identifier interface{}
}

func (e ErrorResourceUpdateConflict) Unwrap() error {
return e.Err
}

func (e ErrorResourceUpdateConflict) Error() string {
return fmt.Sprintf("update conflict: %v", e.Identifier)
}
Expand Down
Loading