Skip to content

Commit

Permalink
Implement HttpBlock
Browse files Browse the repository at this point in the history
In the future we might want to consider using the message port mechanism
to trigger the requests.

Fixes #236
  • Loading branch information
vimpostor committed Feb 5, 2024
1 parent e0b5b99 commit 6b001e8
Show file tree
Hide file tree
Showing 10 changed files with 525 additions and 18 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ if (EMSCRIPTEN)
-fwasm-exceptions
-pthread
"SHELL:-s PTHREAD_POOL_SIZE=30"
"SHELL:-s FETCH=1"
"SHELL:-s ASSERTIONS=1"
"SHELL:-s USE_PTHREADS"
)
endif ()

Expand Down
1 change: 1 addition & 0 deletions blocks/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
add_subdirectory(basic)
add_subdirectory(filter)
add_subdirectory(fourier)
add_subdirectory(http)
add_subdirectory(testing)
4 changes: 2 additions & 2 deletions blocks/basic/include/gnuradio-4.0/basic/common_blocks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
template<typename T>
class InspectSink : public gr::Block<InspectSink<T>> {
public:
T value{};
gr::PortIn<T> in;
T value{};

constexpr void
processOne(T val) {
value = val;
}
};

ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (InspectSink<T>), value, in);
ENABLE_REFLECTION_FOR_TEMPLATE(InspectSink, in, value);

