Skip to content

Commit

Permalink
refactor(ctrlp): add core agent
Browse files Browse the repository at this point in the history
Refactor the node,pool,volume agent into a single core agent.
Tidy up the Errors throughout the control plane stack allowing us to
 get more concise error kinds at the rest layer and also at each agent
 endpoint, which will allow us to expose the correct http responses
 at the openapi spec, which will be done as another PR.
  • Loading branch information
tiagolobocastro committed Mar 4, 2021
1 parent d9cebab commit 7e8bac8
Show file tree
Hide file tree
Showing 49 changed files with 3,164 additions and 2,927 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 3 additions & 15 deletions control-plane/agents/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,13 @@ authors = ["Tiago Castro <[email protected]>"]
edition = "2018"

[[bin]]
name = "kiiss"
path = "kiiss/src/server.rs"

[[bin]]
name = "node"
path = "node/src/server.rs"

[[bin]]
name = "pool"
path = "pool/src/server.rs"

[[bin]]
name = "volume"
path = "volume/src/server.rs"
name = "core"
path = "core/src/server.rs"

[[bin]]
name = "jsongrpc"
path = "jsongrpc/src/server.rs"


[lib]
name = "common"
path = "common/src/lib.rs"
Expand All @@ -50,6 +37,7 @@ tracing-futures = "0.2.4"
rpc = { path = "../../rpc" }
url = "2.2.0"
http = "0.2.1"
paste = "1.0.4"

[dev-dependencies]
composer = { path = "../../composer" }
Expand Down
300 changes: 300 additions & 0 deletions control-plane/agents/common/src/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
use mbus_api::{
message_bus::v0::BusError,
v0::*,
ErrorChain,
ReplyError,
ReplyErrorKind,
ResourceKind,
};
use snafu::{Error, Snafu};
use tonic::Code;

/// Common error type for send/receive
#[derive(Debug, Snafu)]
#[snafu(visibility = "pub")]
#[allow(missing_docs)]
pub enum SvcError {
#[snafu(display("Failed to get node '{}' from the node agent", node))]
BusGetNode { node: String, source: BusError },
#[snafu(display("Failed to get nodes from the node agent"))]
BusGetNodes { source: BusError },
#[snafu(display("Node '{}' is not online", node))]
NodeNotOnline { node: NodeId },
#[snafu(display(
"Timed out after '{:?}' attempting to connect to node '{}' via gRPC endpoint '{}'",
timeout,
node_id,
endpoint
))]
GrpcConnectTimeout {
node_id: String,
endpoint: String,
timeout: std::time::Duration,
},
#[snafu(display("Failed to connect to node via gRPC"))]
GrpcConnect { source: tonic::transport::Error },
#[snafu(display("Node '{}' has invalid gRPC URI '{}'", node_id, uri))]
GrpcConnectUri {
node_id: String,
uri: String,
source: http::uri::InvalidUri,
},
#[snafu(display(
"gRPC request '{}' for '{}' failed with '{}'",
request,
resource.to_string(),
source
))]
GrpcRequestError {
resource: ResourceKind,
request: String,
source: tonic::Status,
},
#[snafu(display("Node '{}' not found", node_id))]
NodeNotFound { node_id: NodeId },
#[snafu(display("Pool '{}' not found", pool_id))]
PoolNotFound { pool_id: PoolId },
#[snafu(display("Nexus '{}' not found", nexus_id))]
NexusNotFound { nexus_id: String },
#[snafu(display("Replica '{}' not found", replica_id))]
ReplicaNotFound { replica_id: ReplicaId },
#[snafu(display("Invalid filter value: {:?}", filter))]
InvalidFilter { filter: Filter },
#[snafu(display("Operation failed due to insufficient resources"))]
NotEnoughResources { source: NotEnough },
#[snafu(display("Failed to deserialise JsonRpc response"))]
JsonRpcDeserialise { source: serde_json::Error },
#[snafu(display(
"Json RPC call failed for method '{}' with parameters '{}'. Error {}",
method,
params,
error,
))]
JsonRpc {
method: String,
params: String,
error: String,
},
#[snafu(display("Internal error: {}", details))]
Internal { details: String },
#[snafu(display("Message Bus error"))]
MBusError { source: mbus_api::Error },
#[snafu(display("Invalid Arguments"))]
InvalidArguments {},
}

impl From<mbus_api::Error> for SvcError {
fn from(source: mbus_api::Error) -> Self {
Self::MBusError {
source,
}
}
}

impl From<NotEnough> for SvcError {
fn from(source: NotEnough) -> Self {
Self::NotEnoughResources {
source,
}
}
}

