diff --git a/CMakeLists.txt b/CMakeLists.txt index c1167e0a1..8c28311fd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -91,6 +91,9 @@ if (EMSCRIPTEN) -fwasm-exceptions -pthread "SHELL:-s PTHREAD_POOL_SIZE=30" + "SHELL:-s FETCH=1" + "SHELL:-s ASSERTIONS=1" + "SHELL:-s USE_PTHREADS" ) endif () diff --git a/blocks/CMakeLists.txt b/blocks/CMakeLists.txt index 0b04c5f66..d9860f4c7 100644 --- a/blocks/CMakeLists.txt +++ b/blocks/CMakeLists.txt @@ -1,4 +1,5 @@ add_subdirectory(basic) add_subdirectory(filter) add_subdirectory(fourier) +add_subdirectory(http) add_subdirectory(testing) diff --git a/blocks/basic/include/gnuradio-4.0/basic/common_blocks.hpp b/blocks/basic/include/gnuradio-4.0/basic/common_blocks.hpp index 76507e49f..19a6baa7d 100644 --- a/blocks/basic/include/gnuradio-4.0/basic/common_blocks.hpp +++ b/blocks/basic/include/gnuradio-4.0/basic/common_blocks.hpp @@ -15,8 +15,8 @@ template class InspectSink : public gr::Block> { public: - T value{}; gr::PortIn in; + T value{}; constexpr void processOne(T val) { @@ -24,7 +24,7 @@ class InspectSink : public gr::Block> { } }; -ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (InspectSink), value, in); +ENABLE_REFLECTION_FOR_TEMPLATE(InspectSink, in, value); template class builtin_multiply : public gr::Block> { diff --git a/blocks/http/CMakeLists.txt b/blocks/http/CMakeLists.txt new file mode 100644 index 000000000..90b4ee900 --- /dev/null +++ b/blocks/http/CMakeLists.txt @@ -0,0 +1,14 @@ +FetchContent_Declare( + httplib + GIT_REPOSITORY https://github.com/yhirose/cpp-httplib.git + GIT_TAG v0.15.1 +) +FetchContent_MakeAvailable(httplib) + +add_library(gr-http INTERFACE) +target_link_libraries(gr-http INTERFACE gnuradio-core gnuradio-algorithm httplib::httplib) +target_include_directories(gr-http INTERFACE $ $) + +if (ENABLE_TESTING) + add_subdirectory(test) +endif () diff --git a/blocks/http/include/gnuradio-4.0/http/HttpBlock.hpp b/blocks/http/include/gnuradio-4.0/http/HttpBlock.hpp new file mode 100644 index 000000000..4321879b5 --- /dev/null +++ b/blocks/http/include/gnuradio-4.0/http/HttpBlock.hpp @@ -0,0 +1,298 @@ +#ifndef GNURADIO_HTTP_BLOCK_HPP +#define GNURADIO_HTTP_BLOCK_HPP + +#include +#include +#include +#include +#include + +#ifdef __EMSCRIPTEN__ +#include +#include +#include +#else +#include +#endif + +using namespace gr; +using namespace std::chrono_literals; + +namespace gr::http { + +enum class RequestType { + GET, + SUBSCRIBE, + POST, +}; + +using HttpBlockDoc = Doc; + +template +class HttpBlock : public gr::Block, BlockingIO, HttpBlockDoc> { +private: + // used for queuing GET responses for the consumer + std::queue _backlog; + std::mutex _backlog_mutex; + + std::shared_ptr _thread; + std::atomic_size_t _pendingRequests = 0; + std::atomic_bool _shutdownThread = false; + std::binary_semaphore _ready{ 0 }; + +#ifndef __EMSCRIPTEN__ + std::unique_ptr _client; +#endif + +#ifdef __EMSCRIPTEN__ + void + queueWorkEmscripten(emscripten_fetch_t *fetch) { + pmtv::map_t result; + result["mime-type"] = "text/plain"; + result["status"] = static_cast(fetch->status); + result["raw-data"] = std::string(fetch->data, static_cast(fetch->numBytes)); + + queueWork(result); + } + + void + onSuccess(emscripten_fetch_t *fetch) { + queueWorkEmscripten(fetch); + emscripten_fetch_close(fetch); + } + + void + onError(emscripten_fetch_t *fetch) { + // we still want to queue the response, the statusCode will just not be 200 + queueWorkEmscripten(fetch); + emscripten_fetch_close(fetch); + } + + void + doRequestEmscripten() { + emscripten_fetch_attr_t attr; + emscripten_fetch_attr_init(&attr); + if (type == RequestType::POST) { + strcpy(attr.requestMethod, "POST"); + if (!parameters.empty()) { + attr.requestData = parameters.c_str(); + attr.requestDataSize = parameters.size(); + } + } else { + strcpy(attr.requestMethod, "GET"); + } + + // this is needed so that we can call into member functions again, when we receive the Fetch callback + attr.userData = this; + + attr.attributes = EMSCRIPTEN_FETCH_LOAD_TO_MEMORY; + attr.onsuccess = [](emscripten_fetch_t *fetch) { + auto src = static_cast *>(fetch->userData); + src->onSuccess(fetch); + }; + attr.onerror = [](emscripten_fetch_t *fetch) { + auto src = static_cast *>(fetch->userData); + src->onError(fetch); + }; + const auto target = url + endpoint; + std::ignore = emscripten_fetch(&attr, target.c_str()); + } + + void + runThreadEmscripten() { + if (type == RequestType::SUBSCRIBE) { + while (!_shutdownThread) { + // long polling, just keep doing requests + std::thread thread{ &HttpBlock::doRequestEmscripten, this }; + thread.join(); + } + } else { + while (!_shutdownThread) { + while (_pendingRequests > 0) { + _pendingRequests--; + std::thread thread{ &HttpBlock::doRequestEmscripten, this }; + thread.join(); + } + _ready.acquire(); + } + } + } +#else + void + runThreadNative() { + _client = std::make_unique(url); + _client->set_follow_location(true); + if (type == RequestType::SUBSCRIBE) { + // it's long polling, be generous with timeouts + _client->set_read_timeout(1h); + _client->Get(endpoint, [&](const char *data, size_t len) { + pmtv::map_t result; + result["mime-type"] = "text/plain"; + result["status"] = 200; + result["raw-data"] = std::string(data, len); + + queueWork(result); + + return !_shutdownThread; + }); + } else { + while (!_shutdownThread) { + while (_pendingRequests > 0) { + _pendingRequests--; + httplib::Result resp; + if (type == RequestType::POST) { + resp = parameters.empty() ? _client->Post(endpoint) : _client->Post(endpoint, parameters, "application/x-www-form-urlencoded"); + } else { + resp = _client->Get(endpoint); + } + pmtv::map_t result; + if (resp) { + result["mime-type"] = "text/plain"; + result["status"] = resp->status; + result["raw-data"] = resp->body; + queueWork(result); + } + } + + _ready.acquire(); + } + } + } +#endif + + void + queueWork(const pmtv::map_t &item) { + { + std::lock_guard lg{ _backlog_mutex }; + _backlog.push(item); + } + const auto work = this->invokeWork(); + if (work == work::Status::DONE) { + std::atomic_store_explicit(&this->state, lifecycle::State::REQUESTED_STOP, std::memory_order_release); + this->state.notify_all(); + } + this->ioLastWorkStatus.exchange(work, std::memory_order_relaxed); + } + + void + runThread() { +#ifdef __EMSCRIPTEN__ + runThreadEmscripten(); +#else + runThreadNative(); +#endif + } + + void + startThread() { + lifecycle::State expectedThreadState = lifecycle::State::INITIALISED; + if (!this->state.compare_exchange_strong(expectedThreadState, lifecycle::State::RUNNING, std::memory_order_acq_rel)) { + return; + } + this->state.notify_all(); + + _thread = std::shared_ptr(new std::thread([this]() { runThread(); }), [this](std::thread *t) { + _shutdownThread = true; + _ready.release(); +#ifndef __EMSCRIPTEN__ + if (_client) { + _client->stop(); + } +#endif + if (t->joinable()) { + t->join(); + } + _shutdownThread = false; + delete t; + + std::atomic_store_explicit(&this->state, lifecycle::State::STOPPED, std::memory_order_release); + this->state.notify_all(); + }); + } + + void + stopThread() { + _thread.reset(); + } + +public: + PortIn in; + PortOut out; + + std::string url; + std::string endpoint; + gr::http::RequestType type = gr::http::RequestType::GET; + std::string parameters; // x-www-form-urlencoded encoded POST parameters + + explicit HttpBlock(const std::string &_url, const std::string &_endpoint = "/", const RequestType _type = RequestType::GET, const std::string &_parameters = "") + : url(_url), endpoint(_endpoint), type(_type), parameters(_parameters) {} + + ~HttpBlock() { stopThread(); } + + void + settingsChanged(const property_map & /*oldSettings*/, property_map &newSettings) noexcept { + if (newSettings.contains("url") || newSettings.contains("type")) { + // other setting changes are hot-swappble without restarting the Client + startThread(); + } + } + + void + start() { + startThread(); + } + + void + stop() { + stopThread(); + } + + [[nodiscard]] constexpr auto + processOne(T value) noexcept { + if (type == RequestType::SUBSCRIBE) { + // for long polling, the subscription should stay active, if and only if the value on the input port is 1 + if (value) { + if (!_thread) { + startThread(); + } + } else { + stopThread(); + } + } + + pmtv::map_t result; + std::lock_guard lg{ _backlog_mutex }; + if (!_backlog.empty()) { + result = _backlog.front(); + _backlog.pop(); + } + return result; + } + + // TODO: Consider using message ports instead of a public function for triggering requests + void + trigger() { + if (!_thread) { + startThread(); + } + if (type != RequestType::SUBSCRIBE) { + _pendingRequests++; + _ready.release(); + } + } +}; + +static_assert(gr::BlockLike>); + +} // namespace gr::http + +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (gr::http::HttpBlock), in, out, url, endpoint, type); + +#endif // GNURADIO_HTTP_BLOCK_HPP diff --git a/blocks/http/test/CMakeLists.txt b/blocks/http/test/CMakeLists.txt new file mode 100644 index 000000000..e155e78c8 --- /dev/null +++ b/blocks/http/test/CMakeLists.txt @@ -0,0 +1,6 @@ +add_ut_test(qa_HttpBlock) +target_link_libraries(qa_HttpBlock PRIVATE gr-http) + +if (EMSCRIPTEN) + target_link_options(qa_HttpBlock PRIVATE --pre-js=${CMAKE_CURRENT_SOURCE_DIR}/pre.js --emrun) +endif () diff --git a/blocks/http/test/pre.js b/blocks/http/test/pre.js new file mode 100644 index 000000000..4a1984026 --- /dev/null +++ b/blocks/http/test/pre.js @@ -0,0 +1,47 @@ +// this is a little HTTP server intended for unit test purposes. +// for interactive CLI usage, you can start the server by passing "start" as the last argument, e.g.: +// node pre.js start +// +// for automated usage in unit tests, please call startServer() yourself after embedding this into the JS runtime with --pre-js +// see: https://emscripten.org/docs/tools_reference/emcc.html#emcc-pre-js + +var http = require("http"); +// this requires xhr2 to be installed, please run: npm install xhr2 +// see: https://github.com/emscripten-core/emscripten/issues/21158 +XMLHttpRequest = require('xhr2'); + +const server = http.createServer((req, res) => { + console.log(req.url); + if (req.url == "/echo") { + res.writeHead(200, {"Content-Type": "text/plain"}); + res.write("Hello world!"); + res.end(); + } else if (req.url == "/notify") { + // wait a bit before responding (we are testing long polling here) + return setTimeout(() => { + res.writeHead(200, {"Content-Type": "text/plain"}); + res.write("event"); + res.end(); + }, 10); + } else { + res.writeHead(404, {"Content-Type": "text/plain"}); + res.write("Unknown route"); + res.end(); + } +}); + +var startServer = () => { + server.once("error", (err) => { console.log(err); }); + server.listen(8080); + console.log("Started the server"); +}; + +var stopServer = () => { + server.close(); + server.closeAllConnections(); + console.log("Stopped the server"); +}; + +if (process.argv.at(-1) == "start") { + startServer(); +} diff --git a/blocks/http/test/qa_HttpBlock.cpp b/blocks/http/test/qa_HttpBlock.cpp new file mode 100644 index 000000000..43de19fc2 --- /dev/null +++ b/blocks/http/test/qa_HttpBlock.cpp @@ -0,0 +1,153 @@ +#include + +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +template +struct fixed_source : public gr::Block, gr::PortOutNamed> { + T value = 1; + + gr::work::Result + work(std::size_t requested_work) { + auto &port = gr::outputPort<0, gr::PortType::STREAM>(this); + auto &writer = port.streamWriter(); + auto data = writer.reserve_output_range(1UZ); + data[0] = value; + data.publish(1UZ); + + value += 1; + return { requested_work, 1UZ, gr::work::Status::OK }; + } +}; + +const boost::ut::suite HttpBlocktests = [] { + using namespace boost::ut; + using namespace gr; + using namespace std::literals; + using namespace std::chrono_literals; + "http GET"_test = [] { +#ifdef __EMSCRIPTEN__ + std::thread thread{ [&]() { + // see ./pre.js for the emscripten server implementation + emscripten_run_script("startServer();"); + } }; +#else + httplib::Server server; + server.Get("/echo", [](const httplib::Request, httplib::Response &res) { res.set_content("Hello world!", "text/plain"); }); + + auto thread = std::thread{ [&server] { server.listen("localhost", 8080); } }; + server.wait_until_ready(); +#endif + + gr::Graph graph; + auto &source = graph.emplaceBlock>(); + auto &httpBlock = graph.emplaceBlock>("http://localhost:8080", "/echo"); + auto &sink = graph.emplaceBlock>(); + graph.connect(source, 0, httpBlock, 0); + graph.connect(httpBlock, 0, sink, 0); + httpBlock.start(); + + // make a request + httpBlock.trigger(); + pmtv::map_t response; + while (response.empty()) { + std::this_thread::sleep_for(1ms); + std::ignore = source.work(1UZ); + auto work = httpBlock.work(1UZ); + work = sink.work(1UZ); + response = sink.value; + } + expect(eq(std::get(response.at("raw-data")), "Hello world!"sv)); + + // change endpoint to something invalid + std::ignore = httpBlock.settings().set({ { "endpoint", "/does-not-exist" } }); + std::ignore = httpBlock.settings().applyStagedParameters(); + response.clear(); + httpBlock.trigger(); + while (response.empty()) { + std::this_thread::sleep_for(1ms); + std::ignore = source.work(1UZ); + auto work = sink.work(1UZ); + if (work.performed_work == 1UZ) { + response = sink.value; + } + } + expect(eq(std::get(response.at("status")), 404)); + +#ifdef __EMSCRIPTEN__ + emscripten_run_script("stopServer();"); +#else + server.stop(); +#endif + thread.join(); + }; + + "http SUBSCRIBE"_test = [] { +#ifdef __EMSCRIPTEN__ + std::thread thread{ [&]() { emscripten_run_script("startServer();"); } }; +#else + std::atomic_bool shutdown = false; + httplib::Server server; + server.Get("/notify", [&](const httplib::Request, httplib::Response &res) { + res.set_chunked_content_provider("text/plain", [&](size_t, httplib::DataSink &sink) { + if (shutdown) { + return false; + } + // delay the reply a bit, we are long polling + std::this_thread::sleep_for(10ms); + if (sink.is_writable()) { + sink.os << "event"; + } + return true; + }); + }); + + auto thread = std::thread{ [&server] { server.listen("localhost", 8080); } }; + server.wait_until_ready(); +#endif + + gr::Graph graph; + auto &source = graph.emplaceBlock>(); + auto &httpBlock = graph.emplaceBlock>("http://localhost:8080", "/notify", http::RequestType::SUBSCRIBE); + auto &sink = graph.emplaceBlock>(); + graph.connect(source, 0, httpBlock, 0); + graph.connect(httpBlock, 0, sink, 0); + httpBlock.start(); + + pmtv::map_t response; + size_t retry = 0; + while (retry < 10 && response.empty()) { + ++retry; + std::this_thread::sleep_for(10ms); + std::ignore = source.work(1UZ); + auto work = httpBlock.work(1UZ); + work = sink.work(1UZ); + response = sink.value; + } + expect(!response.empty()); + expect(eq(std::get(response.at("raw-data")), "event"sv)); + +#ifdef __EMSCRIPTEN__ + emscripten_run_script("stopServer();"); +#else + shutdown = true; + server.stop(); +#endif + thread.join(); + }; +}; + +int +main() { /* tests are statically executed */ +} diff --git a/core/include/gnuradio-4.0/Port.hpp b/core/include/gnuradio-4.0/Port.hpp index 79c7937ca..f01bd50b6 100644 --- a/core/include/gnuradio-4.0/Port.hpp +++ b/core/include/gnuradio-4.0/Port.hpp @@ -20,7 +20,7 @@ using gr::meta::fixed_string; #ifndef PMT_SUPPORTED_TYPE // // #### default supported types -- TODO: to be replaced by pmt::pmtv declaration #define PMT_SUPPORTED_TYPE // Only DataSet and DataSet are added => consider to support more Dataset -using supported_type = std::variant, std::complex, DataSet, DataSet /*, ...*/>; +using supported_type = std::variant, std::complex, DataSet, DataSet, pmtv::map_t /*, ...*/>; #endif enum class PortDirection { INPUT, OUTPUT, ANY }; // 'ANY' only for query and not to be used for port declarations diff --git a/core/test/qa_DynamicBlock.cpp b/core/test/qa_DynamicBlock.cpp index 291580ba7..b26b21438 100644 --- a/core/test/qa_DynamicBlock.cpp +++ b/core/test/qa_DynamicBlock.cpp @@ -29,21 +29,6 @@ static_assert(gr::BlockLike>); static_assert(gr::traits::block::stream_input_ports>::size() == 0); static_assert(gr::traits::block::stream_output_ports>::size() == 1); -template -struct DebugSink : public gr::Block> { - T lastValue = {}; - gr::PortIn in; - - void - processOne(T value) { - lastValue = value; - } -}; - -static_assert(gr::BlockLike>); - -ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (DebugSink), lastValue, in); - const boost::ut::suite DynamicBlocktests = [] { using namespace boost::ut; "Change number of ports dynamically"_test = [] {