Skip to content

Commit

Permalink
pw_async: Remove periodic support
Browse files Browse the repository at this point in the history
Periodic support requires a number of policy decisions that can better
be made by libraries, and is generally implementable outside of the
basic dispatcher through user calls to `Post`/`PostAt`.

Change-Id: I56d954fad8dedf33a6eb1a6f6ea81f9b6de46414
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/145495
Commit-Queue: Taylor Cramer <[email protected]>
Reviewed-by: Keir Mierle <[email protected]>
Presubmit-Verified: CQ Bot Account <[email protected]>
Reviewed-by: Ali Saeed <[email protected]>
  • Loading branch information
cramertj authored and CQ Bot Account committed May 5, 2023
1 parent 9844064 commit 47d294d
Show file tree
Hide file tree
Showing 10 changed files with 0 additions and 172 deletions.
36 changes: 0 additions & 36 deletions pw_async/fake_dispatcher_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,42 +222,6 @@ TEST(FakeDispatcher, RequestStopInsideOtherTaskCancelsOtherTask) {
EXPECT_EQ(task_counter.counts, CallCounts{.cancelled = 1});
}

// Verifies that a periodic task fires on its schedule until another task
// posted on the same dispatcher cancels it.
TEST(FakeDispatcher, PeriodicTasks) {
  FakeDispatcher dispatcher;

  CallCounter counter;
  Task repeating_task(counter.fn());
  // First run at now() + 50ms, then every 20ms thereafter.
  dispatcher.PostPeriodicAt(repeating_task, 20ms, dispatcher.now() + 50ms);

  // Cancel the repeating task at +100ms, after it has fired three times
  // (at +50ms, +70ms, and +90ms).
  Task canceller([&repeating_task](Context& c, Status status) {
    ASSERT_OK(status);
    c.dispatcher->Cancel(repeating_task);
  });
  dispatcher.PostAfter(canceller, 100ms);

  dispatcher.RunFor(300ms);
  dispatcher.RequestStop();
  dispatcher.RunUntilIdle();
  EXPECT_EQ(counter.counts, CallCounts{.ok = 3});
}

// Verifies PostPeriodicAfter semantics: first run after |delay|, then one
// run per |interval| as the fake clock advances.
TEST(FakeDispatcher, PostPeriodicAfter) {
  FakeDispatcher dispatcher;
  CallCounter call_counter;
  Task repeating_task(call_counter.fn());
  dispatcher.PostPeriodicAfter(
      repeating_task, /*interval=*/5ms, /*delay=*/20ms);

  // Nothing runs before the initial delay elapses.
  dispatcher.RunUntilIdle();
  EXPECT_EQ(call_counter.counts, CallCounts{});
  // The first run happens exactly when the 20ms delay expires.
  dispatcher.RunFor(20ms);
  EXPECT_EQ(call_counter.counts, CallCounts{.ok = 1});
  // Two more runs land within the next 10ms (at +25ms and +30ms).
  dispatcher.RunFor(10ms);
  EXPECT_EQ(call_counter.counts, CallCounts{.ok = 3});
  // Without advancing the clock further, no additional runs occur.
  dispatcher.RunUntilIdle();
  EXPECT_EQ(call_counter.counts, CallCounts{.ok = 3});
}

