Skip to content

Commit

Permalink
Use poll() on POSIX in event_loop
Browse files Browse the repository at this point in the history
On POSIX platforms (Linux and macOS), the event_loop class blocks in
read(). This forces us to call write() on a separate thread. Make the
read pipe non-blocking and use poll() to wait for data to arrive. This
will let us multiplex read() and write() in the future.

On Windows, keep event_loop as-is (blocking ReadFile).

This commit should not change behavior.
  • Loading branch information
strager committed Jun 16, 2021
1 parent 53e4f3b commit a6875d5
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 15 deletions.
5 changes: 4 additions & 1 deletion src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ void run_lsp_server() {
endpoint_(std::forward_as_tuple(fs),
std::forward_as_tuple(output_pipe)) {}

platform_file_ref get_readable_pipe() { return this->input_pipe_; }
platform_file_ref get_readable_pipe() const { return this->input_pipe_; }

void append(string8_view data) { this->endpoint_.append(data); }

Expand All @@ -411,6 +411,9 @@ void run_lsp_server() {
endpoint_;
};

#if QLJS_EVENT_LOOP_READ_PIPE_NON_BLOCKING
input_pipe.set_pipe_non_blocking();
#endif
lsp_event_loop server(input_pipe, output_pipe, &fs);
server.run();
}
Expand Down
102 changes: 90 additions & 12 deletions src/quick-lint-js/event-loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,26 @@
#include <quick-lint-js/have.h>
#include <quick-lint-js/narrow-cast.h>

#if QLJS_HAVE_WINDOWS_H
#include <Windows.h>
#endif

#if QLJS_HAVE_POLL
#include <poll.h>
#endif

#if defined(_WIN32)
#define QLJS_EVENT_LOOP_READ_PIPE_NON_BLOCKING 0
#else
#define QLJS_EVENT_LOOP_READ_PIPE_NON_BLOCKING 1
#endif

namespace quick_lint_js {
#if QLJS_HAVE_CXX_CONCEPTS
template <class Delegate>
concept event_loop_delegate = requires(Delegate d, string8_view data) {
{d.get_readable_pipe()};
concept event_loop_delegate = requires(Delegate d, const Delegate cd,
string8_view data) {
{cd.get_readable_pipe()};
{d.append(data)};
};
#endif
Expand All @@ -30,34 +45,97 @@ concept event_loop_delegate = requires(Delegate d, string8_view data) {
//
// event_loop uses the CRTP pattern. Inherit from event_loop<your_class>.
// your_class must satisfy the event_loop_delegate concept.
//
// event_loop will never call non-const member functions in parallel.
template <class Derived>
class event_loop {
public:
void run() {
for (;;) {
// TODO(strager): Pick buffer size intelligently.
std::array<char8, 1024> buffer;
file_read_result read_result = this->derived().get_readable_pipe().read(
buffer.data(), buffer.size());
if (read_result.at_end_of_file) {
while (!this->done_) {
this->read_from_pipe();
if (this->done_) {
break;
} else if (read_result.error_message.has_value()) {
}

#if QLJS_HAVE_POLL
platform_file_ref pipe = this->const_derived().get_readable_pipe();
::pollfd pollfds[] = {
#if QLJS_EVENT_LOOP_READ_PIPE_NON_BLOCKING
{.fd = pipe.get(), .events = POLLIN, .revents = 0},
#endif
};
QLJS_ASSERT(std::size(pollfds) > 0);
int rc = ::poll(pollfds, std::size(pollfds), /*timeout=*/-1);
if (rc == -1) {
QLJS_UNIMPLEMENTED();
} else {
this->derived().append(string8_view(
buffer.data(), narrow_cast<std::size_t>(read_result.bytes_read)));
}
QLJS_ASSERT(rc > 0);
#if QLJS_EVENT_LOOP_READ_PIPE_NON_BLOCKING
if (pollfds[0].revents & POLLIN) {
continue;
}
if (pollfds[0].revents & POLLERR) {
QLJS_UNIMPLEMENTED();
}
#endif
#elif defined(_WIN32)
// Nothing to wait for.
static_assert(!QLJS_EVENT_LOOP_READ_PIPE_NON_BLOCKING);
#else
#error "Unsupported platform"
#endif
}
}

private:
void read_from_pipe() {
// TODO(strager): Pick buffer size intelligently.
std::array<char8, 1024> buffer;
platform_file_ref pipe = this->const_derived().get_readable_pipe();
#if QLJS_EVENT_LOOP_READ_PIPE_NON_BLOCKING
QLJS_ASSERT(pipe.is_pipe_non_blocking());
#else
QLJS_ASSERT(!pipe.is_pipe_non_blocking());
#endif
file_read_result read_result = pipe.read(buffer.data(), buffer.size());
if (read_result.at_end_of_file) {
this->done_ = true;
} else if (read_result.error_message.has_value()) {
#if QLJS_HAVE_UNISTD_H
if (errno == EAGAIN) {
#if QLJS_EVENT_LOOP_READ_PIPE_NON_BLOCKING
return;
#else
QLJS_UNREACHABLE();
#endif
}
#endif
QLJS_UNIMPLEMENTED();
} else {
QLJS_ASSERT(read_result.bytes_read != 0);
this->derived().append(string8_view(
buffer.data(), narrow_cast<std::size_t>(read_result.bytes_read)));
}
}

#if QLJS_HAVE_CXX_CONCEPTS
event_loop_delegate
#endif
auto&
derived() {
return *static_cast<Derived*>(this);
}

const
#if QLJS_HAVE_CXX_CONCEPTS
event_loop_delegate
#endif
auto&
const_derived() const {
return *static_cast<const Derived*>(this);
}

bool done_ = false;
};
}

Expand Down
13 changes: 11 additions & 2 deletions test/test-event-loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ void write_full_message(platform_file_ref, string8_view);
struct spy_event_loop : public event_loop<spy_event_loop> {
explicit spy_event_loop(platform_file_ref pipe) : pipe_(pipe) {}

platform_file_ref get_readable_pipe() { return this->pipe_; }
platform_file_ref get_readable_pipe() const { return this->pipe_; }

void append(string8_view data) {
std::unique_lock lock(this->mutex_);
Expand Down Expand Up @@ -54,8 +54,17 @@ struct spy_event_loop : public event_loop<spy_event_loop> {

class test_event_loop : public ::testing::Test {
public:
pipe_fds pipe = make_pipe();
pipe_fds pipe = make_pipe_for_event_loop();
spy_event_loop loop{this->pipe.reader.ref()};

private:
static pipe_fds make_pipe_for_event_loop() {
pipe_fds pipe = make_pipe();
#if QLJS_EVENT_LOOP_READ_PIPE_NON_BLOCKING
pipe.reader.set_pipe_non_blocking();
#endif
return pipe;
}
};

TEST_F(test_event_loop, stops_on_eof) {
Expand Down

0 comments on commit a6875d5

Please sign in to comment.