Skip to content

Commit

Permalink
core: c++ scaffolding for main api (#251)
Browse files Browse the repository at this point in the history
Description: this PR adds a few foundational constructs to implement the c API introduced in #225.
The approach take here is to create an Http dispatcher that is able to manage http streams via the APIs introduced in #225. The streams created by the dispatcher can be interacted with in the mobile platform, and ultimately map down to direct injection into the envoy router via upstream envoy constructs (async client, async stream, and the router).

The code introduced here is capable of making a headers only request from platform code, and receiving the corresponding callback in platform code.

Subsequent PRs will implement the other relevant methods to deal with data and trailers.

Risk Level: med - foundational constructs introduced.
Testing: added cpp unit tests, and a CI job to run them.

Signed-off-by: Jose Nino <[email protected]>
Co-authored-by: Mike Schore <[email protected]>
Signed-off-by: JP Simard <[email protected]>
  • Loading branch information
2 people authored and jpsim committed Nov 28, 2022
1 parent 1d32353 commit d12cd3c
Show file tree
Hide file tree
Showing 19 changed files with 752 additions and 128 deletions.
14 changes: 14 additions & 0 deletions mobile/azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -363,3 +363,17 @@ stages:
# # Check for the sentinel value that shows the app is alive and well.
# - script: cat /tmp/envoy.log | grep 'Hello, world!'
# displayName: 'Check liveliness'
- stage: cpp_tests
dependsOn: [] # this removes the implicit dependency on previous stage and causes this to run in parallel.
jobs:
- job: mac_dist
timeoutInMinutes: 60
pool:
vmImage: 'macos-10.14'
steps:
- checkout: self
submodules: true
- script: ./ci/mac_ci_setup.sh
displayName: 'Install dependencies'
- script: ./bazelw test //test/common/...
displayName: 'Run tests'
8 changes: 7 additions & 1 deletion mobile/library/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ envoy_cc_library(
srcs = ["main_interface.cc"],
hdrs = ["main_interface.h"],
repository = "@envoy",
deps = ["@envoy//source/exe:envoy_main_common_lib"],
deps = [
"//library/common/buffer:utility_lib",
"//library/common/http:dispatcher_lib",
"//library/common/http:header_utility_lib",
"//library/common/include:c_types_interface",
"@envoy//source/exe:envoy_main_common_lib",
],
)

cc_library(
Expand Down
16 changes: 16 additions & 0 deletions mobile/library/common/buffer/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
licenses(["notice"]) # Apache 2

load("@envoy//bazel:envoy_build_system.bzl", "envoy_cc_library", "envoy_package")

envoy_package()

envoy_cc_library(
name = "utility_lib",
srcs = ["utility.cc"],
hdrs = ["utility.h"],
repository = "@envoy",
deps = [
"//library/common/include:c_types_interface",
"@envoy//include/envoy/buffer:buffer_interface",
],
)
17 changes: 17 additions & 0 deletions mobile/library/common/buffer/utility.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#include "library/common/buffer/utility.h"

#include "envoy/buffer/buffer.h"

namespace Envoy {
namespace Buffer {
namespace Utility {

// TODO: implement this https://github.com/lyft/envoy-mobile/issues/284.
Buffer::InstancePtr transformData(envoy_data) { return nullptr; }

// TODO: implement this https://github.com/lyft/envoy-mobile/issues/284.
envoy_data transformData(Buffer::Instance&) { return {0, nullptr}; }

} // namespace Utility
} // namespace Buffer
} // namespace Envoy
27 changes: 27 additions & 0 deletions mobile/library/common/buffer/utility.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include "envoy/buffer/buffer.h"

#include "library/common/include/c_types.h"

namespace Envoy {
namespace Buffer {
namespace Utility {

/**
* Transform envoy_data to Envoy::Buffer::Instance.
* @param headers, the envoy_data to transform.
* @return Envoy::Buffer::InstancePtr, the 1:1 transformation of the envoy_data param.
*/
Buffer::InstancePtr transformData(envoy_data data);

/**
* Transform from Buffer::Instance to envoy_data.
* @param data, the Buffer::Instance to transform.
* @return envoy_data, the 1:1 transformation of the Buffer::Instance param.
*/
envoy_data transformData(Buffer::Instance&);

} // namespace Utility
} // namespace Buffer
} // namespace Envoy
36 changes: 36 additions & 0 deletions mobile/library/common/http/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
licenses(["notice"]) # Apache 2

load("@envoy//bazel:envoy_build_system.bzl", "envoy_cc_library", "envoy_package")

envoy_package()

envoy_cc_library(
name = "dispatcher_lib",
srcs = ["dispatcher.cc"],
hdrs = ["dispatcher.h"],
repository = "@envoy",
deps = [
"//library/common/buffer:utility_lib",
"//library/common/http:header_utility_lib",
"//library/common/include:c_types_interface",
"@envoy//include/envoy/buffer:buffer_interface",
"@envoy//include/envoy/event:dispatcher_interface",
"@envoy//include/envoy/http:async_client_interface",
"@envoy//include/envoy/http:header_map_interface",
"@envoy//include/envoy/upstream:cluster_manager_interface",
"@envoy//source/common/common:minimal_logger_lib",
],
)

envoy_cc_library(
name = "header_utility_lib",
srcs = ["header_utility.cc"],
hdrs = ["header_utility.h"],
repository = "@envoy",
deps = [
"//library/common/include:c_types_interface",
"@envoy//include/envoy/buffer:buffer_interface",
"@envoy//include/envoy/http:header_map_interface",
"@envoy//source/common/http:header_map_lib",
],
)
117 changes: 117 additions & 0 deletions mobile/library/common/http/dispatcher.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#include "library/common/http/dispatcher.h"

#include "library/common/buffer/utility.h"
#include "library/common/http/header_utility.h"

namespace Envoy {
namespace Http {

Dispatcher::DirectStreamCallbacks::DirectStreamCallbacks(envoy_stream_t stream,
envoy_observer observer,
Dispatcher& http_dispatcher)
: stream_(stream), observer_(observer), http_dispatcher_(http_dispatcher) {}

void Dispatcher::DirectStreamCallbacks::onHeaders(HeaderMapPtr&& headers, bool end_stream) {
ENVOY_LOG(debug, "response headers for stream (end_stream={}):\n{}", end_stream, *headers);
if (end_stream) {
http_dispatcher_.removeStream(stream_);
}
observer_.on_headers_f(Utility::transformHeaders(*headers), end_stream, observer_.context);
}

void Dispatcher::DirectStreamCallbacks::onData(Buffer::Instance& data, bool end_stream) {
ENVOY_LOG(debug, "response data for stream (length={} end_stream={})", data.length(), end_stream);
if (end_stream) {
http_dispatcher_.removeStream(stream_);
}
observer_.on_data_f(Envoy::Buffer::Utility::transformData(data), end_stream, observer_.context);
}

void Dispatcher::DirectStreamCallbacks::onTrailers(HeaderMapPtr&& trailers) {
ENVOY_LOG(debug, "response trailers for stream:\n{}", *trailers);
http_dispatcher_.removeStream(stream_);
observer_.on_trailers_f(Utility::transformHeaders(*trailers), observer_.context);
}

void Dispatcher::DirectStreamCallbacks::onReset() {
http_dispatcher_.removeStream(stream_);
observer_.on_error_f({ENVOY_STREAM_RESET, {0, nullptr}}, observer_.context);
}

Dispatcher::DirectStream::DirectStream(AsyncClient::Stream& underlying_stream,
DirectStreamCallbacksPtr&& callbacks)
: underlying_stream_(underlying_stream), callbacks_(std::move(callbacks)) {}

Dispatcher::Dispatcher(Event::Dispatcher& event_dispatcher,
Upstream::ClusterManager& cluster_manager)
: current_stream_id_(0), event_dispatcher_(event_dispatcher),
cluster_manager_(cluster_manager) {}

envoy_stream_t Dispatcher::startStream(envoy_observer observer) {
envoy_stream_t new_stream_id = current_stream_id_++;

event_dispatcher_.post([this, observer, new_stream_id]() -> void {
DirectStreamCallbacksPtr callbacks =
std::make_unique<DirectStreamCallbacks>(new_stream_id, observer, *this);
AsyncClient& async_client = cluster_manager_.httpAsyncClientForCluster("egress_cluster");
AsyncClient::Stream* underlying_stream = async_client.start(*callbacks, {});

if (!underlying_stream) {
// TODO: this callback might fire before the startStream function returns.
// Take this into account when thinking about stream cancellation.
callbacks->onReset();
} else {
DirectStreamPtr direct_stream =
std::make_unique<DirectStream>(*underlying_stream, std::move(callbacks));
streams_.emplace(new_stream_id, std::move(direct_stream));
ENVOY_LOG(debug, "started stream [{}]", new_stream_id);
}
});

return new_stream_id;
}

envoy_status_t Dispatcher::sendHeaders(envoy_stream_t stream_id, envoy_headers headers,
bool end_stream) {
event_dispatcher_.post([this, stream_id, headers, end_stream]() -> void {
DirectStream* direct_stream = getStream(stream_id);
// If direct_stream is not found, it means the stream has already closed or been reset
// and the appropriate callback has been issued to the caller. There's nothing to do here
// except silently swallow this.
// TODO: handle potential race condition with cancellation or failure get a stream in the
// first place. Additionally it is possible to get a nullptr due to bogus stream_id
// from the caller.
// https://github.com/lyft/envoy-mobile/issues/301
if (direct_stream != nullptr) {
direct_stream->headers_ = Utility::transformHeaders(headers);
ENVOY_LOG(debug, "request headers for stream [{}] (end_stream={}):\n{}", stream_id,
end_stream, *direct_stream->headers_);
direct_stream->underlying_stream_.sendHeaders(*direct_stream->headers_, end_stream);
}
});

return ENVOY_SUCCESS;
}

// TODO: implement.
envoy_status_t Dispatcher::sendData(envoy_stream_t, envoy_headers, bool) { return ENVOY_FAILURE; }
envoy_status_t Dispatcher::sendMetadata(envoy_stream_t, envoy_headers, bool) {
return ENVOY_FAILURE;
}
envoy_status_t Dispatcher::sendTrailers(envoy_stream_t, envoy_headers) { return ENVOY_FAILURE; }
envoy_status_t Dispatcher::locallyCloseStream(envoy_stream_t) { return ENVOY_FAILURE; }
envoy_status_t Dispatcher::resetStream(envoy_stream_t) { return ENVOY_FAILURE; }

Dispatcher::DirectStream* Dispatcher::getStream(envoy_stream_t stream_id) {
ASSERT(event_dispatcher_.isThreadSafe(),
"stream interaction must be performed on the event_dispatcher_'s thread.");
auto direct_stream_pair_it = streams_.find(stream_id);
return (direct_stream_pair_it != streams_.end()) ? direct_stream_pair_it->second.get() : nullptr;
}

// TODO: implement. Note: the stream might not be in the map if for example startStream called
// onReset due to its inability to get an underlying stream.
envoy_status_t Dispatcher::removeStream(envoy_stream_t) { return ENVOY_FAILURE; }

} // namespace Http
} // namespace Envoy
115 changes: 115 additions & 0 deletions mobile/library/common/http/dispatcher.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#pragma once

#include <atomic>
#include <unordered_map>

#include "envoy/buffer/buffer.h"
#include "envoy/event/dispatcher.h"
#include "envoy/http/async_client.h"
#include "envoy/http/header_map.h"
#include "envoy/upstream/cluster_manager.h"

#include "common/common/logger.h"

#include "library/common/include/c_types.h"

namespace Envoy {
namespace Http {

/**
* Manages HTTP streams, and provides an interface to interact with them.
* The Dispatcher executes all stream operations on the provided Event::Dispatcher's event loop.
*/
class Dispatcher : public Logger::Loggable<Logger::Id::http> {
public:
Dispatcher(Event::Dispatcher& event_dispatcher, Upstream::ClusterManager& cluster_manager);

/**
* Attempts to open a new stream to the remote. Note that this function is asynchronous and
* opening a stream may fail. The returned handle is immediately valid for use with this API, but
* there is no guarantee it will ever functionally represent an open stream.
* @param observer wrapper for callbacks for events on this stream.
* @return envoy_stream_t handle to the stream being created.
*/
envoy_stream_t startStream(envoy_observer observer);
envoy_status_t sendHeaders(envoy_stream_t stream, envoy_headers headers, bool end_stream);
envoy_status_t sendData(envoy_stream_t stream, envoy_headers headers, bool end_stream);
envoy_status_t sendMetadata(envoy_stream_t stream, envoy_headers headers, bool end_stream);
envoy_status_t sendTrailers(envoy_stream_t stream, envoy_headers headers);
envoy_status_t locallyCloseStream(envoy_stream_t stream);
// TODO: when implementing this function we have to make sure to prevent races with already
// scheduled and potentially scheduled callbacks. In order to do so the platform callbacks need to
// check for atomic state (boolean most likely) that will be updated here to mark the stream as
// closed.
envoy_status_t resetStream(envoy_stream_t stream);
envoy_status_t removeStream(envoy_stream_t stream);

private:
/**
* Notifies caller of async HTTP stream status.
* Note the HTTP stream is full-duplex, even if the local to remote stream has been ended
* by sendHeaders/sendData with end_stream=true, sendTrailers, or locallyCloseStream
* DirectStreamCallbacks can continue to receive events until the remote to local stream is
* closed, or resetStream is called.
*/
class DirectStreamCallbacks : public AsyncClient::StreamCallbacks,
public Logger::Loggable<Logger::Id::http> {
public:
DirectStreamCallbacks(envoy_stream_t stream, envoy_observer observer,
Dispatcher& http_dispatcher);

// AsyncClient::StreamCallbacks
void onHeaders(HeaderMapPtr&& headers, bool end_stream);
void onData(Buffer::Instance& data, bool end_stream);
void onTrailers(HeaderMapPtr&& trailers);
void onReset();

private:
const envoy_stream_t stream_;
const envoy_observer observer_;
Dispatcher& http_dispatcher_;
};

using DirectStreamCallbacksPtr = std::unique_ptr<DirectStreamCallbacks>;

/**
* Contains state about an HTTP stream; both in the outgoing direction via an underlying
* AsyncClient::Stream and in the incoming direction via DirectStreamCallbacks.
*/
class DirectStream {
// TODO: Bookkeeping for this class is insufficient to fully cover all cases necessary to
// track the lifecycle of the underlying_stream_. One way or another, we must fix this
// to prevent bugs in the future. (Enhanced internal bookkeeping is probably good enough,
// but other options include upstream modifications to AsyncClient and friends.
public:
DirectStream(AsyncClient::Stream& underlying_stream, DirectStreamCallbacksPtr&& callbacks);

// Used to issue outgoing HTTP stream operations.
AsyncClient::Stream& underlying_stream_;
// Used to receive incoming HTTP stream operations.
const DirectStreamCallbacksPtr callbacks_;

HeaderMapPtr headers_;
// TODO: because the client may send infinite metadata frames we need some ongoing way to
// free metadata ahead of object destruction.
// An implementation option would be to have drainable header maps, or done callbacks.
std::vector<HeaderMapPtr> metadata_;
HeaderMapPtr trailers_;
};

using DirectStreamPtr = std::unique_ptr<DirectStream>;

// Everything in the below interface must only be accessed from the event_dispatcher's thread.
// This allows us to generally avoid synchronization.
DirectStream* getStream(envoy_stream_t stream_id);

std::unordered_map<envoy_stream_t, DirectStreamPtr> streams_;
std::atomic<envoy_stream_t> current_stream_id_;
// The event_dispatcher is the only member state that may be accessed from a thread other than
// the event_dispatcher's own thread.
Event::Dispatcher& event_dispatcher_;
Upstream::ClusterManager& cluster_manager_;
};

} // namespace Http
} // namespace Envoy
Loading

0 comments on commit d12cd3c

Please sign in to comment.