Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add/Remove label to/from pool #850

Merged
merged 5 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions control-plane/agents/src/bin/core/node/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ impl SpecOperationsHelper for NodeSpec {
let (existing, conflict) = self.label_collisions(labels);
if !*overwrite && !existing.is_empty() {
Err(SvcError::LabelsExists {
node_id: self.id().to_string(),
resource: ResourceKind::Node,
id: self.id().to_string(),
labels: format!("{existing:?}"),
conflict,
})
Expand All @@ -209,7 +210,8 @@ impl SpecOperationsHelper for NodeSpec {
// Check that the label is present.
if !self.has_labels_key(label_key) {
Err(SvcError::LabelNotFound {
node_id: self.id().to_string(),
resource: ResourceKind::Node,
id: self.id().to_string(),
label_key: label_key.to_string(),
})
} else {
Expand Down
53 changes: 50 additions & 3 deletions control-plane/agents/src/bin/core/pool/pool_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ use crate::controller::{
io_engine::PoolApi,
registry::Registry,
resources::{
operations::ResourceLifecycle,
operations::{ResourceLabel, ResourceLifecycle},
operations_helper::{GuardedOperationsHelper, OnCreateFail, OperationSequenceGuard},
OperationGuardArc,
},
};
use agents::errors::{SvcError, SvcError::CordonedNode};
use std::collections::HashMap;
use stor_port::{
transport_api::ResourceKind,
types::v0::{
store::pool::PoolSpec,
store::pool::{PoolOperation, PoolSpec},
transport::{CreatePool, CtrlPoolState, DestroyPool, Pool},
},
};
Expand Down Expand Up @@ -67,7 +68,7 @@ impl ResourceLifecycle for OperationGuardArc<PoolSpec> {

let state = pool.complete_create(result, registry, on_fail).await?;
let spec = pool.lock().clone();
Ok(Pool::new(spec, CtrlPoolState::new(state)))
Ok(Pool::new(spec, Some(CtrlPoolState::new(state))))
}

async fn destroy(
Expand Down Expand Up @@ -135,3 +136,49 @@ impl ResourceLifecycle for Option<OperationGuardArc<PoolSpec>> {
}
}
}

/// Resource Label Operations.
#[async_trait::async_trait]
impl ResourceLabel for OperationGuardArc<PoolSpec> {
type LabelOutput = PoolSpec;
type UnlabelOutput = PoolSpec;

/// Label a node via operation guard functions.
async fn label(
&mut self,
registry: &Registry,
labels: HashMap<String, String>,
overwrite: bool,
) -> Result<Self::LabelOutput, SvcError> {
let cloned_pool_spec = self.lock().clone();
let spec_clone = self
.start_update(
registry,
&cloned_pool_spec,
PoolOperation::Label((labels, overwrite).into()),
)
.await?;

self.complete_update(registry, Ok(()), spec_clone).await?;
Ok(self.as_ref().clone())
}

/// Unlabel a node via operation guard functions.
async fn unlabel(
&mut self,
registry: &Registry,
label_key: String,
) -> Result<Self::UnlabelOutput, SvcError> {
let cloned_pool_spec = self.lock().clone();
let spec_clone = self
.start_update(
registry,
&cloned_pool_spec,
PoolOperation::Unlabel(label_key.into()),
)
.await?;

self.complete_update(registry, Ok(()), spec_clone).await?;
Ok(self.as_ref().clone())
}
}
63 changes: 59 additions & 4 deletions control-plane/agents/src/bin/core/pool/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::controller::{
registry::Registry,
resources::{
operations::{ResourceLifecycle, ResourceResize, ResourceSharing},
operations::{ResourceLabel, ResourceLifecycle, ResourceResize, ResourceSharing},
operations_helper::{OperationSequenceGuard, ResourceSpecsLocked},
OperationGuardArc, ResourceMutex,
},
Expand All @@ -11,7 +11,9 @@ use agents::errors::{PoolNotFound, ReplicaNotFound, SvcError};
use grpc::{
context::Context,
operations::{
pool::traits::{CreatePoolInfo, DestroyPoolInfo, PoolOperations},
pool::traits::{
CreatePoolInfo, DestroyPoolInfo, LabelPoolInfo, PoolOperations, UnlabelPoolInfo,
},
replica::traits::{
CreateReplicaInfo, DestroyReplicaInfo, ReplicaOperations, ResizeReplicaInfo,
ShareReplicaInfo, UnshareReplicaInfo,
Expand All @@ -21,13 +23,14 @@ use grpc::{
use stor_port::{
transport_api::{
v0::{Pools, Replicas},
ReplyError,
ReplyError, ResourceKind,
},
types::v0::{
store::{pool::PoolSpec, replica::ReplicaSpec},
transport::{
CreatePool, CreateReplica, DestroyPool, DestroyReplica, Filter, GetPools, GetReplicas,
NodeId, Pool, PoolId, Replica, ResizeReplica, ShareReplica, UnshareReplica, VolumeId,
LabelPool, NodeId, Pool, PoolId, Replica, ResizeReplica, ShareReplica, UnlabelPool,
UnshareReplica, VolumeId,
},
},
};
Expand Down Expand Up @@ -68,6 +71,36 @@ impl PoolOperations for Service {
let pools = self.get_pools(&req).await?;
Ok(pools)
}

async fn label(
&self,
pool: &dyn LabelPoolInfo,
_ctx: Option<Context>,
) -> Result<Pool, ReplyError> {
let req = pool.into();
let service = self.clone();
let pool = Context::spawn(async move { service.label(&req).await }).await??;
Ok(pool)
}

/// Remove the specified label key from the pool.
async fn unlabel(
&self,
pool: &dyn UnlabelPoolInfo,
_ctx: Option<Context>,
) -> Result<Pool, ReplyError> {
let req: UnlabelPool = pool.into();
let service = self.clone();
if req.label_key().is_empty() {
return Err(SvcError::InvalidLabel {
labels: req.label_key(),
resource_kind: ResourceKind::Pool,
}
.into());
}
let pool = Context::spawn(async move { service.unlabel(&req).await }).await??;
Ok(pool)
}
}

#[tonic::async_trait]
Expand Down Expand Up @@ -334,4 +367,26 @@ impl Service {
let mut replica = self.specs().replica(&request.uuid).await?;
replica.resize(&self.registry, request).await
}

/// Label the specified pool.
sinhaashish marked this conversation as resolved.
Show resolved Hide resolved
#[tracing::instrument(level = "info", skip(self), err, fields(pool.id = %request.pool_id))]
async fn label(&self, request: &LabelPool) -> Result<Pool, SvcError> {
let mut guarded_pool = self.specs().guarded_pool(&request.pool_id).await?;
let spec = guarded_pool
.label(&self.registry, request.labels(), request.overwrite())
.await?;
let state = self.registry.ctrl_pool_state(&request.pool_id).await.ok();
Ok(Pool::new(spec, state))
}

/// Remove the specified label from the specified pool.
sinhaashish marked this conversation as resolved.
Show resolved Hide resolved
#[tracing::instrument(level = "info", skip(self), err, fields(pool.id = %request.pool_id))]
async fn unlabel(&self, request: &UnlabelPool) -> Result<Pool, SvcError> {
let mut guarded_pool = self.specs().guarded_pool(&request.pool_id).await?;
let spec = guarded_pool
.unlabel(&self.registry, request.label_key())
.await?;
let state = self.registry.ctrl_pool_state(&request.pool_id).await.ok();
Ok(Pool::new(spec, state))
}
}
65 changes: 59 additions & 6 deletions control-plane/agents/src/bin/core/pool/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ use stor_port::{
transport_api::ResourceKind,
types::v0::{
store::{
pool::{PoolOperation, PoolSpec},
pool::{PoolLabelOp, PoolOperation, PoolSpec, PoolUnLabelOp},
replica::{ReplicaOperation, ReplicaSpec},
SpecStatus, SpecTransaction,
},
transport::{
CreatePool, CreateReplica, NodeId, PoolId, PoolState, PoolStatus, Replica, ReplicaId,
CreatePool, CreateReplica, NodeId, PoolId, PoolStatus, Replica, ReplicaId,
ReplicaOwners, ReplicaStatus,
},
},
Expand All @@ -29,8 +29,8 @@ impl GuardedOperationsHelper for OperationGuardArc<PoolSpec> {
type Create = CreatePool;
type Owners = ();
type Status = PoolStatus;
type State = PoolState;
type UpdateOp = ();
type State = PoolSpec;
type UpdateOp = PoolOperation;
type Inner = PoolSpec;

fn validate_destroy(&self, registry: &Registry) -> Result<(), SvcError> {
Expand Down Expand Up @@ -59,15 +59,55 @@ impl SpecOperationsHelper for PoolSpec {
type Create = CreatePool;
type Owners = ();
type Status = PoolStatus;
type State = PoolState;
type UpdateOp = ();
type State = PoolSpec;
type UpdateOp = PoolOperation;

fn start_create_op(&mut self, _request: &Self::Create) {
self.start_op(PoolOperation::Create);
}
fn start_destroy_op(&mut self) {
self.start_op(PoolOperation::Destroy);
}
async fn start_update_op(
&mut self,
_: &Registry,
_state: &Self::State,
op: Self::UpdateOp,
) -> Result<(), SvcError> {
match &op {
PoolOperation::Label(PoolLabelOp { labels, overwrite }) => {
let (existing, conflict) = self.label_collisions(labels);
if !*overwrite && !existing.is_empty() {
Err(SvcError::LabelsExists {
resource: ResourceKind::Pool,
id: self.id().to_string(),
labels: format!("{existing:?}"),
conflict,
})
} else {
self.start_op(op);
Ok(())
}
}
PoolOperation::Unlabel(PoolUnLabelOp { label_key }) => {
// Check that the label is present.
if !self.has_labels_key(label_key) {
Err(SvcError::LabelNotFound {
resource: ResourceKind::Pool,
id: self.id().to_string(),
label_key: label_key.to_string(),
})
} else {
self.start_op(op);
Ok(())
}
}
_ => {
self.start_op(op);
Ok(())
}
}
}

fn dirty(&self) -> bool {
// The pool spec can be dirty if a pool create operation fails to complete because it cannot
Expand Down Expand Up @@ -411,4 +451,17 @@ impl ResourceSpecsLocked {
}
pending_ops
}

/// Get guarded pool spec by its `PoolId`.
pub async fn guarded_pool(
&self,
pool_id: &PoolId,
) -> Result<OperationGuardArc<PoolSpec>, SvcError> {
match self.pool_rsc(pool_id) {
Some(pool) => pool.operation_guard_wait().await,
None => Err(PoolNotFound {
pool_id: pool_id.to_owned(),
}),
}
}
}
23 changes: 15 additions & 8 deletions control-plane/agents/src/common/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,19 @@ pub enum SvcError {
NoNodes {},
#[snafu(display("Node {} is cordoned", node_id))]
CordonedNode { node_id: String },
#[snafu(display("Node {node_id} is already labelled with labels '{labels}'"))]
#[snafu(display("{resource} {id} is already labelled with labels '{labels}'"))]
LabelsExists {
node_id: String,
resource: ResourceKind,
id: String,
labels: String,
conflict: bool,
},
#[snafu(display("Node {node_id} doesn't have the label key '{label_key}'"))]
LabelNotFound { node_id: String, label_key: String },
#[snafu(display("{resource} {id} doesn't have the label key '{label_key}'"))]
LabelNotFound {
resource: ResourceKind,
id: String,
label_key: String,
},
#[snafu(display("Node {node_id} is already cordoned with label '{label}'"))]
CordonLabel { node_id: String, label: String },
#[snafu(display("Node {node_id} does not have a cordon label '{label}'"))]
Expand Down Expand Up @@ -598,20 +603,22 @@ impl From<SvcError> for ReplyError {
extra,
},

SvcError::LabelsExists { conflict, .. } => ReplyError {
SvcError::LabelsExists {
resource, conflict, ..
} => ReplyError {
kind: if conflict {
ReplyErrorKind::FailedPrecondition
} else {
ReplyErrorKind::AlreadyExists
},
resource: ResourceKind::Node,
resource,
source,
extra,
},

SvcError::LabelNotFound { .. } => ReplyError {
SvcError::LabelNotFound { resource, .. } => ReplyError {
kind: ReplyErrorKind::FailedPrecondition,
resource: ResourceKind::Node,
resource,
source,
extra,
},
Expand Down
35 changes: 35 additions & 0 deletions control-plane/grpc/proto/v1/pool/pool.proto
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,44 @@ message GetPoolsReply {
}
}

// Label Pool request
message LabelPoolRequest {
// Pool identification
string pool_id = 1;
// Pool label map
map<string, string> labels = 2;
// Overwrite an existing key
bool overwrite = 3;
}

// Reply type for a LabelPool request
message LabelPoolReply {
oneof reply {
Pool pool = 1;
common.ReplyError error = 2;
}
}

message UnlabelPoolRequest {
// Pool identification
string pool_id = 1;
// Pool label key to remove
string label_key = 2;
}

message UnlabelPoolReply {
oneof reply {
Pool pool = 1;
common.ReplyError error = 2;
}
}


// Service for managing storage pools
service PoolGrpc {
rpc CreatePool (CreatePoolRequest) returns (CreatePoolReply) {}
rpc DestroyPool (DestroyPoolRequest) returns (DestroyPoolReply) {}
rpc GetPools (GetPoolsRequest) returns (GetPoolsReply) {}
rpc LabelPool (LabelPoolRequest) returns (LabelPoolReply) {}
rpc UnlabelPool (UnlabelPoolRequest) returns (UnlabelPoolReply) {}
}
Loading