Skip to content
This repository has been archived by the owner on Dec 29, 2022. It is now read-only.

Managed concurrency #923

Merged
merged 1 commit into from
Jul 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ url = "1.1.0"
walkdir = "2.1"
regex = "1"
ordslice = "0.3"
crossbeam-channel = "0.2.1"

[dev-dependencies]
json = "0.11"
Expand Down
14 changes: 14 additions & 0 deletions src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::build::*;
use crate::lsp_data;
use crate::lsp_data::*;
use crate::server::Output;
use crate::concurrency::{ConcurrentJob, Jobs};

use std::collections::HashMap;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -150,6 +151,7 @@ pub struct InitActionContext {
prev_changes: Arc<Mutex<HashMap<PathBuf, u64>>>,

config: Arc<Mutex<Config>>,
jobs: Arc<Mutex<Jobs>>,
client_capabilities: Arc<lsp_data::ClientCapabilities>,
client_supports_cmd_run: bool,
/// Whether the server is performing cleanup (after having received
Expand Down Expand Up @@ -199,6 +201,7 @@ impl InitActionContext {
analysis_queue,
vfs,
config,
jobs: Arc::new(Mutex::new(Jobs::new())),
current_project,
previous_build_results: Arc::new(Mutex::new(HashMap::new())),
build_queue,
Expand Down Expand Up @@ -238,6 +241,8 @@ impl InitActionContext {
}

fn build<O: Output>(&self, project_path: &Path, priority: BuildPriority, out: &O) {
let (job, token) = ConcurrentJob::new();
self.add_job(job);

let pbh = {
let config = self.config.lock().unwrap();
Expand All @@ -253,6 +258,7 @@ impl InitActionContext {
use_black_list: config.use_crate_blacklist,
notifier: Box::new(BuildDiagnosticsNotifier::new(out.clone())),
blocked_threads: vec![],
token,
}
};

Expand All @@ -267,6 +273,14 @@ impl InitActionContext {
self.build(&self.current_project, priority, out);
}

pub fn add_job(&self, job: ConcurrentJob) {
self.jobs.lock().unwrap().add(job);
}

pub fn wait_for_concurrent_jobs(&self) {
self.jobs.lock().unwrap().wait_for_all();
}

/// Block until any builds and analysis tasks are complete.
fn block_on_build(&self) {
self.build_queue.block_on_build();
Expand Down
5 changes: 4 additions & 1 deletion src/actions/post_build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ use std::thread::{self, Thread};
use crate::actions::diagnostics::{parse_diagnostics, Diagnostic, ParsedDiagnostics, Suggestion};
use crate::actions::progress::DiagnosticsNotifier;
use crate::build::BuildResult;
use crate::lsp_data::PublishDiagnosticsParams;
use crate::concurrency::JobToken;
use languageserver_types::DiagnosticSeverity;
use itertools::Itertools;
use crate::lsp_data::PublishDiagnosticsParams;

use rls_analysis::AnalysisHost;
use rls_data::Analysis;
Expand All @@ -46,6 +47,8 @@ pub struct PostBuildHandler {
pub active_build_count: Arc<AtomicUsize>,
pub notifier: Box<dyn DiagnosticsNotifier>,
pub blocked_threads: Vec<thread::Thread>,
#[allow(unused)] // for drop
pub token: JobToken,
}

impl PostBuildHandler {
Expand Down
1 change: 0 additions & 1 deletion src/build/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ impl BuildQueue {
if needs_compilation_ctx_from_cargo {
priority = BuildPriority::Cargo;
}

let build = PendingBuild {
build_dir: new_build_dir.to_owned(),
built_files: self.internals.dirty_files.lock().unwrap().clone(),
Expand Down
100 changes: 100 additions & 0 deletions src/concurrency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use std::{thread};

use crossbeam_channel::{bounded, Receiver, Sender};

/// `ConcurrentJob` is a handle for some long-running computation
/// off the main thread. It can be used, indirectly, to wait for
/// the completion of the said computation.
///
/// All `ConncurrentJob`s must eventually be stored in a `Jobs` table.
///
/// All concurrent activities, like spawning a thread or pushing
/// a work item to a job queue, should be covered by `ConcurrentJob`.
/// This way, the set of `Jobs` table will give a complete overview of
/// concurrency in the system, and it will be possinle to wait for all
/// jobs to finish, which helps tremendously with making tests deterministic.
///
/// `JobToken` is the worker-side counterpart of `ConcurrentJob`. Dropping
/// a `JobToken` signals that the corresponding job has finished.
#[must_use]
pub struct ConcurrentJob {
chan: Receiver<Never>,
}

pub struct JobToken {
#[allow(unused)] // for drop
chan: Sender<Never>,
}

pub struct Jobs {
jobs: Vec<ConcurrentJob>,
}

impl Jobs {
pub fn new() -> Jobs {
Jobs { jobs: Vec::new() }
}

pub fn add(&mut self, job: ConcurrentJob) {
self.gc();
self.jobs.push(job);
}

/// Blocks the current thread until all pending jobs are finished.
pub fn wait_for_all(&mut self) {
while !self.jobs.is_empty() {
let done: usize = {
let chans = self.jobs.iter().map(|j| &j.chan);
select! {
recv(chans, msg, from) => {
assert!(msg.is_none());
self.jobs.iter().position(|j| &j.chan == from).unwrap()
}
}
};
drop(self.jobs.swap_remove(done));
}
}

fn gc(&mut self) {
self.jobs.retain(|job| !job.is_completed())
}
}

impl ConcurrentJob {
pub fn new() -> (ConcurrentJob, JobToken) {
let (tx, rx) = bounded(0);
let job = ConcurrentJob { chan: rx };
let token = JobToken { chan: tx };
(job, token)
}

fn is_completed(&self) -> bool {
is_closed(&self.chan)
}
}

impl Drop for ConcurrentJob {
fn drop(&mut self) {
if self.is_completed() || thread::panicking() {
return;
}
panic!("orphaned concurrent job");
}
}

// We don't actually send messages through the channels,
// and instead just check if the channel is closed,
// so we use uninhabited enum as a message type
enum Never {}

/// Nonblocking
fn is_closed(chan: &Receiver<Never>) -> bool {
select! {
recv(chan, msg) => match msg {
None => true,
Some(never) => match never {}
}
default => false,
}
}
3 changes: 3 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ extern crate log;
extern crate serde_derive;
#[macro_use]
extern crate serde_json;
#[macro_use]
extern crate crossbeam_channel;

use std::env;
use std::sync::Arc;
Expand All @@ -57,6 +59,7 @@ pub mod cmd;
pub mod config;
pub mod lsp_data;
pub mod server;
pub mod concurrency;

#[cfg(test)]
mod test;
Expand Down
Loading