Skip to content

Commit

Permalink
Merge pull request #857 from andrewdavidmackenzie/flow-850
Browse files Browse the repository at this point in the history
Handle errors in the Server submission_loop, exit gracefully letting client know. Fixes #850
  • Loading branch information
andrewdavidmackenzie authored Apr 4, 2021
2 parents 042829c + e2000db commit ff624a1
Show file tree
Hide file tree
Showing 10 changed files with 249 additions and 173 deletions.
25 changes: 5 additions & 20 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ endif

################### Doc ####################
.PHONY: docs
docs: book code-docs trim-docs
docs: build-flowc book code-docs trim-docs

.PHONY: book
book: target/html/index.html
Expand Down Expand Up @@ -283,24 +283,9 @@ copy:
################# Clean ################
.PHONY: clean
clean:
@$(MAKE) clean-dumps clean-svgs clean-guide
@cargo clean

.PHONY: clean-dumps
clean-dumps:
$(STIME)
@find . -name \*.dump -type f -exec rm -rf {} + ; true
@find . -name \*.dot -type f -exec rm -rf {} + ; true
$(ETIME)

.PHONY: clean-svgs
clean-svgs:
$(STIME)
@find . -name \*.dot.svg -type f -exec rm -rf {} + ; true
$(ETIME)

.PHONY: clean-guide
clean-guide:
$(STIME)
@find . -name \*.dot -type f -exec rm -rf {} + ; true
@rm -rf target/html
$(ETIME)
@find . -name \*.dump -type f -exec rm -rf {} + ; true
@find . -name \*.dot -type f -exec rm -rf {} + ; true
@cargo clean
2 changes: 1 addition & 1 deletion SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@

- [Developing flow](docs/developing/overview.md)
- [Pre-requisites](docs/developing/prereqs.md)
- [flow_impl](flow_impl/README.md)
- [flowcore](flowcore/README.md)
- [flow_impl_derive](flow_impl_derive/README.md)
- [flowc](flowc/README.md)
- [Test flows](flowc/tests/test-flows/README.md)
Expand Down
3 changes: 1 addition & 2 deletions docs/introduction/structure.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Here is a summary of the project components, their purpose and a link to their `README.md`:

