Skip to content

Commit

Permalink
feat(plugins): introduce 'pipes', allowing users to pipe data to and …
Browse files Browse the repository at this point in the history
…control plugins from the command line (#3066)

* prototype - working with message from the cli

* prototype - pipe from the CLI to plugins

* prototype - pipe from the CLI to plugins and back again

* prototype - working with better cli interface

* prototype - working after removing unused stuff

* prototype - working with launching plugin if it is not launched, also fixed event ordering

* refactor: change message to cli-message

* prototype - allow plugins to send messages to each other

* fix: allow cli messages to send plugin parameters (and implement backpressure)

* fix: use input_pipe_id to identify cli pipes instead of their message name

* fix: come cleanups and add skip_cache parameter

* fix: pipe/client-server communication robustness

* fix: leaking messages between plugins while loading

* feat: allow plugins to specify how a new plugin instance is launched when sending messages

* fix: add permissions

* refactor: adjust cli api

* fix: improve cli plugin loading error messages

* docs: cli pipe

* fix: take plugin configuration into account when messaging between plugins

* refactor: pipe message protobuf interface

* refactor: update(event) -> pipe

* refactor - rename CliMessage to CliPipe

* fix: add is_private to pipes and change some naming

* refactor - cli client

* refactor: various cleanups

* style(fmt): rustfmt

* fix(pipes): backpressure across multiple plugins

* style: some cleanups

* style(fmt): rustfmt

* style: fix merge conflict mistake

* style(wording): clarify pipe permission
  • Loading branch information
imsnif authored Jan 17, 2024
1 parent f6d5729 commit d780bd9
Show file tree
Hide file tree
Showing 48 changed files with 3,071 additions and 305 deletions.
31 changes: 31 additions & 0 deletions default-plugins/fixture-plugin-for-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ struct State {
received_events: Vec<Event>,
received_payload: Option<String>,
configuration: BTreeMap<String, String>,
message_to_plugin_payload: Option<String>,
}

#[derive(Default, Serialize, Deserialize)]
Expand All @@ -34,9 +35,12 @@ impl<'de> ZellijWorker<'de> for TestWorker {
}
}

#[cfg(target_family = "wasm")]
register_plugin!(State);
#[cfg(target_family = "wasm")]
register_worker!(TestWorker, test_worker, TEST_WORKER);

