
Commit

Move WorkerTask functionality into RunLoop core
As a byproduct, this makes FileCache get requests properly
cancelable.
jfirebaugh committed Jun 9, 2015
1 parent 18b8bc2 commit 3f6e434
Showing 11 changed files with 191 additions and 138 deletions.
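For callers, the visible effect of this change is that FileCache::get now returns a std::unique_ptr<WorkRequest>, and dropping that handle cancels the pending cache lookup (this is exactly what the DefaultFileSource changes below rely on). A minimal caller-side sketch, assuming the mbgl headers as of this revision; the function and variable names are illustrative and not taken from the diff:

```cpp
#include <mbgl/storage/resource.hpp>
#include <mbgl/storage/response.hpp>
#include <mbgl/storage/sqlite_cache.hpp>
#include <mbgl/util/work_request.hpp>

#include <memory>

using namespace mbgl;

void lookup(SQLiteCache& cache, const Resource& resource) {
    // The returned WorkRequest keeps the asynchronous cache lookup alive.
    std::unique_ptr<WorkRequest> req = cache.get(resource, [](std::unique_ptr<Response> response) {
        // Invoked with the cached Response (or nullptr) once the SQLite thread is done.
        (void)response;
    });

    // Resetting (or destroying) the WorkRequest cancels the lookup; the callback
    // above will not be invoked afterwards.
    req.reset();
}
```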
3 changes: 2 additions & 1 deletion include/mbgl/storage/file_cache.hpp
@@ -10,6 +10,7 @@ namespace mbgl {

struct Resource;
class Response;
class WorkRequest;

class FileCache : private util::noncopyable {
public:
@@ -18,7 +19,7 @@ class FileCache : private util::noncopyable {
enum class Hint : uint8_t { Full, Refresh, No };
using Callback = std::function<void(std::unique_ptr<Response>)>;

virtual void get(const Resource &resource, Callback callback) = 0;
virtual std::unique_ptr<WorkRequest> get(const Resource &resource, Callback callback) = 0;
virtual void put(const Resource &resource, std::shared_ptr<const Response> response, Hint hint) = 0;
};

2 changes: 1 addition & 1 deletion include/mbgl/storage/sqlite_cache.hpp
@@ -17,7 +17,7 @@ class SQLiteCache : public FileCache {
~SQLiteCache() override;

// FileCache API
void get(const Resource &resource, Callback callback) override;
std::unique_ptr<WorkRequest> get(const Resource &resource, Callback callback) override;
void put(const Resource &resource, std::shared_ptr<const Response> response, Hint hint) override;

class Impl;
4 changes: 2 additions & 2 deletions platform/default/sqlite_cache.cpp
@@ -126,12 +126,12 @@ void SQLiteCache::Impl::createSchema() {
}
}

void SQLiteCache::get(const Resource &resource, Callback callback) {
std::unique_ptr<WorkRequest> SQLiteCache::get(const Resource &resource, Callback callback) {
// Can be called from any thread, but most likely from the file source thread.
// Will try to load the URL from the SQLite database and call the callback when done.
// Note that the callback is probably going to be invoked from another thread, so the caller
// must make sure that it can run in that thread.
thread->invokeWithResult(&Impl::get, std::move(callback), resource);
return thread->invokeWithResult(&Impl::get, std::move(callback), resource);
}

std::unique_ptr<Response> SQLiteCache::Impl::get(const Resource &resource) {
37 changes: 14 additions & 23 deletions src/mbgl/storage/default_file_source.cpp
@@ -100,26 +100,16 @@ void DefaultFileSource::Impl::add(Request* req) {
request->observers.insert(req);

if (cache) {
startCacheRequest(resource);
startCacheRequest(request);
} else {
startRealRequest(resource);
startRealRequest(request);
}
}

void DefaultFileSource::Impl::startCacheRequest(const Resource& resource) {
void DefaultFileSource::Impl::startCacheRequest(DefaultFileRequest* request) {
// Check the cache for existing data so that we can potentially
// revalidate the information without having to redownload everything.
cache->get(resource, [this, resource](std::unique_ptr<Response> response) {
DefaultFileRequest* request = find(resource);

if (!request || request->request) {
// There is no request for this URL anymore. Likely, the request was canceled
// before we got around to process the cache result.
// The second possibility is that a request has already been started by another cache
// request. In this case, we don't have to do anything either.
return;
}

request->cacheRequest = cache->get(request->resource, [this, request](std::unique_ptr<Response> response) {
auto expired = [&response] {
const int64_t now = std::chrono::duration_cast<std::chrono::seconds>(
SystemClock::now().time_since_epoch()).count();
@@ -128,25 +118,23 @@ void DefaultFileSource::Impl::startCacheRequest(const Resource& resource) {

if (!response || expired()) {
// No response or stale cache. Run the real request.
startRealRequest(resource, std::move(response));
startRealRequest(request, std::move(response));
} else {
// The response is fresh. We're good to notify the caller.
notify(request, std::move(response), FileCache::Hint::No);
}
});
}

void DefaultFileSource::Impl::startRealRequest(const Resource& resource, std::shared_ptr<const Response> response) {
DefaultFileRequest* request = find(resource);

void DefaultFileSource::Impl::startRealRequest(DefaultFileRequest* request, std::shared_ptr<const Response> response) {
auto callback = [request, this] (std::shared_ptr<const Response> res, FileCache::Hint hint) {
notify(request, res, hint);
};

if (algo::starts_with(resource.url, "asset://")) {
request->request = assetContext->createRequest(resource, callback, loop, assetRoot);
if (algo::starts_with(request->resource.url, "asset://")) {
request->realRequest = assetContext->createRequest(request->resource, callback, loop, assetRoot);
} else {
request->request = httpContext->createRequest(resource, callback, loop, response);
request->realRequest = httpContext->createRequest(request->resource, callback, loop, response);
}
}

@@ -158,8 +146,11 @@ void DefaultFileSource::Impl::cancel(Request* req) {
// cancel the request and remove it from the pending list.
request->observers.erase(req);
if (request->observers.empty()) {
if (request->request) {
request->request->cancel();
if (request->cacheRequest) {
request->cacheRequest.reset();
}
if (request->realRequest) {
request->realRequest->cancel();
}
pending.erase(request->resource);
}
8 changes: 5 additions & 3 deletions src/mbgl/storage/default_file_source_impl.hpp
@@ -17,7 +17,9 @@ class RequestBase;
struct DefaultFileRequest {
const Resource resource;
std::set<Request*> observers;
RequestBase* request = nullptr;

std::unique_ptr<WorkRequest> cacheRequest;
RequestBase* realRequest = nullptr;

inline DefaultFileRequest(const Resource& resource_)
: resource(resource_) {}
@@ -39,8 +41,8 @@ class DefaultFileSource::Impl {
private:
DefaultFileRequest* find(const Resource&);

void startCacheRequest(const Resource&);
void startRealRequest(const Resource&, std::shared_ptr<const Response> = nullptr);
void startCacheRequest(DefaultFileRequest*);
void startRealRequest(DefaultFileRequest*, std::shared_ptr<const Response> = nullptr);
void notify(DefaultFileRequest*, std::shared_ptr<const Response>, FileCache::Hint);

std::unordered_map<Resource, DefaultFileRequest, Resource::Hash> pending;
163 changes: 132 additions & 31 deletions src/mbgl/util/run_loop.hpp
@@ -2,6 +2,8 @@
#define MBGL_UTIL_RUN_LOOP

#include <mbgl/util/noncopyable.hpp>
#include <mbgl/util/work_task.hpp>
#include <mbgl/util/work_request.hpp>
#include <mbgl/util/uv_detail.hpp>

#include <functional>
@@ -23,70 +25,169 @@ class RunLoop : private util::noncopyable {
template <class Fn, class... Args>
void invoke(Fn&& fn, Args&&... args) {
auto tuple = std::make_tuple(std::move(args)...);
auto invokable = std::make_unique<Invoker<Fn, decltype(tuple), Args...>>(std::move(fn), std::move(tuple));
withMutex([&] { queue.push(std::move(invokable)); });
auto task = std::make_shared<Invoker<Fn, decltype(tuple), Args...>>(
std::move(fn),
std::move(tuple));

withMutex([&] { queue.push(task); });
async.send();
}

// Return a function that invokes the given function on this RunLoop.
template <class... Args>
auto bind(std::function<void (Args...)> fn) {
return [this, fn = std::move(fn)] (Args&&... args) {
invoke(std::move(fn), std::move(args)...);
};
// Invoke fn(args...) on this RunLoop, then invoke callback() on the current RunLoop.
template <class Fn, class... Args>
std::unique_ptr<WorkRequest>
invokeWithResult(Fn&& fn, std::function<void ()> callback, Args&&... args) {
auto tuple = std::make_tuple(std::move(args)...);
auto task = std::make_shared<Invoker<Fn, decltype(tuple), Args...>>(
std::move(fn),
std::move(tuple));

task->bind(callback);

withMutex([&] { queue.push(task); });
async.send();

return std::make_unique<WorkRequest>(task);
}

// Invoke fn(args...) on this RunLoop, then invoke callback(result) on the current RunLoop.
template <class R, class Fn, class... Args>
void invokeWithResult(Fn&& fn, std::function<void (R)> callback, Args&&... args) {
invoke([fn = std::move(fn), callback = current.get()->bind(callback)] (Args&&... a) mutable {
callback(fn(std::forward<Args>(a)...));
}, std::forward<Args>(args)...);
std::unique_ptr<WorkRequest>
invokeWithResult(Fn&& fn, std::function<void (R)> callback, Args&&... args) {
auto tuple = std::make_tuple(std::move(args)...);
auto task = std::make_shared<InvokerWithResult<Fn, decltype(tuple), R, Args...>>(
std::move(fn),
std::move(tuple));

task->bind(callback);

withMutex([&] { queue.push(task); });
async.send();

return std::make_unique<WorkRequest>(task);
}

// Invoke fn(args...) on this RunLoop, then invoke callback() on the current RunLoop.
template <class Fn, class... Args>
void invokeWithResult(Fn&& fn, std::function<void ()> callback, Args&&... args) {
invoke([fn = std::move(fn), callback = current.get()->bind(callback)] (Args&&... a) mutable {
fn(std::forward<Args>(a)...);
callback();
}, std::forward<Args>(args)...);
// Return a function that invokes the given function on this RunLoop.
template <class... Args>
auto bind(std::function<void (Args...)> fn) {
return [this, fn = std::move(fn)] (Args&&... args) {
invoke(std::move(fn), std::move(args)...);
};
}

uv_loop_t* get() { return async.get()->loop; }

static uv::tls<RunLoop> current;

private:
// A movable type-erasing invokable entity wrapper. This allows to store arbitrary invokable
// things (like std::function<>, or the result of a movable-only std::bind()) in the queue.
// Source: http://stackoverflow.com/a/29642072/331379
struct Message {
virtual void operator()() = 0;
virtual ~Message() = default;
};

template <class F, class P, class... Args>
struct Invoker : Message {
class Invoker : public WorkTask, public std::enable_shared_from_this<Invoker<F, P, Args...>> {
public:
Invoker(F&& f, P&& p)
: func(std::move(f)),
params(std::move(p)) {
}

using C = std::function<void ()>;

void bind(C after) {
auto task = this->shared_from_this();
callback = RunLoop::current.get()->bind(C([task, after] {
if (!task->canceled) {
after();
}
}));
}

void operator()() override {
// Run the task only if it has not been canceled. The mutex is held while the task executes,
// so a concurrent cancel() will block until the task has finished.
std::lock_guard<std::mutex> lock(mutex);
if (!canceled) {
invoke(std::index_sequence_for<Args...>{});
}
}

void cancel() override {
// Mark the task as canceled so that the after callback is never invoked. The mutex blocks
// while the task is currently in progress. If the task has not started yet, operator()() will
// skip it; if the task and its after callback have already completed, this does nothing.
std::lock_guard<std::mutex> lock(mutex);
canceled = true;
}

private:
template <std::size_t... I>
void invoke(std::index_sequence<I...>) {
func(std::forward<Args>(std::get<I>(params))...);
if (callback) {
callback();
}
}

std::mutex mutex;
bool canceled = false;

F func;
P params;
C callback;
};

template <class F, class P, class R, class... Args>
class InvokerWithResult : public WorkTask, public std::enable_shared_from_this<InvokerWithResult<F, P, R, Args...>> {
public:
InvokerWithResult(F&& f, P&& p)
: func(std::move(f)),
params(std::move(p)) {
}

using C = std::function<void (R)>;

void bind(C after) {
auto task = this->shared_from_this();
callback = RunLoop::current.get()->bind(C([task, after] (R result) {
if (!task->canceled) {
after(std::move(result));
}
}));
}

void operator()() override {
invoke(std::index_sequence_for<Args...>{});
// Run the task only if it has not been canceled. The mutex is held while the task executes,
// so a concurrent cancel() will block until the task has finished.
std::lock_guard<std::mutex> lock(mutex);
if (!canceled) {
invoke(std::index_sequence_for<Args...>{});
}
}

void cancel() override {
// Mark the task as canceled so that the after callback is never invoked. The mutex blocks
// while the task is currently in progress. If the task has not started yet, operator()() will
// skip it; if the task and its after callback have already completed, this does nothing.
std::lock_guard<std::mutex> lock(mutex);
canceled = true;
}

private:
template <std::size_t... I>
void invoke(std::index_sequence<I...>) {
func(std::forward<Args>(std::get<I>(params))...);
callback(func(std::forward<Args>(std::get<I>(params))...));
}

std::mutex mutex;
bool canceled = false;

F func;
P params;
C callback;
};

using Queue = std::queue<std::unique_ptr<Message>>;
using Queue = std::queue<std::shared_ptr<WorkTask>>;

void withMutex(std::function<void()>&&);
void process();
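The core of the new RunLoop machinery is the cancellation protocol shared by Invoker and InvokerWithResult: the task lives in a shared_ptr held by both the queue and the returned WorkRequest, and a mutex plus a canceled flag ensure that cancel() blocks while the task body runs and that a canceled task never fires its callback. The following self-contained sketch reproduces just that protocol with the standard library; the RunLoop queue, the uv async wakeup, and the cross-thread callback binding are deliberately left out:

```cpp
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>

// A cancelable task: run() executes the work unless cancel() was called first.
// The mutex guarantees that cancel() blocks while the work is in progress, so a
// canceled task never fires its callback afterwards.
class CancelableTask {
public:
    explicit CancelableTask(std::function<void()> work) : work_(std::move(work)) {}

    void run() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!canceled_) {
            work_();
        }
    }

    void cancel() {
        std::lock_guard<std::mutex> lock(mutex_);
        canceled_ = true;
    }

private:
    std::mutex mutex_;
    bool canceled_ = false;
    std::function<void()> work_;
};

// A handle analogous to WorkRequest: destroying it cancels the task it refers to.
class TaskHandle {
public:
    explicit TaskHandle(std::shared_ptr<CancelableTask> task) : task_(std::move(task)) {}
    ~TaskHandle() { if (task_) task_->cancel(); }

private:
    std::shared_ptr<CancelableTask> task_;
};

int main() {
    auto task = std::make_shared<CancelableTask>([] { std::cout << "work ran\n"; });

    {
        TaskHandle handle(task);  // Handle goes out of scope before the task runs...
    }

    task->run();  // ...so nothing is printed here: the task was canceled.
}
```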
16 changes: 9 additions & 7 deletions src/mbgl/util/thread.hpp
@@ -40,15 +40,17 @@ class Thread {
}

// Invoke object->fn(args...) in the runloop thread, then invoke callback(result) in the current thread.
template <class R, typename Fn, class... Args>
void invokeWithResult(Fn fn, std::function<void (R)> callback, Args&&... args) {
loop->invokeWithResult(bind<Fn, Args...>(fn), callback, std::forward<Args>(args)...);
template <typename Fn, class... Args>
std::unique_ptr<WorkRequest>
invokeWithResult(Fn fn, std::function<void ()> callback, Args&&... args) {
return loop->invokeWithResult(bind<Fn, Args...>(fn), callback, std::forward<Args>(args)...);
}

// Invoke object->fn(args...) in the runloop thread, then invoke callback() in the current thread.
template <typename Fn, class... Args>
void invokeWithResult(Fn fn, std::function<void ()> callback, Args&&... args) {
loop->invokeWithResult(bind<Fn, Args...>(fn), callback, std::forward<Args>(args)...);
// Invoke object->fn(args...) in the runloop thread, then invoke callback(result) in the current thread.
template <class R, typename Fn, class... Args>
std::unique_ptr<WorkRequest>
invokeWithResult(Fn fn, std::function<void (R)> callback, Args&&... args) {
return loop->invokeWithResult(bind<Fn, Args...>(fn), callback, std::forward<Args>(args)...);
}

// Invoke object->fn(args...) in the runloop thread, and wait for the result.
Expand Down
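Because Thread::invokeWithResult now forwards the WorkRequest produced by the underlying RunLoop, a class that owns a util::Thread can expose a cancelable asynchronous method simply by returning that handle, which is how SQLiteCache::get above is implemented. A hypothetical sketch of the same pattern (Worker and computeAsync are illustrative names; constructing the Thread and running the caller's RunLoop are omitted):

```cpp
#include <mbgl/util/thread.hpp>
#include <mbgl/util/work_request.hpp>

#include <functional>
#include <memory>
#include <string>

// Hypothetical worker object living on its own mbgl::util::Thread; not part of this diff.
class Worker {
public:
    int compute(const std::string& input) {
        return static_cast<int>(input.size());
    }
};

// Runs Worker::compute(input) on the worker thread, then invokes callback(result) on the
// calling RunLoop. Destroying the returned WorkRequest cancels whichever step is still pending.
std::unique_ptr<mbgl::WorkRequest> computeAsync(mbgl::util::Thread<Worker>& thread,
                                                const std::string& input,
                                                std::function<void (int)> callback) {
    return thread.invokeWithResult(&Worker::compute, std::move(callback), input);
}
```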