impl From<SvcError> for ReplyError {
fn from(error: SvcError) -> Self {
#[allow(deprecated)]
let desc: &String = &error.description().to_string();
match error {
SvcError::BusGetNode {
source, ..
} => source,
SvcError::BusGetNodes {
source,
} => source,
SvcError::GrpcRequestError {
source,
request,
resource,
} => grpc_to_reply_error(SvcError::GrpcRequestError {
source,
request,
resource,
}),

SvcError::InvalidArguments {
..
} => ReplyError {
kind: ReplyErrorKind::InvalidArgument,
resource: ResourceKind::Unknown,
source: desc.to_string(),
extra: error.full_string(),
},

SvcError::NodeNotOnline {
..
} => ReplyError {
kind: ReplyErrorKind::FailedPrecondition,
resource: ResourceKind::Node,
source: desc.to_string(),
extra: error.full_string(),
},

SvcError::GrpcConnectTimeout {
..
} => ReplyError {
kind: ReplyErrorKind::Timeout,
resource: ResourceKind::Unknown,
source: desc.to_string(),
extra: error.full_string(),
},

SvcError::GrpcConnectUri {
..
} => ReplyError {
kind: ReplyErrorKind::Internal,
resource: ResourceKind::Unknown,
source: desc.to_string(),
extra: error.full_string(),
},

SvcError::GrpcConnect {
source,
} => ReplyError {
kind: ReplyErrorKind::Internal,
resource: ResourceKind::Unknown,
source: desc.to_string(),
extra: source.to_string(),
},

SvcError::NotEnoughResources {
..
} => ReplyError {
kind: ReplyErrorKind::ResourceExhausted,
resource: ResourceKind::Unknown,
source: desc.to_string(),
extra: error.full_string(),
},
SvcError::JsonRpcDeserialise {
..
} => ReplyError {
kind: ReplyErrorKind::Internal,
resource: ResourceKind::JsonGrpc,
source: desc.to_string(),
extra: error.full_string(),
},
SvcError::JsonRpc {
..
} => ReplyError {
kind: ReplyErrorKind::Internal,
resource: ResourceKind::JsonGrpc,
source: desc.to_string(),
extra: error.full_string(),
},
SvcError::NodeNotFound {
..
} => ReplyError {
kind: ReplyErrorKind::NotFound,
resource: ResourceKind::Node,
source: desc.to_string(),
extra: error.full_string(),
},
SvcError::PoolNotFound {
..
} => ReplyError {
kind: ReplyErrorKind::NotFound,
resource: ResourceKind::Pool,
source: desc.to_string(),
extra: error.full_string(),
},
SvcError::ReplicaNotFound {
..
} => ReplyError {
kind: ReplyErrorKind::NotFound,
resource: ResourceKind::Replica,
source: desc.to_string(),
extra: error.full_string(),
},
SvcError::NexusNotFound {
..
} => ReplyError {
kind: ReplyErrorKind::NotFound,
resource: ResourceKind::Nexus,
source: desc.to_string(),
extra: error.full_string(),
},
SvcError::InvalidFilter {
..
} => ReplyError {
kind: ReplyErrorKind::Internal,
resource: ResourceKind::Unknown,
source: desc.to_string(),
extra: error.full_string(),
},
SvcError::Internal {
..
} => ReplyError {
kind: ReplyErrorKind::Internal,
resource: ResourceKind::Unknown,
source: desc.to_string(),
extra: error.full_string(),
},
SvcError::MBusError {
source,
} => source.into(),
}
}
}

fn grpc_to_reply_error(error: SvcError) -> ReplyError {
match error {
SvcError::GrpcRequestError {
source,
request,
resource,
} => {
let kind = match source.code() {
Code::Ok => ReplyErrorKind::Internal,
Code::Cancelled => ReplyErrorKind::Internal,
Code::Unknown => ReplyErrorKind::Internal,
Code::InvalidArgument => ReplyErrorKind::InvalidArgument,
Code::DeadlineExceeded => ReplyErrorKind::DeadlineExceeded,
Code::NotFound => ReplyErrorKind::NotFound,
Code::AlreadyExists => ReplyErrorKind::AlreadyExists,
Code::PermissionDenied => ReplyErrorKind::PermissionDenied,
Code::ResourceExhausted => ReplyErrorKind::ResourceExhausted,
Code::FailedPrecondition => ReplyErrorKind::FailedPrecondition,
Code::Aborted => ReplyErrorKind::Aborted,
Code::OutOfRange => ReplyErrorKind::OutOfRange,
Code::Unimplemented => ReplyErrorKind::Unimplemented,
Code::Internal => ReplyErrorKind::Internal,
Code::Unavailable => ReplyErrorKind::Unavailable,
Code::DataLoss => ReplyErrorKind::Internal,
Code::Unauthenticated => ReplyErrorKind::Unauthenticated,
Code::__NonExhaustive => ReplyErrorKind::Internal,
};
let extra = format!("{}::{}", request, source.to_string());
ReplyError {
kind,
resource,
source: "SvcError::GrpcRequestError".to_string(),
extra,
}
}
_ => unreachable!("Expected a GrpcRequestError!"),
}
}

/// Not enough resources available
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum NotEnough {
#[snafu(display(
"Not enough suitable pools available, {}/{}",
have,
need
))]
OfPools { have: u64, need: u64 },
#[snafu(display("Not enough replicas available, {}/{}", have, need))]
OfReplicas { have: u64, need: u64 },
#[snafu(display("Not enough nexuses available, {}/{}", have, need))]
OfNexuses { have: u64, need: u64 },
}
Loading

0 comments on commit 7e8bac8

Please sign in to comment.