#[cfg(target_family = "wasm")]
impl ZellijPlugin for State {
fn load(&mut self, configuration: BTreeMap<String, String>) {
request_permission(&[
Expand All @@ -49,6 +53,8 @@ impl ZellijPlugin for State {
PermissionType::OpenTerminalsOrPlugins,
PermissionType::WriteToStdin,
PermissionType::WebAccess,
PermissionType::ReadCliPipes,
PermissionType::MessageAndLaunchOtherPlugins,
]);
self.configuration = configuration;
subscribe(&[
Expand Down Expand Up @@ -295,10 +301,35 @@ impl ZellijPlugin for State {
self.received_events.push(event);
should_render
}
fn pipe(&mut self, pipe_message: PipeMessage) -> bool {
let input_pipe_id = match pipe_message.source {
PipeSource::Cli(id) => id.clone(),
PipeSource::Plugin(id) => format!("{}", id),
};
let name = pipe_message.name;
let payload = pipe_message.payload;
if name == "message_name" && payload == Some("message_payload".to_owned()) {
unblock_cli_pipe_input(&input_pipe_id);
} else if name == "message_name_block" {
block_cli_pipe_input(&input_pipe_id);
} else if name == "pipe_output" {
cli_pipe_output(&name, "this_is_my_output");
} else if name == "pipe_message_to_plugin" {
pipe_message_to_plugin(
MessageToPlugin::new("message_to_plugin").with_payload("my_cool_payload"),
);
} else if name == "message_to_plugin" {
self.message_to_plugin_payload = payload.clone();
}
let should_render = true;
should_render
}

fn render(&mut self, rows: usize, cols: usize) {
if let Some(payload) = self.received_payload.as_ref() {
println!("Payload from worker: {:?}", payload);
} else if let Some(payload) = self.message_to_plugin_payload.take() {
println!("Payload from self: {:?}", payload);
} else {
println!(
"Rows: {:?}, Cols: {:?}, Received events: {:?}",
Expand Down
25 changes: 25 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,31 @@ fn main() {
commands::convert_old_theme_file(old_theme_file);
std::process::exit(0);
}
if let Some(Command::Sessions(Sessions::Pipe {
name,
payload,
args,
plugin,
plugin_configuration,
})) = opts.command
{
let command_cli_action = CliAction::Pipe {
name,
payload,
args,
plugin,
plugin_configuration,

force_launch_plugin: false,
skip_plugin_cache: false,
floating_plugin: None,
in_place_plugin: None,
plugin_cwd: None,
plugin_title: None,
};
commands::send_action_to_session(command_cli_action, opts.session, config);
std::process::exit(0);
}
}

if let Some(Command::Sessions(Sessions::ListSessions {
Expand Down
181 changes: 177 additions & 4 deletions zellij-client/src/cli_client.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
//! The `[cli_client]` is used to attach to a running server session
//! and dispatch actions, that are specified through the command line.
use std::collections::BTreeMap;
use std::io::BufRead;
use std::process;
use std::{fs, path::PathBuf};

use crate::os_input_output::ClientOsApi;
use zellij_utils::{
errors::prelude::*,
input::actions::Action,
ipc::{ClientToServerMsg, ServerToClientMsg},
ipc::{ClientToServerMsg, ExitReason, ServerToClientMsg},
uuid::Uuid,
};

pub fn start_cli_client(os_input: Box<dyn ClientOsApi>, session_name: &str, actions: Vec<Action>) {
pub fn start_cli_client(
mut os_input: Box<dyn ClientOsApi>,
session_name: &str,
actions: Vec<Action>,
) {
let zellij_ipc_pipe: PathBuf = {
let mut sock_dir = zellij_utils::consts::ZELLIJ_SOCK_DIR.clone();
fs::create_dir_all(&sock_dir).unwrap();
Expand All @@ -21,10 +29,166 @@ pub fn start_cli_client(os_input: Box<dyn ClientOsApi>, session_name: &str, acti
let pane_id = os_input
.env_variable("ZELLIJ_PANE_ID")
.and_then(|e| e.trim().parse().ok());

for action in actions {
let msg = ClientToServerMsg::Action(action, pane_id, None);
os_input.send_to_server(msg);
match action {
Action::CliPipe {
pipe_id,
name,
payload,
plugin,
args,
configuration,
launch_new,
skip_cache,
floating,
in_place,
cwd,
pane_title,
} => {
pipe_client(
&mut os_input,
pipe_id,
name,
payload,
plugin,
args,
configuration,
launch_new,
skip_cache,
floating,
in_place,
pane_id,
cwd,
pane_title,
);
},
action => {
single_message_client(&mut os_input, action, pane_id);
},
}
}
}

fn pipe_client(
os_input: &mut Box<dyn ClientOsApi>,
pipe_id: String,
mut name: Option<String>,
payload: Option<String>,
plugin: Option<String>,
args: Option<BTreeMap<String, String>>,
mut configuration: Option<BTreeMap<String, String>>,
launch_new: bool,
skip_cache: bool,
floating: Option<bool>,
in_place: Option<bool>,
pane_id: Option<u32>,
cwd: Option<PathBuf>,
pane_title: Option<String>,
) {
let mut stdin = os_input.get_stdin_reader();
let name = name.take().or_else(|| Some(Uuid::new_v4().to_string()));
if launch_new {
// we do this to make sure the plugin is unique (has a unique configuration parameter) so
// that a new one would be launched, but we'll still send it to the same instance rather
// than launching a new one in every iteration of the loop
configuration
.get_or_insert_with(BTreeMap::new)
.insert("_zellij_id".to_owned(), Uuid::new_v4().to_string());
}
let create_msg = |payload: Option<String>| -> ClientToServerMsg {
ClientToServerMsg::Action(
Action::CliPipe {
pipe_id: pipe_id.clone(),
name: name.clone(),
payload,
args: args.clone(),
plugin: plugin.clone(),
configuration: configuration.clone(),
floating,
in_place,
launch_new,
skip_cache,
cwd: cwd.clone(),
pane_title: pane_title.clone(),
},
pane_id,
None,
)
};
loop {
if payload.is_some() {
// we got payload from the command line, we should use it and not wait for more
let msg = create_msg(payload);
os_input.send_to_server(msg);
break;
}
// we didn't get payload from the command line, meaning we listen on STDIN because this
// signifies the user is about to pipe more (eg. cat my-large-file | zellij pipe ...)
let mut buffer = String::new();
let _ = stdin.read_line(&mut buffer);
if buffer.is_empty() {
// end of pipe, send an empty message down the pipe
let msg = create_msg(None);
os_input.send_to_server(msg);
break;
} else {
// we've got data! send it down the pipe (most common)
let msg = create_msg(Some(buffer));
os_input.send_to_server(msg);
}
loop {
// wait for a response and act accordingly
match os_input.recv_from_server() {
Some((ServerToClientMsg::UnblockCliPipeInput(pipe_name), _)) => {
// unblock this pipe, meaning we need to stop waiting for a response and read
// once more from STDIN
if pipe_name == pipe_id {
break;
}
},
Some((ServerToClientMsg::CliPipeOutput(pipe_name, output), _)) => {
// send data to STDOUT, this *does not* mean we need to unblock the input
let err_context = "Failed to write to stdout";
if pipe_name == pipe_id {
let mut stdout = os_input.get_stdout_writer();
stdout
.write_all(output.as_bytes())
.context(err_context)
.non_fatal();
stdout.flush().context(err_context).non_fatal();
}
},
Some((ServerToClientMsg::Log(log_lines), _)) => {
log_lines.iter().for_each(|line| println!("{line}"));
process::exit(0);
},
Some((ServerToClientMsg::LogError(log_lines), _)) => {
log_lines.iter().for_each(|line| eprintln!("{line}"));
process::exit(2);
},
Some((ServerToClientMsg::Exit(exit_reason), _)) => match exit_reason {
ExitReason::Error(e) => {
eprintln!("{}", e);
process::exit(2);
},
_ => {
process::exit(0);
},
},
_ => {},
}
}
}
}

fn single_message_client(
os_input: &mut Box<dyn ClientOsApi>,
action: Action,
pane_id: Option<u32>,
) {
let msg = ClientToServerMsg::Action(action, pane_id, None);
os_input.send_to_server(msg);
loop {
match os_input.recv_from_server() {
Some((ServerToClientMsg::UnblockInputThread, _)) => {
Expand All @@ -39,6 +203,15 @@ pub fn start_cli_client(os_input: Box<dyn ClientOsApi>, session_name: &str, acti
log_lines.iter().for_each(|line| eprintln!("{line}"));
process::exit(2);
},
Some((ServerToClientMsg::Exit(exit_reason), _)) => match exit_reason {
ExitReason::Error(e) => {
eprintln!("{}", e);
process::exit(2);
},
_ => {
process::exit(0);
},
},
_ => {},
}
}
Expand Down
10 changes: 10 additions & 0 deletions zellij-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub(crate) enum ClientInstruction {
LogError(Vec<String>),
SwitchSession(ConnectToSession),
SetSynchronizedOutput(Option<SyncOutput>),
UnblockCliPipeInput(String), // String -> pipe name
CliPipeOutput(String, String), // String -> pipe name, String -> output
}

impl From<ServerToClientMsg> for ClientInstruction {
Expand All @@ -67,6 +69,12 @@ impl From<ServerToClientMsg> for ClientInstruction {
ServerToClientMsg::SwitchSession(connect_to_session) => {
ClientInstruction::SwitchSession(connect_to_session)
},
ServerToClientMsg::UnblockCliPipeInput(pipe_name) => {
ClientInstruction::UnblockCliPipeInput(pipe_name)
},
ServerToClientMsg::CliPipeOutput(pipe_name, output) => {
ClientInstruction::CliPipeOutput(pipe_name, output)
},
}
}
}
Expand All @@ -87,6 +95,8 @@ impl From<&ClientInstruction> for ClientContext {
ClientInstruction::DoneParsingStdinQuery => ClientContext::DoneParsingStdinQuery,
ClientInstruction::SwitchSession(..) => ClientContext::SwitchSession,
ClientInstruction::SetSynchronizedOutput(..) => ClientContext::SetSynchronisedOutput,
ClientInstruction::UnblockCliPipeInput(..) => ClientContext::UnblockCliPipeInput,
ClientInstruction::CliPipeOutput(..) => ClientContext::CliPipeOutput,
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions zellij-client/src/os_input_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ pub trait ClientOsApi: Send + Sync {
fn unset_raw_mode(&self, fd: RawFd) -> Result<(), nix::Error>;
/// Returns the writer that allows writing to standard output.
fn get_stdout_writer(&self) -> Box<dyn io::Write>;
fn get_stdin_reader(&self) -> Box<dyn io::Read>;
/// Returns a BufReader that allows to read from STDIN line by line, also locks STDIN
fn get_stdin_reader(&self) -> Box<dyn io::BufRead>;
fn update_session_name(&mut self, new_session_name: String);
/// Returns the raw contents of standard input.
fn read_from_stdin(&mut self) -> Result<Vec<u8>, &'static str>;
Expand Down Expand Up @@ -186,9 +187,10 @@ impl ClientOsApi for ClientOsInputOutput {
let stdout = ::std::io::stdout();
Box::new(stdout)
}
fn get_stdin_reader(&self) -> Box<dyn io::Read> {

fn get_stdin_reader(&self) -> Box<dyn io::BufRead> {
let stdin = ::std::io::stdin();
Box::new(stdin)
Box::new(stdin.lock())
}

fn send_to_server(&self, msg: ClientToServerMsg) {
Expand Down
4 changes: 2 additions & 2 deletions zellij-client/src/unit/stdin_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,10 @@ impl ClientOsApi for FakeClientOsApi {
let fake_stdout_writer = FakeStdoutWriter::new(self.stdout_buffer.clone());
Box::new(fake_stdout_writer)
}
fn get_stdin_reader(&self) -> Box<dyn io::Read> {
fn get_stdin_reader(&self) -> Box<dyn io::BufRead> {
unimplemented!()
}
fn update_session_name(&mut self, new_session_name: String) {}
fn update_session_name(&mut self, _new_session_name: String) {}
fn read_from_stdin(&mut self) -> Result<Vec<u8>, &'static str> {
Ok(self.stdin_buffer.drain(..).collect())
}
Expand Down
Loading

0 comments on commit d780bd9

Please sign in to comment.