TEST(FakeDispatcher, TasksCancelledByDispatcherDestructor) {
CallCounter counter;
Task task0(counter.fn()), task1(counter.fn()), task2(counter.fn());
Expand Down
21 changes: 0 additions & 21 deletions pw_async/public/pw_async/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,27 +61,6 @@ class Dispatcher : public chrono::VirtualSystemClock {
/// Post caller owned |task| to be run at |time|.
virtual void PostAt(Task& task, chrono::SystemClock::time_point time) = 0;

/// Post caller owned |task| to be run immediately then rerun at a regular
/// |interval|.
virtual void PostPeriodic(Task& task,
                          chrono::SystemClock::duration interval) {
  // "Immediately" means a start time of now(); delegate to the primitive.
  const chrono::SystemClock::time_point start_time = now();
  PostPeriodicAt(task, interval, start_time);
}

/// Post caller owned |task| to be run after |delay| then rerun at a regular
/// |interval|.
virtual void PostPeriodicAfter(Task& task,
                               chrono::SystemClock::duration interval,
                               chrono::SystemClock::duration delay) {
  // Convert the relative |delay| into an absolute start time and delegate.
  const chrono::SystemClock::time_point start_time = now() + delay;
  PostPeriodicAt(task, interval, start_time);
}

/// Post caller owned |task| to be run at |time| then rerun at a regular
/// |interval|. |interval| must not be zero.
///
/// This is the pure virtual primitive that backends implement; the
/// PostPeriodic and PostPeriodicAfter convenience overloads forward here.
virtual void PostPeriodicAt(Task& task,
                            chrono::SystemClock::duration interval,
                            chrono::SystemClock::time_point time) = 0;

/// Request that a task not be invoked again.
///
/// Periodic tasks may be posted once more after they are canceled. Tasks may
Expand Down
14 changes: 0 additions & 14 deletions pw_async/public/pw_async/fake_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,6 @@ class FakeDispatcher final : public Dispatcher {
void PostAt(Task& task, chrono::SystemClock::time_point time) override {
native_dispatcher_.PostAt(task, time);
}
// Posts |task| to run immediately then at a regular |interval|, by
// forwarding to the native fake dispatcher backend.
void PostPeriodic(Task& task,
                  chrono::SystemClock::duration interval) override {
  native_dispatcher_.PostPeriodic(task, interval);
}
// Posts |task| to run after |delay| then at a regular |interval|, by
// forwarding to the native fake dispatcher backend.
void PostPeriodicAfter(Task& task,
                       chrono::SystemClock::duration interval,
                       chrono::SystemClock::duration delay) override {
  native_dispatcher_.PostPeriodicAfter(task, interval, delay);
}
// Posts |task| to first run at |start_time| then at a regular |interval|,
// by forwarding to the native fake dispatcher backend.
void PostPeriodicAt(Task& task,
                    chrono::SystemClock::duration interval,
                    chrono::SystemClock::time_point start_time) override {
  native_dispatcher_.PostPeriodicAt(task, interval, start_time);
}
bool Cancel(Task& task) override { return native_dispatcher_.Cancel(task); }

// VirtualSystemClock overrides:
Expand Down
5 changes: 0 additions & 5 deletions pw_async/public/pw_async/heap_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ class HeapDispatcher final : public FunctionDispatcher {
chrono::SystemClock::time_point time) override {
return dispatcher_.PostAt(task, time);
}
// Forwards directly to the wrapped dispatcher; HeapDispatcher adds no
// periodic-scheduling behavior of its own. (`return` on a void call is
// kept for consistency with the neighboring PostAt forwarder.)
inline void PostPeriodicAt(Task& task,
                           chrono::SystemClock::duration interval,
                           chrono::SystemClock::time_point time) override {
  return dispatcher_.PostPeriodicAt(task, interval, time);
}
inline bool Cancel(Task& task) override { return dispatcher_.Cancel(task); }

// VirtualSystemClock overrides:
Expand Down
13 changes: 0 additions & 13 deletions pw_async_basic/dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,6 @@ void BasicDispatcher::ExecuteDueTasks() {
backend::NativeTask& task = task_queue_.front();
task_queue_.pop_front();

if (task.interval().has_value()) {
PostTaskInternal(task, task.due_time_ + task.interval().value());
}

lock_.unlock();
PW_LOG_DEBUG("running task");
Context ctx{this, &task.task_};
Expand Down Expand Up @@ -127,15 +123,6 @@ void BasicDispatcher::PostAt(Task& task, chrono::SystemClock::time_point time) {
lock_.unlock();
}

// Schedules |task| to first run at |start_time| and then repeatedly at
// |interval|. A zero interval is disallowed because a zero duration is the
// sentinel NativeTask uses for "not periodic".
void BasicDispatcher::PostPeriodicAt(
    Task& task,
    chrono::SystemClock::duration interval,
    chrono::SystemClock::time_point start_time) {
  PW_DCHECK(interval != chrono::SystemClock::duration::zero());
  auto& native_task = task.native_type();
  native_task.set_interval(interval);
  PostAt(task, start_time);
}

bool BasicDispatcher::Cancel(Task& task) {
std::lock_guard lock(lock_);
return task_queue_.remove(task.native_type());
Expand Down
30 changes: 0 additions & 30 deletions pw_async_basic/dispatcher_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,34 +198,4 @@ TEST(DispatcherBasic, TasksCancelledByRunFor) {
ASSERT_EQ(count, 3);
}

// Exercises PostPeriodicAfter on BasicDispatcher with a real worker thread:
// task_a runs periodically and stops the dispatcher after three runs, while
// task_b fires once before task_a's first run to check relative ordering.
TEST(DispatcherBasic, PostPeriodicAfter) {
  BasicDispatcher dispatcher;
  thread::Thread work_thread(thread::stl::Options(), dispatcher);

  TaskPair tp;
  tp.task_a.set_function([&tp](Context& c, Status status) {
    // Invocations cancelled by shutdown do not count as runs.
    if (status.IsCancelled()) {
      return;
    }
    ++tp.count;

    // After the third successful run, stop the dispatcher thread.
    if (tp.count == 3) {
      static_cast<BasicDispatcher*>(c.dispatcher)->RequestStop();
    }
  });
  tp.task_b.set_function([&tp](Context& /*ctx*/, Status status) {
    ASSERT_TRUE(status.ok());
    // task_b is due at +25ms, before task_a's first run at +75ms, so the
    // periodic run counter must still be zero here.
    ASSERT_EQ(tp.count, 0);
  });

  // Post both tasks from inside a task on the dispatcher so their relative
  // delays are measured from (approximately) the same instant.
  Task task([&tp](Context& c, Status) {
    c.dispatcher->PostAfter(tp.task_b, 25ms);
    c.dispatcher->PostPeriodicAfter(tp.task_a, 10ms, 75ms);
  });
  dispatcher.Post(task);

  work_thread.join();
  ASSERT_EQ(tp.count, 3);
}

} // namespace pw::async
24 changes: 0 additions & 24 deletions pw_async_basic/fake_dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ void NativeFakeDispatcher::ExecuteDueTasks() {
::pw::async::backend::NativeTask& task = task_queue_.front();
task_queue_.pop_front();

if (task.interval().has_value()) {
PostTaskInternal(task, task.due_time() + task.interval().value());
}

Context ctx{&dispatcher_, &task.task_};
task(ctx, OkStatus());
}
Expand Down Expand Up @@ -101,26 +97,6 @@ void NativeFakeDispatcher::PostAt(Task& task,
PostTaskInternal(task.native_type(), time);
}

// Runs |task| immediately (at the fake clock's current time), then
// repeatedly at |interval|.
void NativeFakeDispatcher::PostPeriodic(
    Task& task, chrono::SystemClock::duration interval) {
  const chrono::SystemClock::time_point start_time = now();
  PostPeriodicAt(task, interval, start_time);
}

// Runs |task| after |delay| has elapsed on the fake clock, then repeatedly
// at |interval|.
void NativeFakeDispatcher::PostPeriodicAfter(
    Task& task,
    chrono::SystemClock::duration interval,
    chrono::SystemClock::duration delay) {
  const chrono::SystemClock::time_point start_time = now() + delay;
  PostPeriodicAt(task, interval, start_time);
}

// Marks |task| as periodic and schedules its first run at |start_time|;
// ExecuteDueTasks reposts the task after each run while its interval is
// set. NOTE(review): unlike BasicDispatcher::PostPeriodicAt, this does not
// assert that |interval| is nonzero — confirm whether that is intentional.
void NativeFakeDispatcher::PostPeriodicAt(
    Task& task,
    chrono::SystemClock::duration interval,
    chrono::SystemClock::time_point start_time) {
  auto& native_task = task.native_type();
  native_task.set_interval(interval);
  PostAt(task, start_time);
}

bool NativeFakeDispatcher::Cancel(Task& task) {
return task_queue_.remove(task.native_type());
}
Expand Down
3 changes: 0 additions & 3 deletions pw_async_basic/public/pw_async_basic/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ class BasicDispatcher final : public Dispatcher, public thread::ThreadCore {

// Dispatcher overrides:
void PostAt(Task& task, chrono::SystemClock::time_point time) override;
// Dispatcher override: schedules |task| at |start_time| and then every
// |interval|; defined in dispatcher.cc.
void PostPeriodicAt(Task& task,
                    chrono::SystemClock::duration interval,
                    chrono::SystemClock::time_point start_time) override;
bool Cancel(Task& task) override PW_LOCKS_EXCLUDED(lock_);

// VirtualSystemClock overrides:
Expand Down
10 changes: 0 additions & 10 deletions pw_async_basic/public/pw_async_basic/fake_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,6 @@ class NativeFakeDispatcher final {

void PostAt(Task& task, chrono::SystemClock::time_point time);

// Posts |task| to run immediately, then repeatedly at |interval|.
void PostPeriodic(Task& task, chrono::SystemClock::duration interval);

// Posts |task| to run after |delay|, then repeatedly at |interval|.
void PostPeriodicAfter(Task& task,
                       chrono::SystemClock::duration interval,
                       chrono::SystemClock::duration delay);

// Posts |task| to run at |start_time|, then repeatedly at |interval|.
void PostPeriodicAt(Task& task,
                    chrono::SystemClock::duration interval,
                    chrono::SystemClock::time_point start_time);

bool Cancel(Task& task);

void RunUntilIdle();
Expand Down
16 changes: 0 additions & 16 deletions pw_async_basic/public/pw_async_basic/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,6 @@ class NativeTask final : public IntrusiveList<NativeTask>::Item {
void set_due_time(chrono::SystemClock::time_point due_time) {
due_time_ = due_time;
}
// Returns the repost interval, or std::nullopt for one-shot tasks.
// A zero stored duration is the internal sentinel for "not periodic".
std::optional<chrono::SystemClock::duration> interval() const {
  if (interval_ != chrono::SystemClock::duration::zero()) {
    return interval_;
  }
  return std::nullopt;
}
// Stores |interval| as the repost period. Passing std::nullopt marks the
// task as one-shot by storing the zero-duration sentinel.
void set_interval(std::optional<chrono::SystemClock::duration> interval) {
  interval_ = interval.value_or(chrono::SystemClock::duration::zero());
}

TaskFunction func_ = nullptr;
// task_ is placed after func_ to take advantage of the padding that would
Expand All @@ -64,9 +51,6 @@ class NativeTask final : public IntrusiveList<NativeTask>::Item {
// padding would be added here, which is just enough for a pointer.
Task& task_;
pw::chrono::SystemClock::time_point due_time_;
// A duration of 0 indicates that the task is not periodic.
chrono::SystemClock::duration interval_ =
chrono::SystemClock::duration::zero();
};

using NativeTaskHandle = NativeTask&;
Expand Down

0 comments on commit 47d294d

Please sign in to comment.