Skip to content

Commit

Permalink
@@@ LSP: Prevent deadlock with synchronous client
Browse files Browse the repository at this point in the history
@@@ winders
@@@ perf. lots of copying. sadge. =[

If an LSP client sends a bunch of requests, but does not read from
quick-lint-js' stdout, quick-lint-js can hang in pipe_reader::write.
(This came up when writing some benchmarks. My LSP client naively made a
bunch of requests without reading responses during the requests.)

Prevent this (unlikely) deadlock by switching stdout to non-blocking
mode, buffering data in memory, and resuming reading from stdin as soon
as possible.
  • Loading branch information
strager committed Apr 18, 2021
1 parent d6d3afb commit 3ec0294
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 73 deletions.
5 changes: 5 additions & 0 deletions src/file-handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ std::optional<int> posix_fd_file_ref::write(const void *buffer,
return narrow_cast<int>(written_size);
}

void posix_fd_file_ref::set_pipe_non_blocking() {
int rc = ::fcntl(this->get(), F_SETFL, O_NONBLOCK);
QLJS_ASSERT(rc == 0);
}

std::string posix_fd_file_ref::get_last_error_message() {
return std::strerror(errno);
}
Expand Down
86 changes: 85 additions & 1 deletion src/lsp-pipe-writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,40 @@
// See end of file for extended copyright information.

#include <array>
#include <condition_variable>
#include <mutex>
#include <quick-lint-js/assert.h>
#include <quick-lint-js/byte-buffer.h>
#include <quick-lint-js/char8.h>
#include <quick-lint-js/file-handle.h>
#include <quick-lint-js/file.h>
#include <quick-lint-js/have.h>
#include <quick-lint-js/integer.h>
#include <quick-lint-js/lsp-pipe-writer.h>
#include <quick-lint-js/narrow-cast.h>
#include <thread>

#if QLJS_HAVE_FCNTL_H
#include <fcntl.h>
#endif

namespace quick_lint_js {
lsp_pipe_writer::lsp_pipe_writer(platform_file_ref pipe) : pipe_(pipe) {}
lsp_pipe_writer::lsp_pipe_writer(platform_file_ref pipe) : pipe_(pipe) {
#if QLJS_HAVE_FCNTL_H
int rc = ::fcntl(this->pipe_.get(), F_GETFL, O_NONBLOCK);
QLJS_ASSERT(rc >= 0);
QLJS_ASSERT(rc); // O_NONBLOCK should be set.
// @@@ else windows
#endif
}

lsp_pipe_writer::~lsp_pipe_writer() {
if (this->flushing_thread_.joinable()) {
this->stop_ = true;
this->data_is_pending_.notify_one();
this->flushing_thread_.join();
}
}

void lsp_pipe_writer::send_message(const byte_buffer& message) {
this->write(u8"Content-Length: ");
Expand All @@ -34,14 +58,74 @@ void lsp_pipe_writer::write_integer(T value) {
}

void lsp_pipe_writer::write(string8_view message) {
std::unique_lock<std::mutex> lock(this->mutex_);
QLJS_ASSERT(!this->stop_);
if (this->pending_.empty()) {
// The current thread has control over this->pipe_.
string8_view unwritten = this->write_as_much_as_possible(message);
if (!unwritten.empty()) {
// TODO(strager): Avoid copying.
this->pending_.append_copy(unwritten);
lock.unlock();
this->start_flushing_thread_if_needed();
}
} else {
// The flushing thread has control over this->pipe_.
this->pending_.append_copy(message);
}
}

string8_view lsp_pipe_writer::write_as_much_as_possible(string8_view message) {
while (!message.empty()) {
std::optional<int> bytes_written =
this->pipe_.write(message.data(), narrow_cast<int>(message.size()));
if (!bytes_written.has_value()) {
if (errno == EAGAIN) {
break;
}
QLJS_UNIMPLEMENTED();
}
message = message.substr(narrow_cast<std::size_t>(*bytes_written));
}
return message;
}

void lsp_pipe_writer::start_flushing_thread_if_needed() {
if (!this->flushing_thread_.joinable()) {
this->flushing_thread_ =
std::thread([this] { this->run_flushing_thread(); });
}
}

void lsp_pipe_writer::run_flushing_thread() {
std::unique_lock<std::mutex> lock(this->mutex_);
for (;;) {
this->data_is_pending_.wait(
lock, [this] { return this->stop_ || !this->pending_.empty(); });
if (this->stop_) {
break;
}
QLJS_ASSERT(!this->pending_.empty());

// TODO(strager): Don't copy. Write all the chunks with writev if possible.
string8 message_string;
message_string.resize(this->pending_.size());
this->pending_.copy_to(message_string.data());
this->pending_ =
byte_buffer(); // @@@ should make .clear() which reuses allocs
string8_view unwritten = this->write_as_much_as_possible(message_string);
// TODO(strager): Avoid copying.
this->pending_.append_copy(unwritten);
// @@@ thread spins 100%. we should poll()/select().

this->data_is_flushed_.notify_one();
}
}

void lsp_pipe_writer::flush() {
std::unique_lock<std::mutex> lock(this->mutex_);
QLJS_ASSERT(!this->stop_);
this->data_is_flushed_.wait(lock, [this] { return this->pending_.empty(); });
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/quick-lint-js/file-handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ class posix_fd_file_ref {
file_read_result read(void *buffer, int buffer_size) noexcept;
std::optional<int> write(const void *buffer, int buffer_size) noexcept;

void set_pipe_non_blocking();

static std::string get_last_error_message();

protected:
Expand All @@ -118,6 +120,7 @@ class posix_fd_file : private posix_fd_file_ref {
using posix_fd_file_ref::get;
using posix_fd_file_ref::get_last_error_message;
using posix_fd_file_ref::read;
using posix_fd_file_ref::set_pipe_non_blocking;
using posix_fd_file_ref::write;

private:
Expand Down
25 changes: 24 additions & 1 deletion src/quick-lint-js/lsp-pipe-writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
#ifndef QUICK_LINT_JS_LSP_PIPE_WRITER_H
#define QUICK_LINT_JS_LSP_PIPE_WRITER_H

#include <condition_variable>
#include <mutex>
#include <quick-lint-js/char8.h>
#include <quick-lint-js/file-handle.h>
#include <thread>

namespace quick_lint_js {
class byte_buffer;
Expand All @@ -18,15 +21,35 @@ class lsp_pipe_writer {
public:
explicit lsp_pipe_writer(platform_file_ref pipe);

void send_message(const byte_buffer&);
lsp_pipe_writer(const lsp_pipe_writer &) = delete;
lsp_pipe_writer &operator=(const lsp_pipe_writer &) = delete;

~lsp_pipe_writer();

void send_message(const byte_buffer &);
void flush();

private:
template <class T>
void write_integer(T);

void write(string8_view);

string8_view write_as_much_as_possible(string8_view);

void start_flushing_thread_if_needed();
void run_flushing_thread();

platform_file_ref pipe_;

std::thread flushing_thread_;
std::mutex mutex_;
std::condition_variable data_is_pending_;
std::condition_variable data_is_flushed_;

// Protected by mutex_:
byte_buffer pending_;
bool stop_ = false;
};
}

Expand Down
97 changes: 26 additions & 71 deletions test/test-lsp-pipe-writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@
#include <quick-lint-js/file-handle.h>
#include <quick-lint-js/file.h>
#include <quick-lint-js/have.h>
#include <quick-lint-js/lsp-message-parser.h>
#include <quick-lint-js/lsp-pipe-writer.h>
#include <quick-lint-js/narrow-cast.h>
#include <quick-lint-js/pipe-reader.h>
#include <quick-lint-js/pipe.h>
#include <thread>

#if QLJS_HAVE_PTHREAD_KILL
#include <pthread.h>
#include <signal.h>
#endif

#if QLJS_HAVE_FCNTL_H
#include <fcntl.h>
#endif
Expand All @@ -33,32 +30,10 @@ namespace quick_lint_js {
namespace {
std::size_t pipe_buffer_size(platform_file_ref);

#if QLJS_HAVE_PTHREAD_KILL
class sigaction_guard {
public:
explicit sigaction_guard(int signal_number) : signal_number_(signal_number) {
int rc =
::sigaction(this->signal_number_, nullptr, &this->saved_sigaction_);
QLJS_ALWAYS_ASSERT(rc == 0);
}

~sigaction_guard() {
int rc =
::sigaction(this->signal_number_, &this->saved_sigaction_, nullptr);
QLJS_ALWAYS_ASSERT(rc == 0);
}

sigaction_guard(const sigaction_guard &) = delete;
sigaction_guard &operator=(const sigaction_guard &) = delete;

private:
int signal_number_;
struct sigaction saved_sigaction_;
};
#endif

class test_lsp_pipe_writer : public ::testing::Test {
public:
explicit test_lsp_pipe_writer() { this->pipe.writer.set_pipe_non_blocking(); }

pipe_fds pipe = make_pipe();
lsp_pipe_writer writer{this->pipe.writer.ref()};
};
Expand Down Expand Up @@ -87,6 +62,7 @@ TEST_F(test_lsp_pipe_writer, large_message_sends_fully) {
u8"[" + string8(pipe_buffer_size(this->pipe.writer.ref()) * 3, u8'x') +
u8"]";
this->writer.send_message(byte_buffer_of(message));
this->writer.flush();
this->pipe.writer.close();

read_file_result data = data_future.get();
Expand All @@ -96,52 +72,31 @@ TEST_F(test_lsp_pipe_writer, large_message_sends_fully) {
EXPECT_NE(data_content.find(message), data_content.npos);
}

#if QLJS_HAVE_PTHREAD_KILL
TEST_F(test_lsp_pipe_writer, large_message_sends_fully_with_interrupt) {
sigaction_guard signal_guard(SIGALRM);

::pthread_t writer_thread_id = ::pthread_self();

ASSERT_NE(::signal(SIGALRM,
[](int) {
// Do nothing. Just interrupt syscalls.
}),
SIG_ERR)
<< std::strerror(errno);

std::future<read_file_result> data_future =
std::async(std::launch::async, [this, writer_thread_id]() {
int rc;

// Interrupt the write() syscall, causing it to return early.
std::this_thread::sleep_for(10ms); // Wait for write() to execute.
rc = ::pthread_kill(writer_thread_id, SIGALRM);
EXPECT_EQ(rc, 0) << std::strerror(rc);
// The pipe's buffer should now be full.

// Interrupt the write() syscall again, causing it to restart. This
// write() call shouldn't have written anything, because the pipe's
// buffer is already full.
std::this_thread::sleep_for(1ms); // Wait for write() to execute.
rc = ::pthread_kill(writer_thread_id, SIGALRM);
EXPECT_EQ(rc, 0) << std::strerror(rc);

return read_file("<pipe>", this->pipe.reader.ref());
});

TEST_F(test_lsp_pipe_writer, large_message_with_no_reader_does_not_block) {
string8 message =
u8"[" + string8(pipe_buffer_size(this->pipe.writer.ref()) * 3, u8'x') +
u8"]";
this->writer.send_message(byte_buffer_of(message));
this->pipe.writer.close();

read_file_result data = data_future.get();
ASSERT_TRUE(data.ok()) << data.error;

string8_view data_content = data.content.string_view();
EXPECT_NE(data_content.find(message), data_content.npos);
this->writer.send_message(byte_buffer_of(message)); // Shouldn't block.

static std::promise<string8> received_message_promise;
received_message_promise = std::promise<string8>();

// Read exactly as many bytes as needed to parse the message, then set
// received_message_promise.
std::future<void> receiving_thread = std::async(std::launch::async, [this] {
struct message_handler : lsp_message_parser<message_handler> {
void message_parsed(string8_view message_content) {
received_message_promise.set_value(string8(message_content));
}
};
pipe_reader<message_handler> reader(this->pipe.reader.ref());
reader.run();
});

string8 data = received_message_promise.get_future().get();
this->pipe.writer.close(); // Stop receiving_thread ASAP.
EXPECT_EQ(data, message);
}
#endif

std::size_t pipe_buffer_size([[maybe_unused]] platform_file_ref pipe) {
#if QLJS_HAVE_F_GETPIPE_SZ
Expand Down

0 comments on commit 3ec0294

Please sign in to comment.