Kine responds with the resourceVersion corresponding to the maxRowId globally, rather than the one associated with the specified prefix. #255

Closed
brandond opened this issue Dec 8, 2023 · 10 comments

Comments

@brandond
Member

brandond commented Dec 8, 2023

from @qwtsc in #250 (comment)
I believe I may have found the actual reason for the problem. This Kubernetes feature depends on the GetCurrentResourceVersionFromStorage function to retrieve the latest version of a specified resource prefix. The function issues an empty list request with limit of 1, utilizing the ListAccessor interface to extract the resource version from the returned metadata.
However, it seems that Kine responds with the resourceVersion corresponding to the maxRowId globally, rather than the one associated with the specified prefix.

Ah, interesting. I believe that should be fixable.

Just to be clear, this is a List against the current resource type prefix - so the etcd client is expecting to see the latest revision of any key in that prefix, NOT the latest revision of the keystore as a whole.

What is the expected behavior if there are no keys in that prefix? I guess I'll have to test.

Originally posted by @brandond in #250 (comment)

@qwtsc

qwtsc commented Dec 10, 2023

Apologies, I am wrong again. According to the following unit test in Kubernetes, it seems that Kine's behavior is indeed what Kubernetes expects.

	// create a pod and make sure its RV is equal to the one maintained by etcd
	pod := createPod(makePod("pod-1"))
	currentStorageRV, err := storage.GetCurrentResourceVersionFromStorage(context.TODO(), etcdStorage, func() runtime.Object { return &example.PodList{} }, "/pods", "Pod")
	require.NoError(t, err)
	podRV, err := versioner.ParseResourceVersion(pod.ResourceVersion)
	require.NoError(t, err)
	require.Equal(t, currentStorageRV, podRV, "expected the global etcd RV to be equal to pod's RV")

	// now create a replicaset (new resource) and make sure the target function returns global etcd RV
	rs := createReplicaSet(makeReplicaSet("replicaset-1"))
	currentStorageRV, err = storage.GetCurrentResourceVersionFromStorage(context.TODO(), etcdStorage, func() runtime.Object { return &example.PodList{} }, "/pods", "Pod")
	require.NoError(t, err)
	rsRV, err := versioner.ParseResourceVersion(rs.ResourceVersion)
	require.NoError(t, err)
	require.Equal(t, currentStorageRV, rsRV, "expected the global etcd RV to be equal to replicaset's RV")
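
The distinction this test exercises can be illustrated with a toy model. The sketch below is not Kine or etcd code: `toyStore`, `headerRevision`, and `maxPrefixRevision` are hypothetical names invented for illustration. It assumes only what the test asserts, namely that the revision reported for a list against "/pods" follows the global store revision even when the latest write landed under a different prefix.

```go
package main

import (
	"fmt"
	"strings"
)

// toyStore is a hypothetical in-memory stand-in for an etcd-like keystore.
// Every write bumps a single global revision counter.
type toyStore struct {
	rev    int64            // current global revision
	modRev map[string]int64 // key -> revision of its most recent write
}

func newToyStore() *toyStore {
	return &toyStore{modRev: map[string]int64{}}
}

// put writes a key and returns the new global revision.
func (s *toyStore) put(key string) int64 {
	s.rev++
	s.modRev[key] = s.rev
	return s.rev
}

// headerRevision models the revision reported alongside a list response.
// The prefix is deliberately ignored: the value is the store-wide revision.
func (s *toyStore) headerRevision(prefix string) int64 {
	return s.rev
}

// maxPrefixRevision is the alternative reading discussed in the thread:
// the highest revision among keys under prefix, or 0 if there are none.
func (s *toyStore) maxPrefixRevision(prefix string) int64 {
	var highest int64
	for key, r := range s.modRev {
		if strings.HasPrefix(key, prefix) && r > highest {
			highest = r
		}
	}
	return highest
}

func main() {
	s := newToyStore()
	podRV := s.put("/pods/default/pod-1")
	rsRV := s.put("/replicasets/default/replicaset-1")

	fmt.Println("pod RV:", podRV, "replicaset RV:", rsRV)
	fmt.Println("header revision for /pods:", s.headerRevision("/pods"))
	fmt.Println("max revision under /pods:", s.maxPrefixRevision("/pods"))
}
```

In this model, the second assertion of the test above only holds for `headerRevision`; `maxPrefixRevision` would still report the pod's revision after the replicaset is created.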

I have no idea what is going on now. It's possible that this Kubernetes feature is not yet stable enough for public use. I've decided to close this issue. Thanks all the same.

@brandond
Member Author

Ok, will close.

If you do have a public reproducer available, let me know, I am curious what is going on.

@onprem

onprem commented Jan 10, 2024

I implemented some of the Progress Notify stuff in a private fork for exactly the same reason, using the streaming WatchList in our informers. The progress notification in etcd can be triggered by two different flows:

  1. The client sets the ProgressNotify option to true when a new watch stream is created using a WatchRequest_CreateRequest message. This enables progress notification just for that stream, using a ticker, delivering a progress notification if no events have been sent for a while. I can see this already works in kine upstream, thanks @brandond!
  2. The other scenario is when a client sends a WatchRequest_ProgressRequest message. In this case etcd waits for all of the watch streams for that client to sync (i.e., they haven't sent an event in some time and are up to date) and then sends a progress notification for all of the streams for that client.

It seems like K8s streaming WatchList uses both of these mechanisms, and there is also the Consistent List from Cache KEP which utilizes the second method explicitly for issuing bookmark events. I think implementing that would make streaming lists work.
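
The two flows described above can be sketched as a small model. This is a toy illustration only, not etcd or kine code: the `stream` and `notification` types and the `onTick` and `onProgressRequest` functions are hypothetical, and the behavior is taken from the description in this comment rather than from the etcd source. Where the comment says etcd waits for streams to sync, the model simply reports "not ready" so the caller can retry.

```go
package main

import "fmt"

// stream is a hypothetical model of one watch stream belonging to a client.
type stream struct {
	id             int64
	progressNotify bool // set when the watch was created (flow 1)
	idle           bool // no events sent since the last ticker interval
	synced         bool // caught up with the current store revision
}

// notification is a hypothetical progress notification carrying a revision.
type notification struct {
	streamID int64
	revision int64
}

// onTick models flow 1: on each ticker interval, only streams that opted in
// at creation time and have been idle receive a progress notification.
func onTick(streams []stream, rev int64) []notification {
	var out []notification
	for _, s := range streams {
		if s.progressNotify && s.idle {
			out = append(out, notification{s.id, rev})
		}
	}
	return out
}

// onProgressRequest models flow 2: an explicit progress request is answered
// only once every stream of the client is synced, and then covers all of
// them. The boolean is false when at least one stream is still catching up.
func onProgressRequest(streams []stream, rev int64) ([]notification, bool) {
	for _, s := range streams {
		if !s.synced {
			return nil, false
		}
	}
	out := make([]notification, 0, len(streams))
	for _, s := range streams {
		out = append(out, notification{s.id, rev})
	}
	return out, true
}

func main() {
	streams := []stream{
		{id: 1, progressNotify: true, idle: true, synced: true},
		{id: 2, progressNotify: false, idle: true, synced: false},
	}
	fmt.Println("tick:", onTick(streams, 10))

	_, ready := onProgressRequest(streams, 10)
	fmt.Println("progress request ready:", ready)

	streams[1].synced = true
	notes, ready := onProgressRequest(streams, 10)
	fmt.Println("progress request ready:", ready, "notifications:", notes)
}
```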

We have been using WatchList with kine in production for a while and it seems to be working great for us.

@brandond
Member Author

brandond commented Jan 10, 2024

@onprem can you link me to where/when the apiserver sends a ProgressRequest message? I can take a look at making sure that works.

Also, let's take this to a new issue so I have something to work against.

@qwtsc

qwtsc commented Jan 11, 2024

Interesting observation @onprem

@qwtsc

qwtsc commented Jan 11, 2024

@brandond you can check out the following code snippets.

func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
	ctx := wait.ContextForChannel(stopCh)
	go func() {
		defer utilruntime.HandleCrash()
		<-stopCh
		pr.mux.Lock()
		defer pr.mux.Unlock()
		pr.stopped = true
		pr.cond.Signal()
	}()
	ticker := pr.clock.NewTicker(progressRequestPeriod)
	defer ticker.Stop()
	for {
		stopped := func() bool {
			pr.mux.RLock()
			defer pr.mux.RUnlock()
			for pr.waiting == 0 && !pr.stopped {
				pr.cond.Wait()
			}
			return pr.stopped
		}()
		if stopped {
			return
		}

		select {
		case <-ticker.C():
			shouldRequest := func() bool {
				pr.mux.RLock()
				defer pr.mux.RUnlock()
				return pr.waiting > 0 && !pr.stopped
			}()
			if !shouldRequest {
				continue
			}
			err := pr.requestWatchProgress(ctx)
			if err != nil {
				klog.V(4).InfoS("Error requesting bookmark", "err", err)
			}
		case <-stopCh:
			return
		}
	}
}

	// RequestWatchProgress requests the a watch stream progress status be sent in the
	// watch response stream as soon as possible.
	// Used for monitor watch progress even if watching resources with no changes.
	//
	// If watch is lagging, progress status might:
	// * be pointing to stale resource version. Use etcd KV request to get linearizable resource version.
	// * not be delivered at all. It's recommended to poll request progress periodically.
	//
	// Note: Only watches with matching context grpc metadata will be notified.
	// https://github.com/kubernetes/kubernetes/blob/9325a57125e8502941d1b0c7379c4bb80a678d5c/vendor/go.etcd.io/etcd/client/v3/watch.go#L1037-L1042
	//
	// TODO: Remove when storage.Interface will be separate from etc3.store.
	// Deprecated: Added temporarily to simplify exposing RequestProgress for watch cache.
	RequestWatchProgress(ctx context.Context) error

func (s *store) RequestWatchProgress(ctx context.Context) error {
	// Use watchContext to match ctx metadata provided when creating the watch.
	// In best case scenario we would use the same context that watch was created, but there is no way access it from watchCache.
	return s.client.RequestProgress(s.watchContext(ctx))
}

@brandond
Member Author

@qwtsc assuming these are all enabled on the apiserver, do you have a reproducer that will make the correct client calls to trigger this on the server side? I see that there is a TODO/Deprecated note on the code in question in the apiserver, so I'm not quite sure how to hit that path with a client request.

@qwtsc

qwtsc commented Jan 12, 2024

@brandond I found an easy way to reproduce the watch-list request. First, use kubectl proxy to sidestep the authentication/certificate setup.

kubectl proxy &

Then use curl to send a watch-list request to the apiserver. Make sure the watch-list feature is enabled on the k8s apiserver.

curl 'http://localhost:8001/api/v1/namespaces/default/pods?allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&timeoutSeconds=565&watch=true'

Now you can dig into what's going on under the hood.
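
For anyone scripting the reproducer instead of using curl, the same request URL can be assembled in Go. This is a minimal sketch: `watchListURL` is a hypothetical helper, and the base address assumes kubectl proxy is listening on its default localhost:8001 as in the command above. It only builds the URL; sending it with an HTTP client and reading the streamed events is left out.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// watchListURL is a hypothetical helper that builds the watch-list request
// from the curl example for pods in the given namespace.
func watchListURL(base, namespace string, timeoutSeconds int) string {
	q := url.Values{}
	q.Set("allowWatchBookmarks", "true")
	q.Set("resourceVersionMatch", "NotOlderThan")
	q.Set("sendInitialEvents", "true")
	q.Set("timeoutSeconds", strconv.Itoa(timeoutSeconds))
	q.Set("watch", "true")
	// Encode sorts parameters by key, which matches the order used with curl.
	return base + "/api/v1/namespaces/" + url.PathEscape(namespace) + "/pods?" + q.Encode()
}

func main() {
	// Assumes `kubectl proxy` is already running on its default port.
	fmt.Println(watchListURL("http://localhost:8001", "default", 565))
}
```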

@qwtsc

qwtsc commented Jan 12, 2024

@brandond I think this issue can be reopened, or a new one created to track the progress. I am still interested in enabling k8s's watch-list with kine's support.

@brandond
Member Author

Moved to another issue.
