From 3f6e4343bc384f9baa5e22cbdf90f92a4b42017e Mon Sep 17 00:00:00 2001 From: John Firebaugh Date: Mon, 8 Jun 2015 18:40:11 -0400 Subject: [PATCH] Move WorkerTask functionality into RunLoop core As a byproduct, this makes FileCache get requests properly cancelable. --- include/mbgl/storage/file_cache.hpp | 3 +- include/mbgl/storage/sqlite_cache.hpp | 2 +- platform/default/sqlite_cache.cpp | 4 +- src/mbgl/storage/default_file_source.cpp | 37 ++-- src/mbgl/storage/default_file_source_impl.hpp | 8 +- src/mbgl/util/run_loop.hpp | 163 ++++++++++++++---- src/mbgl/util/thread.hpp | 16 +- src/mbgl/util/work_task.cpp | 37 ---- src/mbgl/util/work_task.hpp | 20 +-- src/mbgl/util/worker.cpp | 13 +- test/miscellaneous/thread.cpp | 26 +-- 11 files changed, 191 insertions(+), 138 deletions(-) diff --git a/include/mbgl/storage/file_cache.hpp b/include/mbgl/storage/file_cache.hpp index f815d5b8c2c..a9071b1b672 100644 --- a/include/mbgl/storage/file_cache.hpp +++ b/include/mbgl/storage/file_cache.hpp @@ -10,6 +10,7 @@ namespace mbgl { struct Resource; class Response; +class WorkRequest; class FileCache : private util::noncopyable { public: @@ -18,7 +19,7 @@ class FileCache : private util::noncopyable { enum class Hint : uint8_t { Full, Refresh, No }; using Callback = std::function)>; - virtual void get(const Resource &resource, Callback callback) = 0; + virtual std::unique_ptr get(const Resource &resource, Callback callback) = 0; virtual void put(const Resource &resource, std::shared_ptr response, Hint hint) = 0; }; diff --git a/include/mbgl/storage/sqlite_cache.hpp b/include/mbgl/storage/sqlite_cache.hpp index fd67d48bb56..65328652f40 100644 --- a/include/mbgl/storage/sqlite_cache.hpp +++ b/include/mbgl/storage/sqlite_cache.hpp @@ -17,7 +17,7 @@ class SQLiteCache : public FileCache { ~SQLiteCache() override; // FileCache API - void get(const Resource &resource, Callback callback) override; + std::unique_ptr get(const Resource &resource, Callback callback) override; void put(const Resource &resource, std::shared_ptr response, Hint hint) override; class Impl; diff --git a/platform/default/sqlite_cache.cpp b/platform/default/sqlite_cache.cpp index 06d168ce4e1..7d7e7d27de9 100644 --- a/platform/default/sqlite_cache.cpp +++ b/platform/default/sqlite_cache.cpp @@ -126,12 +126,12 @@ void SQLiteCache::Impl::createSchema() { } } -void SQLiteCache::get(const Resource &resource, Callback callback) { +std::unique_ptr SQLiteCache::get(const Resource &resource, Callback callback) { // Can be called from any thread, but most likely from the file source thread. // Will try to load the URL from the SQLite database and call the callback when done. // Note that the callback is probably going to invoked from another thread, so the caller // must make sure that it can run in that thread. - thread->invokeWithResult(&Impl::get, std::move(callback), resource); + return thread->invokeWithResult(&Impl::get, std::move(callback), resource); } std::unique_ptr SQLiteCache::Impl::get(const Resource &resource) { diff --git a/src/mbgl/storage/default_file_source.cpp b/src/mbgl/storage/default_file_source.cpp index b46696aaa31..7f5e0beb79e 100644 --- a/src/mbgl/storage/default_file_source.cpp +++ b/src/mbgl/storage/default_file_source.cpp @@ -100,26 +100,16 @@ void DefaultFileSource::Impl::add(Request* req) { request->observers.insert(req); if (cache) { - startCacheRequest(resource); + startCacheRequest(request); } else { - startRealRequest(resource); + startRealRequest(request); } } -void DefaultFileSource::Impl::startCacheRequest(const Resource& resource) { +void DefaultFileSource::Impl::startCacheRequest(DefaultFileRequest* request) { // Check the cache for existing data so that we can potentially // revalidate the information without having to redownload everything. - cache->get(resource, [this, resource](std::unique_ptr response) { - DefaultFileRequest* request = find(resource); - - if (!request || request->request) { - // There is no request for this URL anymore. Likely, the request was canceled - // before we got around to process the cache result. - // The second possibility is that a request has already been started by another cache - // request. In this case, we don't have to do anything either. - return; - } - + request->cacheRequest = cache->get(request->resource, [this, request](std::unique_ptr response) { auto expired = [&response] { const int64_t now = std::chrono::duration_cast( SystemClock::now().time_since_epoch()).count(); @@ -128,7 +118,7 @@ void DefaultFileSource::Impl::startCacheRequest(const Resource& resource) { if (!response || expired()) { // No response or stale cache. Run the real request. - startRealRequest(resource, std::move(response)); + startRealRequest(request, std::move(response)); } else { // The response is fresh. We're good to notify the caller. notify(request, std::move(response), FileCache::Hint::No); @@ -136,17 +126,15 @@ void DefaultFileSource::Impl::startCacheRequest(const Resource& resource) { }); } -void DefaultFileSource::Impl::startRealRequest(const Resource& resource, std::shared_ptr response) { - DefaultFileRequest* request = find(resource); - +void DefaultFileSource::Impl::startRealRequest(DefaultFileRequest* request, std::shared_ptr response) { auto callback = [request, this] (std::shared_ptr res, FileCache::Hint hint) { notify(request, res, hint); }; - if (algo::starts_with(resource.url, "asset://")) { - request->request = assetContext->createRequest(resource, callback, loop, assetRoot); + if (algo::starts_with(request->resource.url, "asset://")) { + request->realRequest = assetContext->createRequest(request->resource, callback, loop, assetRoot); } else { - request->request = httpContext->createRequest(resource, callback, loop, response); + request->realRequest = httpContext->createRequest(request->resource, callback, loop, response); } } @@ -158,8 +146,11 @@ void DefaultFileSource::Impl::cancel(Request* req) { // cancel the request and remove it from the pending list. request->observers.erase(req); if (request->observers.empty()) { - if (request->request) { - request->request->cancel(); + if (request->cacheRequest) { + request->cacheRequest.reset(); + } + if (request->realRequest) { + request->realRequest->cancel(); } pending.erase(request->resource); } diff --git a/src/mbgl/storage/default_file_source_impl.hpp b/src/mbgl/storage/default_file_source_impl.hpp index 3845014e971..22ca5b6d151 100644 --- a/src/mbgl/storage/default_file_source_impl.hpp +++ b/src/mbgl/storage/default_file_source_impl.hpp @@ -17,7 +17,9 @@ class RequestBase; struct DefaultFileRequest { const Resource resource; std::set observers; - RequestBase* request = nullptr; + + std::unique_ptr cacheRequest; + RequestBase* realRequest = nullptr; inline DefaultFileRequest(const Resource& resource_) : resource(resource_) {} @@ -39,8 +41,8 @@ class DefaultFileSource::Impl { private: DefaultFileRequest* find(const Resource&); - void startCacheRequest(const Resource&); - void startRealRequest(const Resource&, std::shared_ptr = nullptr); + void startCacheRequest(DefaultFileRequest*); + void startRealRequest(DefaultFileRequest*, std::shared_ptr = nullptr); void notify(DefaultFileRequest*, std::shared_ptr, FileCache::Hint); std::unordered_map pending; diff --git a/src/mbgl/util/run_loop.hpp b/src/mbgl/util/run_loop.hpp index 1c92847b695..3b753aca889 100644 --- a/src/mbgl/util/run_loop.hpp +++ b/src/mbgl/util/run_loop.hpp @@ -2,6 +2,8 @@ #define MBGL_UTIL_RUN_LOOP #include +#include +#include #include #include @@ -23,34 +25,54 @@ class RunLoop : private util::noncopyable { template void invoke(Fn&& fn, Args&&... args) { auto tuple = std::make_tuple(std::move(args)...); - auto invokable = std::make_unique>(std::move(fn), std::move(tuple)); - withMutex([&] { queue.push(std::move(invokable)); }); + auto task = std::make_shared>( + std::move(fn), + std::move(tuple)); + + withMutex([&] { queue.push(task); }); async.send(); } - // Return a function that invokes the given function on this RunLoop. - template - auto bind(std::function fn) { - return [this, fn = std::move(fn)] (Args&&... args) { - invoke(std::move(fn), std::move(args)...); - }; + // Invoke fn(args...) on this RunLoop, then invoke callback() on the current RunLoop. + template + std::unique_ptr + invokeWithResult(Fn&& fn, std::function callback, Args&&... args) { + auto tuple = std::make_tuple(std::move(args)...); + auto task = std::make_shared>( + std::move(fn), + std::move(tuple)); + + task->bind(callback); + + withMutex([&] { queue.push(task); }); + async.send(); + + return std::make_unique(task); } // Invoke fn(args...) on this RunLoop, then invoke callback(result) on the current RunLoop. template - void invokeWithResult(Fn&& fn, std::function callback, Args&&... args) { - invoke([fn = std::move(fn), callback = current.get()->bind(callback)] (Args&&... a) mutable { - callback(fn(std::forward(a)...)); - }, std::forward(args)...); + std::unique_ptr + invokeWithResult(Fn&& fn, std::function callback, Args&&... args) { + auto tuple = std::make_tuple(std::move(args)...); + auto task = std::make_shared>( + std::move(fn), + std::move(tuple)); + + task->bind(callback); + + withMutex([&] { queue.push(task); }); + async.send(); + + return std::make_unique(task); } - // Invoke fn(args...) on this RunLoop, then invoke callback() on the current RunLoop. - template - void invokeWithResult(Fn&& fn, std::function callback, Args&&... args) { - invoke([fn = std::move(fn), callback = current.get()->bind(callback)] (Args&&... a) mutable { - fn(std::forward(a)...); - callback(); - }, std::forward(args)...); + // Return a function that invokes the given function on this RunLoop. + template + auto bind(std::function fn) { + return [this, fn = std::move(fn)] (Args&&... args) { + invoke(std::move(fn), std::move(args)...); + }; } uv_loop_t* get() { return async.get()->loop; } @@ -58,35 +80,114 @@ class RunLoop : private util::noncopyable { static uv::tls current; private: - // A movable type-erasing invokable entity wrapper. This allows to store arbitrary invokable - // things (like std::function<>, or the result of a movable-only std::bind()) in the queue. - // Source: http://stackoverflow.com/a/29642072/331379 - struct Message { - virtual void operator()() = 0; - virtual ~Message() = default; - }; - template - struct Invoker : Message { + class Invoker : public WorkTask, public std::enable_shared_from_this> { + public: Invoker(F&& f, P&& p) : func(std::move(f)), params(std::move(p)) { } + using C = std::function; + + void bind(C after) { + auto task = this->shared_from_this(); + callback = RunLoop::current.get()->bind(C([task, after] { + if (!task->canceled) { + after(); + } + })); + } + + void operator()() override { + // We are only running the task when there's an after callback to call. This means that an + // empty after callback will be treated as a cancelled request. The mutex will be locked while + // processing so that the cancel() callback will block. + std::lock_guard lock(mutex); + if (!canceled) { + invoke(std::index_sequence_for{}); + } + } + + void cancel() override { + // Remove the after callback to indicate that this callback has been canceled. The mutex will + // block when the task is currently in progres. When the task has not begun yet, the runTask() + // method will not do anything. When the task has been completed already, and the after callback + // was run as well, this will also do nothing. + std::lock_guard lock(mutex); + canceled = true; + } + + private: + template + void invoke(std::index_sequence) { + func(std::forward(std::get(params))...); + if (callback) { + callback(); + } + } + + std::mutex mutex; + bool canceled = false; + + F func; + P params; + C callback; + }; + + template + class InvokerWithResult : public WorkTask, public std::enable_shared_from_this> { + public: + InvokerWithResult(F&& f, P&& p) + : func(std::move(f)), + params(std::move(p)) { + } + + using C = std::function; + + void bind(C after) { + auto task = this->shared_from_this(); + callback = RunLoop::current.get()->bind(C([task, after] (R result) { + if (!task->canceled) { + after(std::move(result)); + } + })); + } + void operator()() override { - invoke(std::index_sequence_for{}); + // We are only running the task when there's an after callback to call. This means that an + // empty after callback will be treated as a cancelled request. The mutex will be locked while + // processing so that the cancel() callback will block. + std::lock_guard lock(mutex); + if (!canceled) { + invoke(std::index_sequence_for{}); + } } + void cancel() override { + // Remove the after callback to indicate that this callback has been canceled. The mutex will + // block when the task is currently in progres. When the task has not begun yet, the runTask() + // method will not do anything. When the task has been completed already, and the after callback + // was run as well, this will also do nothing. + std::lock_guard lock(mutex); + canceled = true; + } + + private: template void invoke(std::index_sequence) { - func(std::forward(std::get(params))...); + callback(func(std::forward(std::get(params))...)); } + std::mutex mutex; + bool canceled = false; + F func; P params; + C callback; }; - using Queue = std::queue>; + using Queue = std::queue>; void withMutex(std::function&&); void process(); diff --git a/src/mbgl/util/thread.hpp b/src/mbgl/util/thread.hpp index f3a9baa6f3b..ee9ae3e609d 100644 --- a/src/mbgl/util/thread.hpp +++ b/src/mbgl/util/thread.hpp @@ -40,15 +40,17 @@ class Thread { } // Invoke object->fn(args...) in the runloop thread, then invoke callback(result) in the current thread. - template - void invokeWithResult(Fn fn, std::function callback, Args&&... args) { - loop->invokeWithResult(bind(fn), callback, std::forward(args)...); + template + std::unique_ptr + invokeWithResult(Fn fn, std::function callback, Args&&... args) { + return loop->invokeWithResult(bind(fn), callback, std::forward(args)...); } - // Invoke object->fn(args...) in the runloop thread, then invoke callback() in the current thread. - template - void invokeWithResult(Fn fn, std::function callback, Args&&... args) { - loop->invokeWithResult(bind(fn), callback, std::forward(args)...); + // Invoke object->fn(args...) in the runloop thread, then invoke callback(result) in the current thread. + template + std::unique_ptr + invokeWithResult(Fn fn, std::function callback, Args&&... args) { + return loop->invokeWithResult(bind(fn), callback, std::forward(args)...); } // Invoke object->fn(args...) in the runloop thread, and wait for the result. diff --git a/src/mbgl/util/work_task.cpp b/src/mbgl/util/work_task.cpp index ebec420dec4..e69de29bb2d 100644 --- a/src/mbgl/util/work_task.cpp +++ b/src/mbgl/util/work_task.cpp @@ -1,37 +0,0 @@ -#include - -#include - -namespace mbgl { - -WorkTask::WorkTask(std::function task_, std::function after_) - : task(task_), after(after_) { - assert(after); -} - -void WorkTask::runTask() { - // We are only running the task when there's an after callback to call. This means that an - // empty after callback will be treated as a cancelled request. The mutex will be locked while - // processing so that the cancel() callback will block. - std::lock_guard lock(mutex); - if (after) { - task(); - } -} - -void WorkTask::runAfter() { - if (after) { - after(); - } -} - -void WorkTask::cancel() { - // Remove the after callback to indicate that this callback has been canceled. The mutex will - // block when the task is currently in progres. When the task has not begun yet, the runTask() - // method will not do anything. When the task has been completed already, and the after callback - // was run as well, this will also do nothing. - std::lock_guard lock(mutex); - after = {}; -} - -} // end namespace mbgl diff --git a/src/mbgl/util/work_task.hpp b/src/mbgl/util/work_task.hpp index f730a31c56f..2224b211c42 100644 --- a/src/mbgl/util/work_task.hpp +++ b/src/mbgl/util/work_task.hpp @@ -3,25 +3,19 @@ #include -#include -#include - namespace mbgl { +// A movable type-erasing function wrapper. This allows to store arbitrary invokable +// things (like std::function<>, or the result of a movable-only std::bind()) in the queue. +// Source: http://stackoverflow.com/a/29642072/331379 class WorkTask : private util::noncopyable { public: - WorkTask(std::function task, std::function after); - - void runTask(); - void runAfter(); - void cancel(); + virtual ~WorkTask() = default; -private: - const std::function task; - std::function after; - std::mutex mutex; + virtual void operator()() = 0; + virtual void cancel() = 0; }; -} // end namespace mbgl +} #endif diff --git a/src/mbgl/util/worker.cpp b/src/mbgl/util/worker.cpp index 3022d302778..020740b3358 100644 --- a/src/mbgl/util/worker.cpp +++ b/src/mbgl/util/worker.cpp @@ -12,8 +12,8 @@ class Worker::Impl { public: Impl(uv_loop_t*) {} - void doWork(std::shared_ptr& task) { - task->runTask(); + void doWork(Fn work) { + work(); } }; @@ -26,15 +26,8 @@ Worker::Worker(std::size_t count) { Worker::~Worker() = default; std::unique_ptr Worker::send(Fn work, Fn after) { - auto task = std::make_shared(work, after); - auto request = std::make_unique(task); - - threads[current]->invokeWithResult(&Worker::Impl::doWork, [task] { - task->runAfter(); - }, task); - current = (current + 1) % threads.size(); - return request; + return threads[current]->invokeWithResult(&Worker::Impl::doWork, after, work); } } // end namespace mbgl diff --git a/test/miscellaneous/thread.cpp b/test/miscellaneous/thread.cpp index b0dd2210e9f..1376870bb7c 100644 --- a/test/miscellaneous/thread.cpp +++ b/test/miscellaneous/thread.cpp @@ -61,40 +61,46 @@ TEST(Thread, invoke) { const std::thread::id tid = std::this_thread::get_id(); RunLoop loop(uv_default_loop()); + std::vector> requests; loop.invoke([&] { EXPECT_EQ(tid, std::this_thread::get_id()); Thread thread("Test", ThreadPriority::Regular, tid); thread.invoke(&TestObject::fn1, 1); - thread.invokeWithResult(&TestObject::fn2, [&] (int result) { + requests.push_back(thread.invokeWithResult(&TestObject::fn2, [&] (int result) { EXPECT_EQ(tid, std::this_thread::get_id()); EXPECT_EQ(result, 1); - }); + })); thread.invoke(&TestObject::transferIn, std::make_unique(1)); - thread.invokeWithResult>(&TestObject::transferOut, [&] (std::unique_ptr result) { + requests.push_back(thread.invokeWithResult>(&TestObject::transferOut, [&] (std::unique_ptr result) { EXPECT_EQ(tid, std::this_thread::get_id()); EXPECT_EQ(*result, 1); - }); + })); - thread.invokeWithResult>(&TestObject::transferInOut, [&] (std::unique_ptr result) { + requests.push_back(thread.invokeWithResult>(&TestObject::transferInOut, [&] (std::unique_ptr result) { EXPECT_EQ(tid, std::this_thread::get_id()); EXPECT_EQ(*result, 1); - }, std::make_unique(1)); + }, std::make_unique(1))); thread.invoke(&TestObject::transferInShared, std::make_shared(1)); - thread.invokeWithResult>(&TestObject::transferOutShared, [&] (std::shared_ptr result) { + requests.push_back(thread.invokeWithResult>(&TestObject::transferOutShared, [&] (std::shared_ptr result) { EXPECT_EQ(tid, std::this_thread::get_id()); EXPECT_EQ(*result, 1); - }); + })); + + // Cancelled request + thread.invokeWithResult(&TestObject::fn1, [&] { + ADD_FAILURE(); + }, 1); std::string test("test"); - thread.invokeWithResult(&TestObject::transferString, [&] (std::string result){ + requests.push_back(thread.invokeWithResult(&TestObject::transferString, [&] (std::string result){ EXPECT_EQ(tid, std::this_thread::get_id()); EXPECT_EQ(result, "test"); loop.stop(); - }, test); + }, test)); test.clear(); });