
cannot cancel watch #33

Open
fofewyoung opened this issue Nov 7, 2019 · 18 comments
@fofewyoung

fofewyoung commented Nov 7, 2019

Describe the bug
I tried to cancel a watch with the code below, but it failed.

To Reproduce

using Etcdserverpb;
using Google.Protobuf;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;

namespace dotnet_etcd
{
    class TestCancelWatch
    {
        dotnet_etcd.EtcdClient myConsulClient;

        const long WID = 4445;

        public void Start()
        {
            myConsulClient = new dotnet_etcd.EtcdClient("http://127.0.0.1:2379");
            WatchKey("/zzz");
        }

        public void Stop()
        {
            UnWatch();
        }

        void WatchKey(string keyName)
        {
            WatchRequest req = new WatchRequest {
                CreateRequest = new WatchCreateRequest
                {
                    Key = ByteString.CopyFromUtf8(keyName),
                    WatchId = WID,
                }
            };

            myConsulClient.Watch(req, (Etcdserverpb.WatchResponse respon) => {
                Debug.Assert(respon.WatchId == WID);
                Console.WriteLine("watch event!!!");
            });
        }

        // unwatch
        public void UnWatch()
        {
            WatchRequest req = new WatchRequest
            {
                CancelRequest = new WatchCancelRequest
                {
                    WatchId = WID
                }
            };

            myConsulClient.Watch(req, (Etcdserverpb.WatchResponse respon) => {

                // never executed — is this a bug?
                Console.WriteLine($"cancel:{respon.Canceled}: {respon.CancelReason}");
            });
        }
    }


    class Program
    {
        static void Main(string[] args)
        {
            var watch = new TestCancelWatch();

            while (true)
            {
                var str = Console.ReadLine();
                if (str == "s")
                {
                    watch.Start();
                }
                else if (str == "u")
                {
                    watch.Stop();
                }
            }
        }
    }
}

Expected behavior
input 's'
input 'u'
console print "cancel: ..."

Additional context
dotnet-etcd: 3.0.0
etcd: 3.4.3

@fofewyoung
Author

Hi, may I check whether you have read my question?
I think I know the cause, but I don't see a simple way to fix it:
in dotnet-etcd, each watch operation opens its own gRPC stream,
but a cancel request has to be sent on the same stream as the watch it is cancelling.
Cancellation works after I hacked the code so that all watch operations share a single stream.
Could you please help me deal with it?
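
To make the point above concrete, here is a rough sketch (not this library's API; the `SingleStreamWatcher` class and its method names are hypothetical, assuming the generated Etcdserverpb types and a Grpc.Core `Watch.WatchClient`): both the create and the cancel request are written to the same duplex call, because etcd scopes watch IDs to the stream they were created on.

```csharp
using System.Threading.Tasks;
using Etcdserverpb;
using Google.Protobuf;
using Grpc.Core;

// Hypothetical sketch: one duplex call carries every watch-related request,
// so a WatchCancelRequest reaches the stream that owns the watch ID.
class SingleStreamWatcher
{
    readonly AsyncDuplexStreamingCall<WatchRequest, WatchResponse> _call;

    public SingleStreamWatcher(Watch.WatchClient client)
    {
        _call = client.Watch(); // a single duplex stream for all watches
    }

    public Task CreateAsync(string key, long watchId) =>
        _call.RequestStream.WriteAsync(new WatchRequest
        {
            CreateRequest = new WatchCreateRequest
            {
                Key = ByteString.CopyFromUtf8(key),
                WatchId = watchId
            }
        });

    // writing the cancel to the SAME stream is what lets etcd honor it
    public Task CancelAsync(long watchId) =>
        _call.RequestStream.WriteAsync(new WatchRequest
        {
            CancelRequest = new WatchCancelRequest { WatchId = watchId }
        });
}
```

This only sketches the stream handling; a real implementation would also need to read the response stream and guard the call against concurrent writers.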

@shubhamranjan
Owner

> Hi, may I check whether you have read my question? I think I know the cause, but I don't see a simple way to fix it: in dotnet-etcd, each watch operation opens its own gRPC stream, but a cancel request has to be sent on the same stream as the watch it is cancelling. Cancellation works after I hacked the code so that all watch operations share a single stream. Could you please help me deal with it?

Hi, I have read your question. I will get back to you once I reproduce the issue, and if a problem exists, I will fix it.

Do let me know about your findings and how you modified the code to get it working.

@shubhamranjan
Owner

> but a cancel request has to be sent on the same stream as the watch it is cancelling.

Strange. What would be the use of watchId then? I need to dig into this a bit more.

@fofewyoung
Author

fofewyoung commented Nov 11, 2019

Hi, this is my hacked code. It is not thread-safe and not graceful.

Note the line I commented out:
// await watcher.RequestStream.CompleteAsync();

public partial class EtcdClient : IDisposable
{
    // the shared stream used by all watch operations
    AsyncDuplexStreamingCall<WatchRequest, WatchResponse> myWatchStreaming = null;

    AsyncDuplexStreamingCall<WatchRequest, WatchResponse> GetWatchStreaming()
    {
        // note: not thread-safe; two concurrent callers could create two streams
        if (myWatchStreaming == null)
            myWatchStreaming = _balancer.GetConnection().watchClient.Watch(null);

        return myWatchStreaming;
    }

    // unwatch operation: the cancel request is written to the shared stream
    public async void UnWatch(long wid)
    {
        WatchRequest req = new WatchRequest
        {
            CancelRequest = new WatchCancelRequest
            {
                WatchId = wid
            }
        };

        await GetWatchStreaming().RequestStream.WriteAsync(req);
    }

    #region Watch Key
    /// <summary>
    /// Watches a key according to the specified watch request and
    /// passes the watch response to the method provided.
    /// </summary>
    /// <param name="request">Watch Request containing key to be watched</param>
    /// <param name="method">Method to which watch response should be passed on</param>
    public async void Watch(WatchRequest request, Action<WatchResponse> method, Metadata headers = null)
    {
        bool success = false;
        int retryCount = 0;
        while (!success)
        {
            try
            {
                var watcher = GetWatchStreaming();

                Task watcherTask = Task.Run(async () =>
                {
                    while (await watcher.ResponseStream.MoveNext())
                    {
                        WatchResponse update = watcher.ResponseStream.Current;
                        if (update.Canceled)
                            break;

                        method(update);
                    }
                });

                await watcher.RequestStream.WriteAsync(request);
                // do NOT complete the request stream here: completing it closes
                // the shared stream and cancels every watch riding on it
                // await watcher.RequestStream.CompleteAsync();
                await watcherTask;

                success = true;
            }
            catch (RpcException ex) when (ex.StatusCode == StatusCode.Unavailable)
            {
                retryCount++;
                if (retryCount >= _balancer._numNodes)
                {
                    throw; // rethrow, preserving the original stack trace
                }
            }
        }
    }
    #endregion
}

@shubhamranjan
Owner

Thanks. I will look into this. It's a bit of a busy week, so I may not be able to come up with a solution soon.

@shubhamranjan
Owner

Update: I was able to replicate the issue. Still working on the solution.

@shubhamranjan shubhamranjan self-assigned this Nov 20, 2019
@shubhamranjan shubhamranjan added the bug Something isn't working label Nov 20, 2019
@shubhamranjan
Owner

Let's take an example where we have an etcd cluster and an application using this client. Suppose I have three instances of my application behind a load balancer, and my application connects to etcd using this client. If I make an API call to my application via the load balancer to cancel a watch, I can never guarantee that the request lands on the same instance where the watch was started, which makes it difficult to cancel the watch. I will have to look into why the watch ID is not being honored by etcd.

I will take this up ahead with etcd team.

@yoricksmeets
Contributor

The watchId exists to manage different watches on the same stream to the server and is scoped only to that stream. See this discussion here.

What I assume is that if you close the response stream (or perhaps both the request and response streams, though in this client the request stream is always closed after sending the initial request), the watch is also cancelled, but I have not been able to confirm this assumption yet.

@shubhamranjan
Owner

So we will probably have to maintain request streams on our end. I will try to redesign the watch implementation here to persist request streams. The discussion also states that different streams may use the same watch ID, which becomes a bit difficult to manage on the client side.

@yoricksmeets
Contributor

@shubhamranjan I'm working on a watch manager to use in our distributed-locking client. I can share the code when I have progressed a bit more. You could take some inspiration from it, or we could adapt it for inclusion in this library if you think it fits the library's scope.

The basic idea is as follows (WM = Watch Manager, WMS = Watch Manager Subscription):

  • The WM is created from an EtcdClient instance and uses it to obtain an AsyncDuplexStreamingCall for its request and response streams
  • The WM can issue WMSs, which are tracked by the WM
  • The WM issues unique watchIds for all WMSs it creates
  • The WM runs a separate task that reads the response stream and dispatches each response to the correct WMS based on its watchId
  • The WM responds to any exceptions in the response-stream reading task; if the stream fails, the WM resubscribes all WMSs it tracks on a newly created channel
  • A WMS implements IDisposable: disposing the WMS cancels the subscription
  • The WM implements IDisposable: disposing the WM closes the gRPC channel and disposes all WMSs it was tracking
  • The etcd client tracks all issued WMs and disposes them in its own Dispose method

I'm not sure whether a WMS needs to track the revisions of the keys it receives, so that when the WM has to create a new stream and resubscribe, it knows the start revision to resume from, ensuring the WMS receives every revision exactly once even if a new channel has to be created.

A developer can now get a WatchManager that can issue multiple Subscriptions over a single duplex gRPC stream. A Subscription manages its own lifetime, and disposing the Subscription object cancels the watch on the etcd server.
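
As a rough, dependency-free sketch of the bookkeeping described above (all names are illustrative, and `TResponse` stands in for `Etcdserverpb.WatchResponse`; the actual gRPC stream handling is omitted):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Sketch of the WM/WMS design: the manager issues increment-only watch IDs
// and routes each incoming response to the matching subscription.
class WatchManager<TResponse>
{
    long _nextWatchId;
    readonly ConcurrentDictionary<long, Subscription> _subs =
        new ConcurrentDictionary<long, Subscription>();

    public Subscription Subscribe(Action<TResponse> handler)
    {
        long id = Interlocked.Increment(ref _nextWatchId); // unique per manager
        var sub = new Subscription(this, id, handler);
        _subs[id] = sub;
        // a real implementation would write a WatchCreateRequest with WatchId = id
        return sub;
    }

    // called by the response-stream reader task with each incoming response
    public bool Dispatch(long watchId, TResponse response)
    {
        if (_subs.TryGetValue(watchId, out var sub))
        {
            sub.Handler(response);
            return true;
        }
        return false; // unknown or already-cancelled watch
    }

    internal void Remove(long watchId)
    {
        _subs.TryRemove(watchId, out _);
        // a real implementation would write a WatchCancelRequest with WatchId = watchId
    }

    public class Subscription : IDisposable
    {
        readonly WatchManager<TResponse> _owner;
        public long WatchId { get; }
        internal Action<TResponse> Handler { get; }

        internal Subscription(WatchManager<TResponse> owner, long watchId,
                              Action<TResponse> handler)
        {
            _owner = owner;
            WatchId = watchId;
            Handler = handler;
        }

        public void Dispose() => _owner.Remove(WatchId); // disposing cancels the watch
    }
}
```

The key property is that a response for a disposed subscription is simply dropped, which mirrors the "disposing the WMS cancels the subscription" bullet above.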

@shubhamranjan
Owner

> The WM will issue unique watchIds for all WMS issued by this WM.

I would like to see how you plan to achieve this when the same etcd watch ID is issued on different streams. In the case of connection exceptions, I believe the stream would be recreated, and etcd-issued watch IDs would no longer be valid for the new stream.

Including it in this library looks fine to me. It would make the watch client better.

@yoricksmeets
Contributor

yoricksmeets commented Apr 12, 2020

I plan on making the WM's watchId increment-only (the watchId is a long, so I don't think we will run out). Because the WM manages both the stream to the etcd server and the watchId counter, it can guarantee that watchIds are unique for the specific stream it manages. Since the etcd server scopes watchIds to a specific stream, we only need to guarantee uniqueness of the watchId in combination with that stream.

The WMS will store its watch-creation data so the WM can issue new WatchCreateRequests on its behalf to restore the watch after a stream has been recreated. The recreated stream is a brand-new stream to the etcd server with no watches (and therefore no watchIds) attached. Because the WM only ever increments the watchId, I can reuse the old watchIds in the new WatchCreateRequests to create new watches with the same IDs.

Note: two different WM instances can issue the same watchId, but that is not a problem, since each manages its own stream to the server; the guarantee is only needed per WM instance.

Note 2: Because I keep the watch-creation data in the WMS and have to send new WatchCreateRequests when restoring the connection, I think the WMS will also have to store the last seen revision, so we can set a correct start revision in the new WatchCreateRequests and get the same behaviour as an uninterrupted stream, where every update is delivered exactly once. For single keys this seems doable, since the WMS receives every update for its watch from the WM, but for ranged watches I'm not sure how that would work, because I did not see an option to specify start revisions for individual keys within a key-range creation request.
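
A minimal sketch of that replay bookkeeping (type and member names are illustrative, not from this library): each subscription keeps its creation parameters plus the last revision it saw, so the manager can regenerate create requests, with the same watch IDs, for a fresh stream.

```csharp
using System.Collections.Generic;

// Sketch: creation data retained per subscription so watches can be
// re-established (with the same watch ID) on a recreated stream.
class WatchRecord
{
    public long WatchId;          // reused on the new stream
    public string Key;            // original creation parameter
    public long LastSeenRevision; // updated as responses arrive

    // On reconnect, resume one past the last delivered revision so every
    // update is delivered exactly once.
    public long NextStartRevision => LastSeenRevision + 1;
}

static class WatchReplay
{
    // Produce the (watchId, key, startRevision) tuples the manager would
    // turn into new WatchCreateRequests after the stream is recreated.
    public static List<(long WatchId, string Key, long StartRevision)>
        BuildReplayList(IEnumerable<WatchRecord> records)
    {
        var result = new List<(long WatchId, string Key, long StartRevision)>();
        foreach (var r in records)
            result.Add((r.WatchId, r.Key, r.NextStartRevision));
        return result;
    }
}
```

As noted above, this per-key scheme does not obviously extend to ranged watches, where a single creation request covers many keys with potentially different last-seen revisions.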

@shubhamranjan
Owner

> For single keys this seems doable, since the WMS receives every update for its watch from the WM, but for ranged watches I'm not sure how that would work, because I did not see an option to specify start revisions for individual keys within a key-range creation request.

Watch and WatchRange are internally the same API. The revision specified in the request should mean the same thing whether it's an individual key or a range. However, I will try these cases out over the weekend and check the behavior.

@yoricksmeets
Contributor

That would be good news and should simplify rebuilding the channel. I will start by supporting only exact keys for the moment, and once that works I will look at WatchRange.

I have included my fork of this client as a submodule in our solution so I can test its behaviour before making a PR to this repository. It will go through manual testing for our use cases, and I hope to add some integration tests. These tests would need a running etcd server, as asserting against and mocking an etcd server at the gRPC network level is not something I plan to do in the foreseeable future.

@shubhamranjan
Owner

Yeah, automating the test cases has been pending here for a long time. I should start on that soon.

@kaflake

kaflake commented Feb 26, 2021

Is there a solution for the cancellation problem yet?

@lapinbleu007

Hi guys, what are the consequences if we watch keys without ever cancelling the watch? Also, what happens if we watch the same key again with the same ID?

@shubhamranjan
Owner

@lapinbleu007 - For now, nothing is handled at the client level; the requests are passed as-is to the etcd server, so each watch request is effectively a new watch.