* [flow_impl](../../flow_impl/README.md) - Definition of trait that functions must implement
* [flowcore](../../flowcore/README.md) - A set of core structs and traits used by `flowr` and `flowc`
* [flow_impl_derive](../../flow_impl_derive/README.md) - A derive macro used to help functions be compiled natively
and to wasm
* [flowc](../../flowc/README.md) - The `flowc` flow compiler binary is a CLI built around the 'flowclib' and that
Expand All @@ -14,7 +14,6 @@ Here is a summary of the project components, their purpose and a link to their `
* [flowrlib](../../flowr/src/lib/README.md) - The flow runner library that loads and executes compiled flows.
* [flowruntime](../../flowr/src/lib/flowruntime/README.md) - A set of core functions provided by any flow runtime
for all flows to interact with the environment and perform IO
* [flowcore](../../flowcore/README.md) - A set of core structs used by `flowr` and `flowc`
* [flowstdlib](../../flowstdlib/README.md) - the flow "standard library" which contains a set of functions that can be
used by flows being defined by the user
* [provider](../../provider/README.md) - Library used to fetch content from file/http and find and fetch library
Expand Down
8 changes: 4 additions & 4 deletions flowcore/README.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
# `flowcore`

See also: [Code docs](http://andrewdavidmackenzie.github.io/flow/code/doc/flowcore/index.html)
[comment]: <> (See also: [Code docs]&#40;http://andrewdavidmackenzie.github.io/flow/code/doc/flowcore/index.html&#41;)

`flowcore` is a library of core structs and traits related to flow that is shared between multiple flow
crates, and separate to avoid a cyclic dependency.

# `flow_impl`

See also: [Code docs](http://andrewdavidmackenzie.github.io/flow/code/doc/flow_impl/index.html)

This is a trait that implementations of flow 'functions' must implement in order for them to be invoked
by the flowrlib (or other) run-time library.

An example of a function implementing the `Implementation` trait can be found in the
docs for [`Implementation`](http://andrewdavidmackenzie.github.io/flow/code/doc/flowcore/trait.Implementation.html)
docs

[comment]: <> (for [`Implementation`]&#40;http://andrewdavidmackenzie.github.io/flow/code/doc/flowcore/trait.Implementation.html&#41;)
139 changes: 78 additions & 61 deletions flowr/src/cli_runtime_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use log::{debug, error, info, trace};
use flowrlib::client_server::RuntimeClientConnection;
use flowrlib::coordinator::Submission;
use flowrlib::errors::*;
use flowrlib::runtime::{Event, Response};
use flowrlib::runtime::Response::ClientSubmission;
use flowrlib::runtime::{Event, Response};

#[derive(Debug, Clone)]
pub struct CLIRuntimeClient {
Expand All @@ -22,10 +22,7 @@ pub struct CLIRuntimeClient {
}

impl CLIRuntimeClient {
fn new(args: Vec<String>,
#[cfg(feature = "metrics")]
display_metrics: bool
) -> Self {
fn new(args: Vec<String>, #[cfg(feature = "metrics")] display_metrics: bool) -> Self {
CLIRuntimeClient {
args,
image_buffers: HashMap::<String, ImageBuffer<Rgb<u8>, Vec<u8>>>::new(),
Expand All @@ -35,32 +32,34 @@ impl CLIRuntimeClient {
}

/*
Enter a loop where we receive events as a client and respond to them
*/
pub fn start(mut connection: RuntimeClientConnection,
submission: Submission,
flow_args: Vec<String>,
#[cfg(feature = "metrics")]
display_metrics: bool,
Enter a loop where we receive events as a client and respond to them
*/
pub fn start(
mut connection: RuntimeClientConnection,
submission: Submission,
flow_args: Vec<String>,
#[cfg(feature = "metrics")] display_metrics: bool,
) -> Result<()> {
connection.start()?;
trace!("Connection from Runtime client to Runtime server started");

debug!("Runtime client sending submission to server");
debug!("Client sending submission to server");
connection.client_send(ClientSubmission(submission))?;

let mut runtime_client = CLIRuntimeClient::new(flow_args,
#[cfg(feature = "metrics")]
display_metrics
let mut runtime_client = CLIRuntimeClient::new(
flow_args,
#[cfg(feature = "metrics")]
display_metrics,
);

loop {
debug!("Runtime client waiting for message from server");
debug!("Client waiting for message from server");
match connection.client_recv() {
Ok(event) => {
trace!("Runtime client received event from server: {:?}", event);
let response = runtime_client.process_event(event);
if response == Response::ClientExiting {
debug!("Server is exiting, so client will exit also");
return Ok(());
}

Expand Down Expand Up @@ -88,24 +87,32 @@ impl CLIRuntimeClient {

for (filename, image_buffer) in self.image_buffers.drain() {
info!("Flushing ImageBuffer to file: {}", filename);
if let Err(e) = image_buffer.save_with_format(Path::new(&filename), ImageFormat::Png) {
if let Err(e) =
image_buffer.save_with_format(Path::new(&filename), ImageFormat::Png)
{
error!("Error saving ImageBuffer '{}': '{}'", filename, e);
}
}
Response::ClientExiting
}
Event::ServerExiting => {
debug!("Server is exiting");
Response::ClientExiting
}
#[cfg(not(feature = "metrics"))]
Event::FlowEnd => {
debug!("=========================== Flow execution ended ======================================");
for (filename, image_buffer) in self.image_buffers.drain() {
info!("Flushing ImageBuffer to file: {}", filename);
if let Err(e) = image_buffer.save_with_format(Path::new(&filename), ImageFormat::Png) {
if let Err(e) =
image_buffer.save_with_format(Path::new(&filename), ImageFormat::Png)
{
error!("Error saving ImageBuffer '{}': '{}'", filename, e);
}
}
Response::ClientExiting
}
Event::StdoutEOF => Response::Ack,
Event::StdoutEof => Response::Ack,
Event::Stdout(contents) => {
println!("{}", contents);
Response::Ack
Expand All @@ -132,39 +139,38 @@ impl CLIRuntimeClient {
match io::stdin().read_line(&mut input) {
Ok(n) if n > 0 => Response::Line(input.trim().to_string()),
Ok(n) if n == 0 => Response::GetLineEOF,
_ => Response::Error("Could not read Readline".into())
_ => Response::Error("Could not read Readline".into()),
}
}
Event::Write(filename, bytes) => {
match File::create(&filename) {
Ok(mut file) => {
match file.write_all(bytes.as_slice()) {
Ok(_) => Response::Ack,
Err(e) => {
let msg = format!("Error writing to file: '{}': '{}'", filename, e);
error!("{}", msg);
Response::Error(msg)
}
}
}
Event::Write(filename, bytes) => match File::create(&filename) {
Ok(mut file) => match file.write_all(bytes.as_slice()) {
Ok(_) => Response::Ack,
Err(e) => {
let msg = format!("Error creating file: '{}': '{}'", filename, e);
let msg = format!("Error writing to file: '{}': '{}'", filename, e);
error!("{}", msg);
Response::Error(msg)
}
},
Err(e) => {
let msg = format!("Error creating file: '{}': '{}'", filename, e);
error!("{}", msg);
Response::Error(msg)
}
}
},
Event::PixelWrite((x, y), (r, g, b), (width, height), name) => {
let image = self.image_buffers.entry(name)
let image = self
.image_buffers
.entry(name)
.or_insert_with(|| RgbImage::new(width, height));
image.put_pixel(x, y, Rgb([r, g, b]));
Response::Ack
}
Event::GetArgs => { // Response gets serialized and sent over channel/network so needs to args be owned
Event::GetArgs => {
// Response gets serialized and sent over channel/network so needs to args be owned
Response::Args(self.args.clone())
}
Event::StderrEOF => Response::Ack,
Event::Invalid => Response::Ack
Event::StderrEof => Response::Ack,
Event::Invalid => Response::Ack,
}
}
}
Expand All @@ -183,14 +189,18 @@ mod test {

#[test]
fn test_arg_passing() {
let mut client = CLIRuntimeClient::new(vec!("file:///test_flow.toml".to_string(), "1".to_string()),
#[cfg(feature = "metrics")]
false
let mut client = CLIRuntimeClient::new(
vec!["file:///test_flow.toml".to_string(), "1".to_string()],
#[cfg(feature = "metrics")]
false,
);

match client.process_event(Event::GetArgs) {
Response::Args(args) => assert_eq!(vec!("file:///test_flow.toml".to_string(), "1".to_string()), args),
_ => panic!("Didn't get Args response as expected")
Response::Args(args) => assert_eq!(
vec!("file:///test_flow.toml".to_string(), "1".to_string()),
args
),
_ => panic!("Didn't get Args response as expected"),
}
}

Expand All @@ -199,22 +209,27 @@ mod test {
let temp = tempdir::TempDir::new("flow").unwrap().into_path();
let file = temp.join("test");

let mut client = CLIRuntimeClient::new(vec!("file:///test_flow.toml".to_string()),
#[cfg(feature = "metrics")]
false
let mut client = CLIRuntimeClient::new(
vec!["file:///test_flow.toml".to_string()],
#[cfg(feature = "metrics")]
false,
);

if client.process_event(Event::Write(file.to_str().unwrap().to_string(), b"Hello".to_vec()))
!= Response::Ack {
if client.process_event(Event::Write(
file.to_str().unwrap().to_string(),
b"Hello".to_vec(),
)) != Response::Ack
{
panic!("Didn't get Write response as expected")
}
}

#[test]
fn test_stdout() {
let mut client = CLIRuntimeClient::new(vec!("file:///test_flow.toml".to_string()),
#[cfg(feature = "metrics")]
false
let mut client = CLIRuntimeClient::new(
vec!["file:///test_flow.toml".to_string()],
#[cfg(feature = "metrics")]
false,
);
if client.process_event(Event::Stdout("Hello".into())) != Response::Ack {
panic!("Didn't get Stdout response as expected")
Expand All @@ -223,9 +238,10 @@ mod test {

#[test]
fn test_stderr() {
let mut client = CLIRuntimeClient::new(vec!("file:///test_flow.toml".to_string()),
#[cfg(feature = "metrics")]
false
let mut client = CLIRuntimeClient::new(
vec!["file:///test_flow.toml".to_string()],
#[cfg(feature = "metrics")]
false,
);
if client.process_event(Event::Stderr("Hello".into())) != Response::Ack {
panic!("Didn't get Stderr response as expected")
Expand All @@ -234,9 +250,10 @@ mod test {

#[test]
fn test_image_writing() {
let mut client = CLIRuntimeClient::new(vec!("file:///test_flow.toml".to_string()),
#[cfg(feature = "metrics")]
false
let mut client = CLIRuntimeClient::new(
vec!["file:///test_flow.toml".to_string()],
#[cfg(feature = "metrics")]
false,
);

let temp_dir = TempDir::new("flow").unwrap().into_path();
Expand All @@ -251,10 +268,10 @@ mod test {
panic!("Didn't get pixel write response as expected")
}
#[cfg(feature = "metrics")]
client.process_event(Event::FlowEnd(Metrics::new(1)));
client.process_event(Event::FlowEnd(Metrics::new(1)));
#[cfg(not(feature = "metrics"))]
client.process_event(Event::FlowEnd);
client.process_event(Event::FlowEnd);

assert!(path.exists(), "Image file was not created");
}
}
}
Loading

0 comments on commit ff624a1

Please sign in to comment.