Skip to content

Commit

Permalink
Use event loop for pipe_writer (POSIX only)
Browse files Browse the repository at this point in the history
For the LSP server, on POSIX platforms, we call read() on one thread and
write() on the other for the read and write LSP pipes. Use poll() to
multiples the read and write pipes, making the LSP server
single-threaded.

This commit does not change anything on Windows.
  • Loading branch information
strager committed Jun 16, 2021
1 parent a6875d5 commit 5fdcb45
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 30 deletions.
14 changes: 14 additions & 0 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <quick-lint-js/padded-string.h>
#include <quick-lint-js/parse-visitor.h>
#include <quick-lint-js/parse.h>
#include <quick-lint-js/pipe-writer.h>
#include <quick-lint-js/text-error-reporter.h>
#include <quick-lint-js/translation.h>
#include <quick-lint-js/unreachable.h>
Expand Down Expand Up @@ -404,6 +405,16 @@ void run_lsp_server() {

void append(string8_view data) { this->endpoint_.append(data); }

#if QLJS_HAVE_POLL
std::optional<::pollfd> get_pipe_write_pollfd() {
return this->endpoint_.remote().get_pollfd();
}

void on_pipe_write_event(const ::pollfd &event) {
this->endpoint_.remote().on_poll_event(event);
}
#endif

private:
platform_file_ref input_pipe_;
lsp_endpoint<linting_lsp_server_handler<lsp_javascript_linter>,
Expand All @@ -413,6 +424,9 @@ void run_lsp_server() {

#if QLJS_EVENT_LOOP_READ_PIPE_NON_BLOCKING
input_pipe.set_pipe_non_blocking();
#endif
#if !QLJS_PIPE_WRITER_SEPARATE_THREAD
output_pipe.set_pipe_non_blocking();
#endif
lsp_event_loop server(input_pipe, output_pipe, &fs);
server.run();
Expand Down
26 changes: 15 additions & 11 deletions src/pipe-writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,30 +111,34 @@ non_blocking_pipe_writer::non_blocking_pipe_writer(platform_file_ref pipe)
}

void non_blocking_pipe_writer::flush() {
while (!this->pending_.empty()) {
#if QLJS_HAVE_POLL
::pollfd fds[] = {this->get_pollfd()};
int rc = ::poll(fds, std::size(fds), /*timeout=*/-1);
while (std::optional<::pollfd> event = this->get_pollfd()) {
int rc = ::poll(&*event, 1, /*timeout=*/-1);
if (rc == -1) {
QLJS_UNIMPLEMENTED();
}
this->on_poll_event(fds[0]);
this->on_poll_event(*event);
}
#else
#error "Unsupported platform"
#endif
}
}

#if QLJS_HAVE_POLL
::pollfd non_blocking_pipe_writer::get_pollfd() noexcept {
return ::pollfd{
.fd = this->pipe_.get(),
.events = POLLOUT,
.revents = 0,
};
std::optional<::pollfd> non_blocking_pipe_writer::get_pollfd() noexcept {
if (this->pending_.empty()) {
return std::nullopt;
} else {
return ::pollfd{
.fd = this->pipe_.get(),
.events = POLLOUT,
.revents = 0,
};
}
}

void non_blocking_pipe_writer::on_poll_event(const ::pollfd& fd) {
QLJS_ASSERT(fd.revents != 0);
if (fd.revents & POLLERR) {
QLJS_UNIMPLEMENTED();
}
Expand Down
52 changes: 41 additions & 11 deletions src/quick-lint-js/event-loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,17 @@ namespace quick_lint_js {
#if QLJS_HAVE_CXX_CONCEPTS
template <class Delegate>
concept event_loop_delegate = requires(Delegate d, const Delegate cd,
#if QLJS_HAVE_POLL
const ::pollfd poll_event,
#endif
string8_view data) {
{cd.get_readable_pipe()};
{d.append(data)};

#if QLJS_HAVE_POLL
{d.get_pipe_write_pollfd()};
{d.on_pipe_write_event(poll_event)};
#endif
};
#endif

Expand All @@ -52,32 +60,54 @@ class event_loop {
public:
void run() {
while (!this->done_) {
// TODO(strager): Only call read() if poll() tells us that data is
// available.
this->read_from_pipe();
if (this->done_) {
break;
}

#if QLJS_HAVE_POLL
static_assert(QLJS_EVENT_LOOP_READ_PIPE_NON_BLOCKING);
platform_file_ref pipe = this->const_derived().get_readable_pipe();
::pollfd pollfds[] = {
#if QLJS_EVENT_LOOP_READ_PIPE_NON_BLOCKING
{.fd = pipe.get(), .events = POLLIN, .revents = 0},
#endif
};
QLJS_ASSERT(std::size(pollfds) > 0);
int rc = ::poll(pollfds, std::size(pollfds), /*timeout=*/-1);
QLJS_ASSERT(pipe.is_pipe_non_blocking());

std::array<::pollfd, 2> pollfds;
std::size_t pollfd_count = 0;

std::size_t read_pipe_index = pollfd_count;
pollfds[pollfd_count++] =
::pollfd{.fd = pipe.get(), .events = POLLIN, .revents = 0};

std::size_t write_pipe_index = pollfd_count;
if (std::optional<::pollfd> event =
this->derived().get_pipe_write_pollfd()) {
pollfds[pollfd_count++] = *event;
}

QLJS_ASSERT(pollfd_count > 0);
QLJS_ASSERT(pollfd_count <= pollfds.size());
int rc = ::poll(pollfds.data(), narrow_cast<::nfds_t>(pollfd_count),
/*timeout=*/-1);
if (rc == -1) {
QLJS_UNIMPLEMENTED();
}
QLJS_ASSERT(rc > 0);
#if QLJS_EVENT_LOOP_READ_PIPE_NON_BLOCKING
if (pollfds[0].revents & POLLIN) {

const ::pollfd& read_pipe_event = pollfds[read_pipe_index];
if (read_pipe_event.revents & POLLIN) {
continue;
}
if (pollfds[0].revents & POLLERR) {
if (read_pipe_event.revents & POLLERR) {
QLJS_UNIMPLEMENTED();
}
#endif

if (pollfd_count > 1) {
const ::pollfd& write_pipe_event = pollfds[write_pipe_index];
if (write_pipe_event.revents != 0) {
this->derived().on_pipe_write_event(write_pipe_event);
}
}
#elif defined(_WIN32)
// Nothing to wait for.
static_assert(!QLJS_EVENT_LOOP_READ_PIPE_NON_BLOCKING);
Expand Down
6 changes: 2 additions & 4 deletions src/quick-lint-js/pipe-writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
#if defined(_WIN32)
#define QLJS_PIPE_WRITER_SEPARATE_THREAD 1
#else
// TODO(strager): Use non-blocking I/O after we change the LSP server to use an
// event loop.
#define QLJS_PIPE_WRITER_SEPARATE_THREAD 1
#define QLJS_PIPE_WRITER_SEPARATE_THREAD 0
#endif

namespace quick_lint_js {
Expand Down Expand Up @@ -89,7 +87,7 @@ class non_blocking_pipe_writer {
void flush();

#if QLJS_HAVE_POLL
::pollfd get_pollfd() noexcept;
std::optional<::pollfd> get_pollfd() noexcept;
void on_poll_event(const ::pollfd &);
#endif

Expand Down
80 changes: 76 additions & 4 deletions test/test-event-loop.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (C) 2020 Matthew "strager" Glazar
// See end of file for extended copyright information.

#include <chrono>
#include <condition_variable>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
Expand All @@ -13,6 +14,7 @@

using ::testing::ElementsAre;
using ::testing::IsEmpty;
using namespace std::literals::chrono_literals;

namespace quick_lint_js {
namespace {
Expand All @@ -35,13 +37,29 @@ struct spy_event_loop : public event_loop<spy_event_loop> {
}

template <class Func>
void wait_until_data(Func &&predicate) {
void wait_until_data(Func&& predicate) {
std::unique_lock lock(this->mutex_);
this->new_data_.wait(lock, [this, &predicate]() -> bool {
return predicate(this->read_data_);
});
}

#if QLJS_HAVE_POLL
std::optional<::pollfd> get_pipe_write_pollfd() const {
return this->pipe_write_pollfd_;
}

void on_pipe_write_event(const ::pollfd& event) {
this->pipe_write_event_callback_(event);
}

void set_pipe_write_pollfd(const ::pollfd& event,
std::function<void(const ::pollfd&)> on_event) {
this->pipe_write_pollfd_ = event;
this->pipe_write_event_callback_ = on_event;
}
#endif

private:
platform_file_ref pipe_;

Expand All @@ -50,6 +68,11 @@ struct spy_event_loop : public event_loop<spy_event_loop> {

// Protected by mutex_:
string8 read_data_;

#if QLJS_HAVE_POLL
std::optional<::pollfd> pipe_write_pollfd_;
std::function<void(const ::pollfd&)> pipe_write_event_callback_;
#endif
};

class test_event_loop : public ::testing::Test {
Expand All @@ -67,7 +90,7 @@ class test_event_loop : public ::testing::Test {
}
};

TEST_F(test_event_loop, stops_on_eof) {
TEST_F(test_event_loop, stops_on_pipe_read_eof) {
this->pipe.writer.close();

this->loop.run();
Expand All @@ -87,11 +110,11 @@ TEST_F(test_event_loop, reads_many_messages) {
std::thread writer_thread([this]() {
write_full_message(this->pipe.writer.ref(), u8"first");
this->loop.wait_until_data(
[](const string8 &data) -> bool { return data == u8"first"; });
[](const string8& data) -> bool { return data == u8"first"; });

write_full_message(this->pipe.writer.ref(), u8"SECOND");
this->loop.wait_until_data(
[](const string8 &data) -> bool { return data == u8"firstSECOND"; });
[](const string8& data) -> bool { return data == u8"firstSECOND"; });

this->pipe.writer.close();
});
Expand All @@ -102,6 +125,55 @@ TEST_F(test_event_loop, reads_many_messages) {
EXPECT_EQ(this->loop.get_read_data(), u8"firstSECOND");
}

#if QLJS_HAVE_POLL
TEST_F(test_event_loop, signals_writable_pipe) {
bool called = false;
this->loop.set_pipe_write_pollfd(
::pollfd{
.fd = this->pipe.writer.get(),
.events = POLLOUT,
.revents = 0,
},
[this, &called](const ::pollfd& event) {
called = true;
EXPECT_EQ(event.fd, this->pipe.writer.get());
EXPECT_TRUE(event.revents & POLLOUT);
// Stop event_loop::run.
this->pipe.writer.close();
});

this->loop.run();
EXPECT_TRUE(called);
}

TEST_F(test_event_loop, does_not_write_to_unwritable_pipe) {
// Make a pipe such that POLLOUT will not be signalled.
pipe_fds full_pipe = make_pipe();
full_pipe.writer.set_pipe_non_blocking();
write_full_message(full_pipe.writer.ref(),
string8(full_pipe.writer.get_pipe_buffer_size(), 'x'));

this->loop.set_pipe_write_pollfd(
::pollfd{
.fd = full_pipe.writer.get(),
.events = POLLOUT,
.revents = 0,
},
[](const ::pollfd&) {
ADD_FAILURE() << "on_pipe_write_event should not be called";
});

std::thread writer_thread([this]() {
std::this_thread::sleep_for(10ms);
// Interrupt event_loop::run on the main thread.
this->pipe.writer.close();
});
this->loop.run();

writer_thread.join();
}
#endif

void write_full_message(platform_file_ref file, string8_view message) {
std::optional<int> bytes_written =
file.write(message.data(), narrow_cast<int>(message.size()));
Expand Down

0 comments on commit 5fdcb45

Please sign in to comment.