Long-lived streaming from server to client. #69

Closed
rogerworld opened this issue Mar 11, 2023 · 5 comments

@rogerworld

Hello Tradias,

Thank you for this great library. It makes gRPC very easy to understand, especially when dealing with the asynchronous API.
I'm trying to implement the observer design pattern using asio-grpc for long-lived streaming.
The client subscribes to a subject, then the server notifies all connected clients subscribed to that subject each time data is available.

The RPC is something like this:

service Longlived { rpc Subscribe(Request) returns (stream Response) {} }

Is it possible to implement it using asio-grpc?

Thank you for your help in advance.

@Tradias
Owner

Tradias commented Mar 11, 2023

Hi,

I am glad to hear that this library makes it easier to deal with gRPC's asynchronous API; that is exactly what I created it for :).

Regarding your question: asio-grpc provides just a thin layer over gRPC's API. Beyond that, it is up to asio/unifex to provide the machinery for writing asynchronous programs. In this case I could imagine a simple solution like the following. I am sure some edge cases are unhandled. As I said, this is more of a question for asio/unifex, and you may find an answer among the (closed) issues on their GitHub pages.

using ResponseSPtr = std::shared_ptr<rogerworld::Response>;

struct Subject
{
    using Channel = asio::experimental::channel<void(std::error_code, ResponseSPtr)>;

    std::list<Channel> channels;
};

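// Invokes `action` on every open channel and erases channels that have been
// closed (see remove_observer) since the last pass.
asio::awaitable<void> for_each_channel(Subject& subject, auto action)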
{
    for (auto it = subject.channels.begin(); it != subject.channels.end();)
    {
        if (it->is_open())
        {
            co_await action(*it);
            ++it;
        }
        else
        {
            it = subject.channels.erase(it);
        }
    }
}

asio::awaitable<void> notify_all(Subject& subject, ResponseSPtr response)
{
    co_await for_each_channel(subject,
                              [&](Subject::Channel& channel)
                              {
                                  return channel.async_send(std::error_code{}, response, asio::use_awaitable);
                              });
}

// Signals end-of-stream to every observer by sending operation_aborted as the error code.
asio::awaitable<void> close(Subject& subject)
{
    co_await for_each_channel(subject,
                              [&](Subject::Channel& channel)
                              {
                                  return channel.async_send(std::error_code{asio::error::operation_aborted}, nullptr,
                                                            asio::use_awaitable);
                              });
}

auto& add_observer(Subject& subject, auto executor) { return subject.channels.emplace_back(executor); }

// Only closes the channel; for_each_channel erases it from the list on its next pass.
void remove_observer(Subject::Channel& observer) { observer.close(); }

asio::awaitable<void> produce_data(agrpc::GrpcContext& grpc_context, Subject& subject)
{
    for (int i{}; i < 5; ++i)
    {
        auto data = std::make_shared<rogerworld::Response>();

        // Fill data. Emulating some time passing:
        co_await agrpc::Alarm(grpc_context).wait(std::chrono::system_clock::now() + std::chrono::seconds(2));
        data->set_data(i);

        co_await notify_all(subject, std::move(data));
    }
    co_await close(subject);
}

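// Forwards every response published on the subject to one client until the
// subject is closed or a write fails.
asio::awaitable<void> handle_request(Subject& subject, grpc::ServerAsyncWriter<rogerworld::Response>& responder)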
{
    bool ok{true};
    auto& observer = add_observer(subject, co_await asio::this_coro::executor);
    while (ok)
    {
        const auto [ec, response] = co_await observer.async_receive(asio::as_tuple(asio::use_awaitable));
        if (ec)
        {
            break;
        }
        ok = co_await agrpc::write(responder, *response);
    }
    remove_observer(observer);
    co_await agrpc::finish(responder, grpc::Status::OK);
}


// somewhere in main:
    rogerworld::Longlived::AsyncService service;
    // ...
    Subject subject;
    bool producer_started{};
    agrpc::repeatedly_request(
        &rogerworld::Longlived::AsyncService::RequestSubscribe, service,
        asio::bind_executor(grpc_context,
                            [&](auto&, auto&, auto& responder) -> asio::awaitable<void>
                            {
                                // I cheat a bit by having only one producer. You might want to
                                // create some kind of list of producers and start one only if not
                                // already in the list
                                if (!std::exchange(producer_started, true))
                                {
                                    asio::co_spawn(grpc_context, produce_data(grpc_context, subject), asio::detached);
                                }
                                co_await handle_request(subject, responder);
                            }));
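
For anyone who wants to try the pruning logic of for_each_channel in isolation, here is a minimal synchronous sketch that uses only the standard library. FakeChannel, FakeSubject, for_each_open_channel and notify_all_sync are hypothetical stand-ins invented for this illustration and are not part of Asio or asio-grpc. A real channel suspends the sender until a receiver is ready, whereas this fake one simply stores the value.

```cpp
#include <cassert>
#include <list>
#include <vector>

// Hypothetical stand-in for a channel: records what it receives while open.
struct FakeChannel
{
    bool open{true};
    std::vector<int> received;

    bool is_open() const { return open; }
    void close() { open = false; }
    void send(int value)
    {
        assert(open);  // callers are expected to skip closed channels
        received.push_back(value);
    }
};

struct FakeSubject
{
    // std::list keeps references to the remaining elements valid when
    // another element is erased or a new one is appended.
    std::list<FakeChannel> channels;
};

// Visit every open channel and drop closed ones along the way.
template <class Action>
void for_each_open_channel(FakeSubject& subject, Action action)
{
    for (auto it = subject.channels.begin(); it != subject.channels.end();)
    {
        if (it->is_open())
        {
            action(*it);
            ++it;
        }
        else
        {
            // erase returns the iterator following the removed element
            it = subject.channels.erase(it);
        }
    }
}

void notify_all_sync(FakeSubject& subject, int value)
{
    for_each_open_channel(subject, [value](FakeChannel& channel) { channel.send(value); });
}
```

With two channels added via subject.channels.emplace_back(), closing one of them and calling notify_all_sync again delivers the value only to the remaining channel and shrinks the list to one element. The same property of std::list is what allows handle_request above to hold on to the reference returned by add_observer while other observers come and go.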

@Tradias
Owner

Tradias commented Mar 11, 2023

I also uploaded the above code here if that helps: https://github.com/Tradias/example-vcpkg-grpc/tree/asio-grpc-69

@rogerworld
Author

Thank you very much. This is exactly what I need.
Regarding thread safety, do I need to protect the produce_data and handle_request functions?
Thank you!

@Tradias
Owner

Tradias commented Mar 11, 2023

agrpc::write and agrpc::finish are thread-safe, as are all gRPC functions. The Asio channel is not, as explained in its documentation: https://www.boost.org/doc/libs/1_81_0/doc/html/boost_asio/reference/experimental__basic_channel.html
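
As a general illustration of what "not thread-safe" implies, one common way to share such an object between threads is to serialize every access to it. The sketch below shows that idea with a plain std::mutex and the standard library only. GuardedQueue and push_concurrently are hypothetical names, and the vector merely stands in for shared state. It is not meant as a recipe for the coroutine code in this thread, where holding a mutex across a co_await would be problematic.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical wrapper: the vector inside is not safe for concurrent use,
// so every member function takes the mutex before touching it.
class GuardedQueue
{
  public:
    void push(int value)
    {
        std::lock_guard<std::mutex> lock{mutex_};
        values_.push_back(value);
    }

    std::size_t size() const
    {
        std::lock_guard<std::mutex> lock{mutex_};
        return values_.size();
    }

  private:
    mutable std::mutex mutex_;
    std::vector<int> values_;
};

// Pushes `per_thread` values from each of `thread_count` threads and
// returns the number of values stored once all threads have finished.
std::size_t push_concurrently(GuardedQueue& queue, int thread_count, int per_thread)
{
    std::vector<std::thread> threads;
    for (int t = 0; t < thread_count; ++t)
    {
        threads.emplace_back(
            [&queue, per_thread]
            {
                for (int i = 0; i < per_thread; ++i)
                {
                    queue.push(i);
                }
            });
    }
    for (auto& thread : threads)
    {
        thread.join();
    }
    return queue.size();
}
```

In an Asio program the more usual options are to run all handlers that touch the channel on a single thread or strand, or to look at asio::experimental::concurrent_channel, which the Asio documentation describes as safe for shared use across threads.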

@rogerworld
Author

Okay, thank you, it's clear now.
I will close the issue because you answered all my questions!