template<typename T>
class builtin_multiply : public gr::Block<builtin_multiply<T>> {
Expand Down
14 changes: 14 additions & 0 deletions blocks/http/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FetchContent_Declare(
httplib
GIT_REPOSITORY https://github.com/yhirose/cpp-httplib.git
GIT_TAG v0.15.1
)
FetchContent_MakeAvailable(httplib)

add_library(gr-http INTERFACE)
target_link_libraries(gr-http INTERFACE gnuradio-core gnuradio-algorithm httplib::httplib)
target_include_directories(gr-http INTERFACE $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include/> $<INSTALL_INTERFACE:include/>)

if (ENABLE_TESTING)
add_subdirectory(test)
endif ()
298 changes: 298 additions & 0 deletions blocks/http/include/gnuradio-4.0/http/HttpBlock.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
#ifndef GNURADIO_HTTP_BLOCK_HPP
#define GNURADIO_HTTP_BLOCK_HPP

#include <gnuradio-4.0/Block.hpp>
#include <gnuradio-4.0/Graph.hpp>
#include <gnuradio-4.0/reflection.hpp>
#include <pmtv/pmt.hpp>
#include <semaphore>

#ifdef __EMSCRIPTEN__
#include <emscripten/emscripten.h>
#include <emscripten/fetch.h>
#include <emscripten/threading.h>
#else
#include <httplib.h>
#endif

using namespace gr;
using namespace std::chrono_literals;

namespace gr::http {

enum class RequestType {
GET,
SUBSCRIBE,
POST,
};

using HttpBlockDoc = Doc<R""(
The HttpBlock allows to use the responses from HTTP APIs (e.g. REST APIs) as the value for this block's output port.
The block can be used either on-demand to do single requests, or can use long polling to subscribe to an event stream.
The result is provided on a single output port as a map with the following keys:
- status: The HTTP status code, usually 200 on success
- raw-data: The data of the response
- mime-type: The mime-type of the response
)"">;

template<typename T>
class HttpBlock : public gr::Block<HttpBlock<T>, BlockingIO<false>, HttpBlockDoc> {
private:
// used for queuing GET responses for the consumer
std::queue<pmtv::map_t> _backlog;
std::mutex _backlog_mutex;

std::shared_ptr<std::thread> _thread;
std::atomic_size_t _pendingRequests = 0;
std::atomic_bool _shutdownThread = false;
std::binary_semaphore _ready{ 0 };

#ifndef __EMSCRIPTEN__
std::unique_ptr<httplib::Client> _client;
#endif

#ifdef __EMSCRIPTEN__
void
queueWorkEmscripten(emscripten_fetch_t *fetch) {
pmtv::map_t result;
result["mime-type"] = "text/plain";
result["status"] = static_cast<int>(fetch->status);
result["raw-data"] = std::string(fetch->data, static_cast<std::size_t>(fetch->numBytes));

queueWork(result);
}

void
onSuccess(emscripten_fetch_t *fetch) {
queueWorkEmscripten(fetch);
emscripten_fetch_close(fetch);
}

void
onError(emscripten_fetch_t *fetch) {
// we still want to queue the response, the statusCode will just not be 200
queueWorkEmscripten(fetch);
emscripten_fetch_close(fetch);
}

void
doRequestEmscripten() {
emscripten_fetch_attr_t attr;
emscripten_fetch_attr_init(&attr);
if (type == RequestType::POST) {
strcpy(attr.requestMethod, "POST");
if (!parameters.empty()) {
attr.requestData = parameters.c_str();
attr.requestDataSize = parameters.size();
}
} else {
strcpy(attr.requestMethod, "GET");
}

// this is needed so that we can call into member functions again, when we receive the Fetch callback
attr.userData = this;

attr.attributes = EMSCRIPTEN_FETCH_LOAD_TO_MEMORY;
attr.onsuccess = [](emscripten_fetch_t *fetch) {
auto src = static_cast<HttpBlock<T> *>(fetch->userData);
src->onSuccess(fetch);
};
attr.onerror = [](emscripten_fetch_t *fetch) {
auto src = static_cast<HttpBlock<T> *>(fetch->userData);
src->onError(fetch);
};
const auto target = url + endpoint;
std::ignore = emscripten_fetch(&attr, target.c_str());
}

void
runThreadEmscripten() {
if (type == RequestType::SUBSCRIBE) {
while (!_shutdownThread) {
// long polling, just keep doing requests
std::thread thread{ &HttpBlock::doRequestEmscripten, this };
thread.join();
}
} else {
while (!_shutdownThread) {
while (_pendingRequests > 0) {
_pendingRequests--;
std::thread thread{ &HttpBlock::doRequestEmscripten, this };
thread.join();
}
_ready.acquire();
}
}
}
#else
void
runThreadNative() {
_client = std::make_unique<httplib::Client>(url);
_client->set_follow_location(true);
if (type == RequestType::SUBSCRIBE) {
// it's long polling, be generous with timeouts
_client->set_read_timeout(1h);
_client->Get(endpoint, [&](const char *data, size_t len) {
pmtv::map_t result;
result["mime-type"] = "text/plain";
result["status"] = 200;
result["raw-data"] = std::string(data, len);

queueWork(result);

return !_shutdownThread;
});
} else {
while (!_shutdownThread) {
while (_pendingRequests > 0) {
_pendingRequests--;
httplib::Result resp;
if (type == RequestType::POST) {
resp = parameters.empty() ? _client->Post(endpoint) : _client->Post(endpoint, parameters, "application/x-www-form-urlencoded");
} else {
resp = _client->Get(endpoint);
}
pmtv::map_t result;
if (resp) {
result["mime-type"] = "text/plain";
result["status"] = resp->status;
result["raw-data"] = resp->body;
queueWork(result);
}
}

_ready.acquire();
}
}
}
#endif

void
queueWork(const pmtv::map_t &item) {
{
std::lock_guard lg{ _backlog_mutex };
_backlog.push(item);
}
const auto work = this->invokeWork();
if (work == work::Status::DONE) {
std::atomic_store_explicit(&this->state, lifecycle::State::REQUESTED_STOP, std::memory_order_release);
this->state.notify_all();
}
this->ioLastWorkStatus.exchange(work, std::memory_order_relaxed);
}

void
runThread() {
#ifdef __EMSCRIPTEN__
runThreadEmscripten();
#else
runThreadNative();
#endif
}

void
startThread() {
lifecycle::State expectedThreadState = lifecycle::State::INITIALISED;
if (!this->state.compare_exchange_strong(expectedThreadState, lifecycle::State::RUNNING, std::memory_order_acq_rel)) {
return;
}
this->state.notify_all();

_thread = std::shared_ptr<std::thread>(new std::thread([this]() { runThread(); }), [this](std::thread *t) {
_shutdownThread = true;
_ready.release();
#ifndef __EMSCRIPTEN__
if (_client) {
_client->stop();
}
#endif
if (t->joinable()) {
t->join();
}
_shutdownThread = false;
delete t;

std::atomic_store_explicit(&this->state, lifecycle::State::STOPPED, std::memory_order_release);
this->state.notify_all();
});
}

void
stopThread() {
_thread.reset();
}

public:
PortIn<T> in;
PortOut<pmtv::map_t> out;

std::string url;
std::string endpoint;
gr::http::RequestType type = gr::http::RequestType::GET;
std::string parameters; // x-www-form-urlencoded encoded POST parameters

explicit HttpBlock(const std::string &_url, const std::string &_endpoint = "/", const RequestType _type = RequestType::GET, const std::string &_parameters = "")
: url(_url), endpoint(_endpoint), type(_type), parameters(_parameters) {}

~HttpBlock() { stopThread(); }

void
settingsChanged(const property_map & /*oldSettings*/, property_map &newSettings) noexcept {
if (newSettings.contains("url") || newSettings.contains("type")) {
// other setting changes are hot-swappble without restarting the Client
startThread();
}
}

void
start() {
startThread();
}

void
stop() {
stopThread();
}

[[nodiscard]] constexpr auto
processOne(T value) noexcept {
if (type == RequestType::SUBSCRIBE) {
// for long polling, the subscription should stay active, if and only if the value on the input port is 1
if (value) {
if (!_thread) {
startThread();
}
} else {
stopThread();
}
}

pmtv::map_t result;
std::lock_guard lg{ _backlog_mutex };
if (!_backlog.empty()) {
result = _backlog.front();
_backlog.pop();
}
return result;
}

// TODO: Consider using message ports instead of a public function for triggering requests
void
trigger() {
if (!_thread) {
startThread();
}
if (type != RequestType::SUBSCRIBE) {
_pendingRequests++;
_ready.release();
}
}
};

static_assert(gr::BlockLike<http::HttpBlock<uint8_t>>);

} // namespace gr::http

ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (gr::http::HttpBlock<T>), in, out, url, endpoint, type);

#endif // GNURADIO_HTTP_BLOCK_HPP
6 changes: 6 additions & 0 deletions blocks/http/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
add_ut_test(qa_HttpBlock)
target_link_libraries(qa_HttpBlock PRIVATE gr-http)

if (EMSCRIPTEN)
target_link_options(qa_HttpBlock PRIVATE --pre-js=${CMAKE_CURRENT_SOURCE_DIR}/pre.js --emrun)
endif ()
Loading

0 comments on commit 6b001e8

Please sign in to comment.