From 5fdcb45945d903944d296cdd081787313e9c441d Mon Sep 17 00:00:00 2001 From: "Matthew \"strager\" Glazar" Date: Tue, 15 Jun 2021 17:41:02 -0700 Subject: [PATCH] Use event loop for pipe_writer (POSIX only) For the LSP server, on POSIX platforms, we call read() on one thread and write() on the other for the read and write LSP pipes. Use poll() to multiples the read and write pipes, making the LSP server single-threaded. This commit does not change anything on Windows. --- src/main.cpp | 14 ++++++ src/pipe-writer.cpp | 26 ++++++----- src/quick-lint-js/event-loop.h | 52 ++++++++++++++++----- src/quick-lint-js/pipe-writer.h | 6 +-- test/test-event-loop.cpp | 80 +++++++++++++++++++++++++++++++-- 5 files changed, 148 insertions(+), 30 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 3b77be25a5..4873a50676 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -404,6 +405,16 @@ void run_lsp_server() { void append(string8_view data) { this->endpoint_.append(data); } +#if QLJS_HAVE_POLL + std::optional<::pollfd> get_pipe_write_pollfd() { + return this->endpoint_.remote().get_pollfd(); + } + + void on_pipe_write_event(const ::pollfd &event) { + this->endpoint_.remote().on_poll_event(event); + } +#endif + private: platform_file_ref input_pipe_; lsp_endpoint, @@ -413,6 +424,9 @@ void run_lsp_server() { #if QLJS_EVENT_LOOP_READ_PIPE_NON_BLOCKING input_pipe.set_pipe_non_blocking(); +#endif +#if !QLJS_PIPE_WRITER_SEPARATE_THREAD + output_pipe.set_pipe_non_blocking(); #endif lsp_event_loop server(input_pipe, output_pipe, &fs); server.run(); diff --git a/src/pipe-writer.cpp b/src/pipe-writer.cpp index 9cf02935c0..fba0208991 100644 --- a/src/pipe-writer.cpp +++ b/src/pipe-writer.cpp @@ -111,30 +111,34 @@ non_blocking_pipe_writer::non_blocking_pipe_writer(platform_file_ref pipe) } void non_blocking_pipe_writer::flush() { - while (!this->pending_.empty()) { #if QLJS_HAVE_POLL - ::pollfd fds[] = {this->get_pollfd()}; - int rc = ::poll(fds, std::size(fds), /*timeout=*/-1); + while (std::optional<::pollfd> event = this->get_pollfd()) { + int rc = ::poll(&*event, 1, /*timeout=*/-1); if (rc == -1) { QLJS_UNIMPLEMENTED(); } - this->on_poll_event(fds[0]); + this->on_poll_event(*event); + } #else #error "Unsupported platform" #endif - } } #if QLJS_HAVE_POLL -::pollfd non_blocking_pipe_writer::get_pollfd() noexcept { - return ::pollfd{ - .fd = this->pipe_.get(), - .events = POLLOUT, - .revents = 0, - }; +std::optional<::pollfd> non_blocking_pipe_writer::get_pollfd() noexcept { + if (this->pending_.empty()) { + return std::nullopt; + } else { + return ::pollfd{ + .fd = this->pipe_.get(), + .events = POLLOUT, + .revents = 0, + }; + } } void non_blocking_pipe_writer::on_poll_event(const ::pollfd& fd) { + QLJS_ASSERT(fd.revents != 0); if (fd.revents & POLLERR) { QLJS_UNIMPLEMENTED(); } diff --git a/src/quick-lint-js/event-loop.h b/src/quick-lint-js/event-loop.h index 9866a1452d..23f899e208 100644 --- a/src/quick-lint-js/event-loop.h +++ b/src/quick-lint-js/event-loop.h @@ -31,9 +31,17 @@ namespace quick_lint_js { #if QLJS_HAVE_CXX_CONCEPTS template concept event_loop_delegate = requires(Delegate d, const Delegate cd, +#if QLJS_HAVE_POLL + const ::pollfd poll_event, +#endif string8_view data) { {cd.get_readable_pipe()}; {d.append(data)}; + +#if QLJS_HAVE_POLL + {d.get_pipe_write_pollfd()}; + {d.on_pipe_write_event(poll_event)}; +#endif }; #endif @@ -52,32 +60,54 @@ class event_loop { public: void run() { while (!this->done_) { + // TODO(strager): Only call read() if poll() tells us that data is + // available. this->read_from_pipe(); if (this->done_) { break; } #if QLJS_HAVE_POLL + static_assert(QLJS_EVENT_LOOP_READ_PIPE_NON_BLOCKING); platform_file_ref pipe = this->const_derived().get_readable_pipe(); - ::pollfd pollfds[] = { -#if QLJS_EVENT_LOOP_READ_PIPE_NON_BLOCKING - {.fd = pipe.get(), .events = POLLIN, .revents = 0}, -#endif - }; - QLJS_ASSERT(std::size(pollfds) > 0); - int rc = ::poll(pollfds, std::size(pollfds), /*timeout=*/-1); + QLJS_ASSERT(pipe.is_pipe_non_blocking()); + + std::array<::pollfd, 2> pollfds; + std::size_t pollfd_count = 0; + + std::size_t read_pipe_index = pollfd_count; + pollfds[pollfd_count++] = + ::pollfd{.fd = pipe.get(), .events = POLLIN, .revents = 0}; + + std::size_t write_pipe_index = pollfd_count; + if (std::optional<::pollfd> event = + this->derived().get_pipe_write_pollfd()) { + pollfds[pollfd_count++] = *event; + } + + QLJS_ASSERT(pollfd_count > 0); + QLJS_ASSERT(pollfd_count <= pollfds.size()); + int rc = ::poll(pollfds.data(), narrow_cast<::nfds_t>(pollfd_count), + /*timeout=*/-1); if (rc == -1) { QLJS_UNIMPLEMENTED(); } QLJS_ASSERT(rc > 0); -#if QLJS_EVENT_LOOP_READ_PIPE_NON_BLOCKING - if (pollfds[0].revents & POLLIN) { + + const ::pollfd& read_pipe_event = pollfds[read_pipe_index]; + if (read_pipe_event.revents & POLLIN) { continue; } - if (pollfds[0].revents & POLLERR) { + if (read_pipe_event.revents & POLLERR) { QLJS_UNIMPLEMENTED(); } -#endif + + if (pollfd_count > 1) { + const ::pollfd& write_pipe_event = pollfds[write_pipe_index]; + if (write_pipe_event.revents != 0) { + this->derived().on_pipe_write_event(write_pipe_event); + } + } #elif defined(_WIN32) // Nothing to wait for. static_assert(!QLJS_EVENT_LOOP_READ_PIPE_NON_BLOCKING); diff --git a/src/quick-lint-js/pipe-writer.h b/src/quick-lint-js/pipe-writer.h index 4cda5adb02..495eabb924 100644 --- a/src/quick-lint-js/pipe-writer.h +++ b/src/quick-lint-js/pipe-writer.h @@ -18,9 +18,7 @@ #if defined(_WIN32) #define QLJS_PIPE_WRITER_SEPARATE_THREAD 1 #else -// TODO(strager): Use non-blocking I/O after we change the LSP server to use an -// event loop. -#define QLJS_PIPE_WRITER_SEPARATE_THREAD 1 +#define QLJS_PIPE_WRITER_SEPARATE_THREAD 0 #endif namespace quick_lint_js { @@ -89,7 +87,7 @@ class non_blocking_pipe_writer { void flush(); #if QLJS_HAVE_POLL - ::pollfd get_pollfd() noexcept; + std::optional<::pollfd> get_pollfd() noexcept; void on_poll_event(const ::pollfd &); #endif diff --git a/test/test-event-loop.cpp b/test/test-event-loop.cpp index e584898e37..84c8f64f45 100644 --- a/test/test-event-loop.cpp +++ b/test/test-event-loop.cpp @@ -1,6 +1,7 @@ // Copyright (C) 2020 Matthew "strager" Glazar // See end of file for extended copyright information. +#include #include #include #include @@ -13,6 +14,7 @@ using ::testing::ElementsAre; using ::testing::IsEmpty; +using namespace std::literals::chrono_literals; namespace quick_lint_js { namespace { @@ -35,13 +37,29 @@ struct spy_event_loop : public event_loop { } template - void wait_until_data(Func &&predicate) { + void wait_until_data(Func&& predicate) { std::unique_lock lock(this->mutex_); this->new_data_.wait(lock, [this, &predicate]() -> bool { return predicate(this->read_data_); }); } +#if QLJS_HAVE_POLL + std::optional<::pollfd> get_pipe_write_pollfd() const { + return this->pipe_write_pollfd_; + } + + void on_pipe_write_event(const ::pollfd& event) { + this->pipe_write_event_callback_(event); + } + + void set_pipe_write_pollfd(const ::pollfd& event, + std::function on_event) { + this->pipe_write_pollfd_ = event; + this->pipe_write_event_callback_ = on_event; + } +#endif + private: platform_file_ref pipe_; @@ -50,6 +68,11 @@ struct spy_event_loop : public event_loop { // Protected by mutex_: string8 read_data_; + +#if QLJS_HAVE_POLL + std::optional<::pollfd> pipe_write_pollfd_; + std::function pipe_write_event_callback_; +#endif }; class test_event_loop : public ::testing::Test { @@ -67,7 +90,7 @@ class test_event_loop : public ::testing::Test { } }; -TEST_F(test_event_loop, stops_on_eof) { +TEST_F(test_event_loop, stops_on_pipe_read_eof) { this->pipe.writer.close(); this->loop.run(); @@ -87,11 +110,11 @@ TEST_F(test_event_loop, reads_many_messages) { std::thread writer_thread([this]() { write_full_message(this->pipe.writer.ref(), u8"first"); this->loop.wait_until_data( - [](const string8 &data) -> bool { return data == u8"first"; }); + [](const string8& data) -> bool { return data == u8"first"; }); write_full_message(this->pipe.writer.ref(), u8"SECOND"); this->loop.wait_until_data( - [](const string8 &data) -> bool { return data == u8"firstSECOND"; }); + [](const string8& data) -> bool { return data == u8"firstSECOND"; }); this->pipe.writer.close(); }); @@ -102,6 +125,55 @@ TEST_F(test_event_loop, reads_many_messages) { EXPECT_EQ(this->loop.get_read_data(), u8"firstSECOND"); } +#if QLJS_HAVE_POLL +TEST_F(test_event_loop, signals_writable_pipe) { + bool called = false; + this->loop.set_pipe_write_pollfd( + ::pollfd{ + .fd = this->pipe.writer.get(), + .events = POLLOUT, + .revents = 0, + }, + [this, &called](const ::pollfd& event) { + called = true; + EXPECT_EQ(event.fd, this->pipe.writer.get()); + EXPECT_TRUE(event.revents & POLLOUT); + // Stop event_loop::run. + this->pipe.writer.close(); + }); + + this->loop.run(); + EXPECT_TRUE(called); +} + +TEST_F(test_event_loop, does_not_write_to_unwritable_pipe) { + // Make a pipe such that POLLOUT will not be signalled. + pipe_fds full_pipe = make_pipe(); + full_pipe.writer.set_pipe_non_blocking(); + write_full_message(full_pipe.writer.ref(), + string8(full_pipe.writer.get_pipe_buffer_size(), 'x')); + + this->loop.set_pipe_write_pollfd( + ::pollfd{ + .fd = full_pipe.writer.get(), + .events = POLLOUT, + .revents = 0, + }, + [](const ::pollfd&) { + ADD_FAILURE() << "on_pipe_write_event should not be called"; + }); + + std::thread writer_thread([this]() { + std::this_thread::sleep_for(10ms); + // Interrupt event_loop::run on the main thread. + this->pipe.writer.close(); + }); + this->loop.run(); + + writer_thread.join(); +} +#endif + void write_full_message(platform_file_ref file, string8_view message) { std::optional bytes_written = file.write(message.data(), narrow_cast(message.size()));