Any plan to support ProgressNotify feature? #250

Closed
qwtsc opened this issue Dec 4, 2023 · 11 comments

@qwtsc

qwtsc commented Dec 4, 2023

Hi! I was wondering if there are any plans to implement the ProgressNotify feature, mirroring the functionality provided by a real etcd v3 server. Without it, it appears that we may not be able to activate the corresponding Kubernetes feature gate. We have observed that Custom Resource controllers with the watch-list feature enabled encounter issues during the initialization phase, and the lack of ProgressNotify support could be a contributing factor. Thanks.

@brandond
Member

brandond commented Dec 4, 2023

makes watch server send periodic progress updates every 10 minutes when there is no incoming events.

I hadn't really been tracking that, but it seems easy enough. Is a 10 minute idle keepalive really the only thing that's required to support this?

controllers with the watch-list feature enabled encounter issues during the initialization phase

That doesn't seem like it would be related, as progress notify would only make a difference when there haven't been any events for a period of time - which would not apply to the initialization phase. What specific "issues" are you seeing? Can you provide steps to reproduce?

@brandond
Member

brandond commented Dec 4, 2023

I will say that the upstream documentation for this client option doesn't seem to be exactly accurate - it says

WithProgressNotify makes watch server send periodic progress updates every 10 minutes when there is no incoming events.

What actually happens is that there is a configurable ticker interval on the etcd side, which defaults to 10 minutes. If a watcher has not received any events since the last time the ticker ticked, a WatchResponse is sent with an empty event list. So it's not exactly 10 minutes per client, it is more like "at least every 10 minutes", as the progress-needed flag is set on new watches when they are first created.

I don't think Kubernetes should care, but it is worth noting as a potential difference in implementation, if we don't do it exactly the same.
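
For reference, here is a minimal sketch of that ticker-driven behavior. The watcher struct and progressLoop helper are hypothetical illustrations, not etcd's or kine's actual code, and locking is elided for brevity.

package sketch

import (
	"time"

	"go.etcd.io/etcd/api/v3/etcdserverpb"
)

// watcher is a hypothetical per-watch record.
type watcher struct {
	id        int64
	ch        chan *etcdserverpb.WatchResponse
	sawEvents bool // set by the event path whenever a real event is delivered
}

// progressLoop sends an empty WatchResponse to any watcher that has been idle
// for a full ticker interval, so the client still learns the current revision.
func progressLoop(watchers []*watcher, currentRev func() int64, interval time.Duration) {
	ticker := time.NewTicker(interval) // etcd defaults this interval to 10 minutes
	defer ticker.Stop()
	for range ticker.C {
		rev := currentRev()
		for _, w := range watchers {
			if w.sawEvents {
				// Real events went out since the last tick; no progress
				// notification needed this round.
				w.sawEvents = false
				continue
			}
			// Idle since the last tick: send a progress notification with an
			// empty event list and the current revision in the header.
			w.ch <- &etcdserverpb.WatchResponse{
				Header:  &etcdserverpb.ResponseHeader{Revision: rev},
				WatchId: w.id,
			}
		}
	}
}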

@brandond brandond self-assigned this Dec 5, 2023
@qwtsc
Author

qwtsc commented Dec 5, 2023

What specific "issues" are you seeing?

When I enabled the watch-list feature, the cache reflector in the operator became stuck at startup. About two minutes later, the operator restarted due to a cache-sync timeout. Below are the log excerpts that illustrate the issue.

I1205 14:58:57.254258       1 reflector.go:289] Starting reflector *v1beta1.Workflow (10h13m55.864994589s) from pkg/mod/k8s.io/[email protected]/tools/cache/reflector.go:229
I1205 14:58:57.254282       1 reflector.go:325] Listing and watching *v1beta1.Workflow from pkg/mod/k8s.io/[email protected]/tools/cache/reflector.go:229
I1205 14:58:57.254326       1 reflector.go:289] Starting reflector *v1beta1.WorkflowTopology (10h22m7.873616144s) from pkg/mod/k8s.io/[email protected]/tools/cache/reflector.go:229
I1205 14:58:57.254346       1 reflector.go:325] Listing and watching *v1beta1.WorkflowTopology from pkg/mod/k8s.io/[email protected]/tools/cache/reflector.go:229
I1205 14:58:57.256015       1 reflector.go:289] Starting reflector *v1beta1.DagRun (9h32m52.718727482s) from pkg/mod/k8s.io/[email protected]/tools/cache/reflector.go:229
I1205 14:58:57.256033       1 reflector.go:325] Listing and watching *v1beta1.DagRun from pkg/mod/k8s.io/[email protected]/tools/cache/reflector.go:229
I1205 15:00:57.144333       1 shared_informer.go:337] stop requested
I1205 15:00:57.144370       1 shared_informer.go:337] stop requested
I1205 15:00:57.144484       1 shared_informer.go:337] stop requested
I1205 15:00:57.144587       1 trace.go:236] Trace[1701663128]: "Reflector WatchList" name:pkg/mod/k8s.io/[email protected]/tools/cache/reflector.go:229 (05-Dec-2023 14:58:57.254) (total time: 119890ms):
Trace[1701663128]: [1m59.890206192s] [1m59.890206192s] END
2023-12-05T15:00:57+08:00	ERROR	Could not wait for Cache to sync	{"controller": "workflowtopology", "controllerGroup": "demo.com", "controllerKind": "WorkflowTopology", "error": "failed to wait for workflowtopology caches to sync: timed out waiting for cache to be synced for Kind *v1beta1.WorkflowTopology"}
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2.1
	/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:203
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2
I1205 15:00:57.144609       1 shared_informer.go:337] stop requested
	/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:208
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start
	/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:234
sigs.k8s.io/controller-runtime/pkg/manager.(*runnableGroup).reconcile.func1
	/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/manager/runnable_group.go:223
2023-12-05T15:00:57+08:00	ERROR	Could not wait for Cache to sync	{"controller": "dagrun", "controllerGroup": "demo.com", "controllerKind": "DagRun", "error": "failed to wait for dagrun caches to sync: timed out waiting for cache to be synced for Kind *v1beta1.DagRun"}
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2.1
	/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:203
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2
	/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:208
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start
	/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:234
sigs.k8s.io/controller-runtime/pkg/manager.(*runnableGroup).reconcile.func1
	/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/manager/runnable_group.go:223

Can you provide steps to reproduce?

Use a typical CR operator with a kine-backed kube-apiserver, and set the environment variable ENABLE_CLIENT_GO_WATCH_LIST_ALPHA to true in the operator.
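
For a quick in-process reproduction, something like the sketch below could flip the gate before the controller manager starts; in a real deployment the variable would normally be set on the container instead, and the manager startup is only a placeholder comment here.

package main

import "os"

func main() {
	// The client-go watch-list alpha is gated on this environment variable; it
	// must be set before any informers are constructed.
	os.Setenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA", "true")

	// ... construct and start the controller-runtime manager here ...
}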

@qwtsc
Author

qwtsc commented Dec 5, 2023

I will say that the upstream documentation for this client option doesn't seem to be exactly accurate - it says

WithProgressNotify makes watch server send periodic progress updates every 10 minutes when there is no incoming events.

What actually happens is that there is a configurable ticker interval on the etcd side, which defaults to 10 minutes. If a watcher has not received any events since the last time the ticker ticked, a WatchResponse is sent with an empty event list. So it's not exactly 10 minutes per client, it is more like "at least every 10 minutes", as the progress-needed flag is set on new watches when they are first created.

I don't think Kubernetes should care, but it is worth noting as a potential difference in implementation, if we don't do it exactly the same.

The cause of this issue is still speculative, as I am uncertain about the underlying reason, but once your PR is merged I will test again and update you with the results.

@brandond
Member

brandond commented Dec 5, 2023

use a common CR operator

Do you have a specific prebuilt example I can use?

@qwtsc
Author

qwtsc commented Dec 6, 2023

use a common CR operator

Do you have a specific prebuilt example I can use?

Sorry, I don't have an open-source demo available at the moment. Let me see how I can reproduce this issue in an easy way on GitHub.

@brandond
Member

brandond commented Dec 8, 2023

Try with https://github.com/k3s-io/kine/releases/tag/v0.11.2

@qwtsc
Author

qwtsc commented Dec 8, 2023

Try with https://github.com/k3s-io/kine/releases/tag/v0.11.2

Bad news. The issue persists. I will investigate and debug kube-apiserver further to find the root cause.

@qwtsc
Author

qwtsc commented Dec 8, 2023

I believe I may have found the actual reason for the problem. This Kubernetes feature depends on the GetCurrentResourceVersionFromStorage function to retrieve the latest resource version for a given resource prefix. The function issues an empty list request with a limit of 1 and uses the ListAccessor interface to extract the resource version from the returned metadata, as shown below:

func GetCurrentResourceVersionFromStorage(ctx context.Context, storage Interface, newListFunc func() runtime.Object, resourcePrefix, objectType string) (uint64, error) {
	if storage == nil {
		return 0, fmt.Errorf("storage wasn't provided for %s", objectType)
	}
	if newListFunc == nil {
		return 0, fmt.Errorf("newListFunction wasn't provided for %s", objectType)
	}
	emptyList := newListFunc()
	pred := SelectionPredicate{
		Label: labels.Everything(),
		Field: fields.Everything(),
		Limit: 1, // just in case we actually hit something
	}
	err := storage.GetList(ctx, resourcePrefix, ListOptions{Predicate: pred}, emptyList)
	if err != nil {
		return 0, err
	}
	emptyListAccessor, err := meta.ListAccessor(emptyList)
	if err != nil {
		return 0, err
	}
	if emptyListAccessor == nil {
		return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList)
	}
	currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion())
	if err != nil {
		return 0, err
	}
	if currentResourceVersion == 0 {
		return 0, fmt.Errorf("the current resource version must be greater than 0")
	}
	return uint64(currentResourceVersion), nil
}

However, it seems that Kine responds with the resourceVersion corresponding to the global maxRowId, rather than the latest revision under the specified prefix. This behavior is illustrated in the following code snippet:

	// NOTE: rev returned here is the global max row ID
	rev, kvs, err := l.backend.List(ctx, prefix, start, limit, r.Revision)
	if err != nil {
		return nil, err
	}

	logrus.Tracef("LIST key=%s, end=%s, revision=%d, currentRev=%d count=%d, limit=%d", r.Key, r.RangeEnd, r.Revision, rev, len(kvs), r.Limit)
	resp := &RangeResponse{
		Header: txnHeader(rev),
		Count:  int64(len(kvs)),
		Kvs:    kvs,
	}

As a result, Kubernetes may discard any bookmark event whose resource version falls below that maxRowId, as seen in the nonblockingAdd function of the cacheWatcher:

func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
	// if the bookmarkAfterResourceVersion hasn't been seen
	// we will try to deliver a bookmark event every second.
	// the following check will discard a bookmark event
	// if it is < than the bookmarkAfterResourceVersion
	// so that we don't pollute the input channel
	if event.Type == watch.Bookmark && event.ResourceVersion < c.bookmarkAfterResourceVersion {
		return false
	}
	select {
	case c.input <- event:
		c.markBookmarkAfterRvAsReceived(event)
		return true
	default:
		return false
	}
}

This issue likely leads to the incorrect handling of resource versions and could be the root cause of the observed problem.
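
As a rough way to observe which value ends up being used, one could point a plain etcd v3 client at kine and issue the same kind of limit-1 prefix list; this is only a sketch, and the endpoint and prefix below are placeholders.

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"}, // placeholder kine endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Same shape of request as the limit-1 list above; the prefix is a placeholder.
	resp, err := cli.Get(ctx, "/registry/demo.com/workflows/",
		clientv3.WithPrefix(), clientv3.WithLimit(1))
	if err != nil {
		panic(err)
	}

	// Per the observation above, against kine this header revision reflects the
	// global max row ID rather than the latest revision within the prefix, and it
	// is what ends up feeding the bookmarkAfterResourceVersion comparison.
	fmt.Println("header revision:", resp.Header.Revision)
}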

@brandond
Member

brandond commented Dec 8, 2023

Kine responds with the resourceVersion corresponding to the maxRowId globally, rather than the one associated with the specified prefix.

Ah, interesting. I believe that should be fixable.

Just to be clear, this is a List against the current resource type prefix - so the etcd client is expecting to see the latest revision of any key in that prefix, NOT the latest revision of the keystore as a whole.

What is the expected behavior if there are no keys in that prefix? I guess I'll have to test.
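
For illustration only, a prefix-scoped revision lookup might look roughly like the sketch below; the table and column names are assumptions for the sake of the example, not kine's actual schema or the planned fix, and the no-keys case is signalled with ok=false.

package sketch

import (
	"context"
	"database/sql"
)

// prefixRevision returns the highest row ID under the given prefix, or ok=false
// when the prefix contains no keys; table and column names are assumptions.
func prefixRevision(ctx context.Context, db *sql.DB, prefix string) (rev int64, ok bool, err error) {
	var maxID sql.NullInt64
	err = db.QueryRowContext(ctx,
		`SELECT MAX(id) FROM kine WHERE name LIKE ?`, prefix+"%").Scan(&maxID)
	if err != nil {
		return 0, false, err
	}
	if !maxID.Valid {
		// No rows under this prefix - the empty-prefix case mentioned above.
		return 0, false, nil
	}
	return maxID.Int64, true, nil
}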

@brandond
Member

brandond commented Dec 8, 2023

Moving to a new issue.
