Skip to content

Commit

Permalink
chore(bors): merge pull request #850
Browse files Browse the repository at this point in the history
850: Add/Remove label to/from pool r=sinhaashish a=sinhaashish



Co-authored-by: sinhaashish <[email protected]>
  • Loading branch information
mayastor-bors and sinhaashish committed Sep 6, 2024
1 parent 210a382 commit 5b86344
Show file tree
Hide file tree
Showing 25 changed files with 1,342 additions and 161 deletions.
6 changes: 4 additions & 2 deletions control-plane/agents/src/bin/core/node/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ impl SpecOperationsHelper for NodeSpec {
let (existing, conflict) = self.label_collisions(labels);
if !*overwrite && !existing.is_empty() {
Err(SvcError::LabelsExists {
node_id: self.id().to_string(),
resource: ResourceKind::Node,
id: self.id().to_string(),
labels: format!("{existing:?}"),
conflict,
})
Expand All @@ -209,7 +210,8 @@ impl SpecOperationsHelper for NodeSpec {
// Check that the label is present.
if !self.has_labels_key(label_key) {
Err(SvcError::LabelNotFound {
node_id: self.id().to_string(),
resource: ResourceKind::Node,
id: self.id().to_string(),
label_key: label_key.to_string(),
})
} else {
Expand Down
53 changes: 50 additions & 3 deletions control-plane/agents/src/bin/core/pool/pool_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ use crate::controller::{
io_engine::PoolApi,
registry::Registry,
resources::{
operations::ResourceLifecycle,
operations::{ResourceLabel, ResourceLifecycle},
operations_helper::{GuardedOperationsHelper, OnCreateFail, OperationSequenceGuard},
OperationGuardArc,
},
};
use agents::errors::{SvcError, SvcError::CordonedNode};
use std::collections::HashMap;
use stor_port::{
transport_api::ResourceKind,
types::v0::{
store::pool::PoolSpec,
store::pool::{PoolOperation, PoolSpec},
transport::{CreatePool, CtrlPoolState, DestroyPool, Pool},
},
};
Expand Down Expand Up @@ -67,7 +68,7 @@ impl ResourceLifecycle for OperationGuardArc<PoolSpec> {

let state = pool.complete_create(result, registry, on_fail).await?;
let spec = pool.lock().clone();
Ok(Pool::new(spec, CtrlPoolState::new(state)))
Ok(Pool::new(spec, Some(CtrlPoolState::new(state))))
}

async fn destroy(
Expand Down Expand Up @@ -135,3 +136,49 @@ impl ResourceLifecycle for Option<OperationGuardArc<PoolSpec>> {
}
}
}

/// Resource Label Operations.
#[async_trait::async_trait]
impl ResourceLabel for OperationGuardArc<PoolSpec> {
type LabelOutput = PoolSpec;
type UnlabelOutput = PoolSpec;

/// Label a node via operation guard functions.
async fn label(
&mut self,
registry: &Registry,
labels: HashMap<String, String>,
overwrite: bool,
) -> Result<Self::LabelOutput, SvcError> {
let cloned_pool_spec = self.lock().clone();
let spec_clone = self
.start_update(
registry,
&cloned_pool_spec,
PoolOperation::Label((labels, overwrite).into()),
)
.await?;

self.complete_update(registry, Ok(()), spec_clone).await?;
Ok(self.as_ref().clone())
}

/// Unlabel a node via operation guard functions.
async fn unlabel(
&mut self,
registry: &Registry,
label_key: String,
) -> Result<Self::UnlabelOutput, SvcError> {
let cloned_pool_spec = self.lock().clone();
let spec_clone = self
.start_update(
registry,
&cloned_pool_spec,
PoolOperation::Unlabel(label_key.into()),
)
.await?;

self.complete_update(registry, Ok(()), spec_clone).await?;
Ok(self.as_ref().clone())
}
}
63 changes: 59 additions & 4 deletions control-plane/agents/src/bin/core/pool/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::controller::{
registry::Registry,
resources::{
operations::{ResourceLifecycle, ResourceResize, ResourceSharing},
operations::{ResourceLabel, ResourceLifecycle, ResourceResize, ResourceSharing},
operations_helper::{OperationSequenceGuard, ResourceSpecsLocked},
OperationGuardArc, ResourceMutex,
},
Expand All @@ -11,7 +11,9 @@ use agents::errors::{PoolNotFound, ReplicaNotFound, SvcError};
use grpc::{
context::Context,
operations::{
pool::traits::{CreatePoolInfo, DestroyPoolInfo, PoolOperations},
pool::traits::{
CreatePoolInfo, DestroyPoolInfo, LabelPoolInfo, PoolOperations, UnlabelPoolInfo,
},
replica::traits::{
CreateReplicaInfo, DestroyReplicaInfo, ReplicaOperations, ResizeReplicaInfo,
ShareReplicaInfo, UnshareReplicaInfo,
Expand All @@ -21,13 +23,14 @@ use grpc::{
use stor_port::{
transport_api::{
v0::{Pools, Replicas},
ReplyError,
ReplyError, ResourceKind,
},
types::v0::{
store::{pool::PoolSpec, replica::ReplicaSpec},
transport::{
CreatePool, CreateReplica, DestroyPool, DestroyReplica, Filter, GetPools, GetReplicas,
NodeId, Pool, PoolId, Replica, ResizeReplica, ShareReplica, UnshareReplica, VolumeId,
LabelPool, NodeId, Pool, PoolId, Replica, ResizeReplica, ShareReplica, UnlabelPool,
UnshareReplica, VolumeId,
},
},
};
Expand Down Expand Up @@ -68,6 +71,36 @@ impl PoolOperations for Service {
let pools = self.get_pools(&req).await?;
Ok(pools)
}

async fn label(
&self,
pool: &dyn LabelPoolInfo,
_ctx: Option<Context>,
) -> Result<Pool, ReplyError> {
let req = pool.into();
let service = self.clone();
let pool = Context::spawn(async move { service.label(&req).await }).await??;
Ok(pool)
}

/// Remove the specified label key from the pool.
async fn unlabel(
&self,
pool: &dyn UnlabelPoolInfo,
_ctx: Option<Context>,
) -> Result<Pool, ReplyError> {
let req: UnlabelPool = pool.into();
let service = self.clone();
if req.label_key().is_empty() {
return Err(SvcError::InvalidLabel {
labels: req.label_key(),
resource_kind: ResourceKind::Pool,
}
.into());
}
let pool = Context::spawn(async move { service.unlabel(&req).await }).await??;
Ok(pool)
}
}

#[tonic::async_trait]
Expand Down Expand Up @@ -334,4 +367,26 @@ impl Service {
let mut replica = self.specs().replica(&request.uuid).await?;
replica.resize(&self.registry, request).await
}

/// Label the specified pool.
#[tracing::instrument(level = "info", skip(self), err, fields(pool.id = %request.pool_id))]
async fn label(&self, request: &LabelPool) -> Result<Pool, SvcError> {
let mut guarded_pool = self.specs().guarded_pool(&request.pool_id).await?;
let spec = guarded_pool
.label(&self.registry, request.labels(), request.overwrite())
.await?;
let state = self.registry.ctrl_pool_state(&request.pool_id).await.ok();
Ok(Pool::new(spec, state))
}

/// Remove the specified label from the specified pool.
#[tracing::instrument(level = "info", skip(self), err, fields(pool.id = %request.pool_id))]
async fn unlabel(&self, request: &UnlabelPool) -> Result<Pool, SvcError> {
let mut guarded_pool = self.specs().guarded_pool(&request.pool_id).await?;
let spec = guarded_pool
.unlabel(&self.registry, request.label_key())
.await?;
let state = self.registry.ctrl_pool_state(&request.pool_id).await.ok();
Ok(Pool::new(spec, state))
}
}
65 changes: 59 additions & 6 deletions control-plane/agents/src/bin/core/pool/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ use stor_port::{
transport_api::ResourceKind,
types::v0::{
store::{
pool::{PoolOperation, PoolSpec},
pool::{PoolLabelOp, PoolOperation, PoolSpec, PoolUnLabelOp},
replica::{ReplicaOperation, ReplicaSpec},
SpecStatus, SpecTransaction,
},
transport::{
CreatePool, CreateReplica, NodeId, PoolId, PoolState, PoolStatus, Replica, ReplicaId,
CreatePool, CreateReplica, NodeId, PoolId, PoolStatus, Replica, ReplicaId,
ReplicaOwners, ReplicaStatus,
},
},
Expand All @@ -29,8 +29,8 @@ impl GuardedOperationsHelper for OperationGuardArc<PoolSpec> {
type Create = CreatePool;
type Owners = ();
type Status = PoolStatus;
type State = PoolState;
type UpdateOp = ();
type State = PoolSpec;
type UpdateOp = PoolOperation;
type Inner = PoolSpec;

fn validate_destroy(&self, registry: &Registry) -> Result<(), SvcError> {
Expand Down Expand Up @@ -59,15 +59,55 @@ impl SpecOperationsHelper for PoolSpec {
type Create = CreatePool;
type Owners = ();
type Status = PoolStatus;
type State = PoolState;
type UpdateOp = ();
type State = PoolSpec;
type UpdateOp = PoolOperation;

fn start_create_op(&mut self, _request: &Self::Create) {
self.start_op(PoolOperation::Create);
}
fn start_destroy_op(&mut self) {
self.start_op(PoolOperation::Destroy);
}
async fn start_update_op(
&mut self,
_: &Registry,
_state: &Self::State,
op: Self::UpdateOp,
) -> Result<(), SvcError> {
match &op {
PoolOperation::Label(PoolLabelOp { labels, overwrite }) => {
let (existing, conflict) = self.label_collisions(labels);
if !*overwrite && !existing.is_empty() {
Err(SvcError::LabelsExists {
resource: ResourceKind::Pool,
id: self.id().to_string(),
labels: format!("{existing:?}"),
conflict,
})
} else {
self.start_op(op);
Ok(())
}
}
PoolOperation::Unlabel(PoolUnLabelOp { label_key }) => {
// Check that the label is present.
if !self.has_labels_key(label_key) {
Err(SvcError::LabelNotFound {
resource: ResourceKind::Pool,
id: self.id().to_string(),
label_key: label_key.to_string(),
})
} else {
self.start_op(op);
Ok(())
}
}
_ => {
self.start_op(op);
Ok(())
}
}
}

fn dirty(&self) -> bool {
// The pool spec can be dirty if a pool create operation fails to complete because it cannot
Expand Down Expand Up @@ -411,4 +451,17 @@ impl ResourceSpecsLocked {
}
pending_ops
}

/// Get guarded pool spec by its `PoolId`.
pub async fn guarded_pool(
&self,
pool_id: &PoolId,
) -> Result<OperationGuardArc<PoolSpec>, SvcError> {
match self.pool_rsc(pool_id) {
Some(pool) => pool.operation_guard_wait().await,
None => Err(PoolNotFound {
pool_id: pool_id.to_owned(),
}),
}
}
}
23 changes: 15 additions & 8 deletions control-plane/agents/src/common/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,19 @@ pub enum SvcError {
NoNodes {},
#[snafu(display("Node {} is cordoned", node_id))]
CordonedNode { node_id: String },
#[snafu(display("Node {node_id} is already labelled with labels '{labels}'"))]
#[snafu(display("{resource} {id} is already labelled with labels '{labels}'"))]
LabelsExists {
node_id: String,
resource: ResourceKind,
id: String,
labels: String,
conflict: bool,
},
#[snafu(display("Node {node_id} doesn't have the label key '{label_key}'"))]
LabelNotFound { node_id: String, label_key: String },
#[snafu(display("{resource} {id} doesn't have the label key '{label_key}'"))]
LabelNotFound {
resource: ResourceKind,
id: String,
label_key: String,
},
#[snafu(display("Node {node_id} is already cordoned with label '{label}'"))]
CordonLabel { node_id: String, label: String },
#[snafu(display("Node {node_id} does not have a cordon label '{label}'"))]
Expand Down Expand Up @@ -598,20 +603,22 @@ impl From<SvcError> for ReplyError {
extra,
},

SvcError::LabelsExists { conflict, .. } => ReplyError {
SvcError::LabelsExists {
resource, conflict, ..
} => ReplyError {
kind: if conflict {
ReplyErrorKind::FailedPrecondition
} else {
ReplyErrorKind::AlreadyExists
},
resource: ResourceKind::Node,
resource,
source,
extra,
},

SvcError::LabelNotFound { .. } => ReplyError {
SvcError::LabelNotFound { resource, .. } => ReplyError {
kind: ReplyErrorKind::FailedPrecondition,
resource: ResourceKind::Node,
resource,
source,
extra,
},
Expand Down
35 changes: 35 additions & 0 deletions control-plane/grpc/proto/v1/pool/pool.proto
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,44 @@ message GetPoolsReply {
}
}

// Label Pool request
message LabelPoolRequest {
// Pool identification
string pool_id = 1;
// Pool label map
map<string, string> labels = 2;
// Overwrite an existing key
bool overwrite = 3;
}

// Reply type for a LabelPool request
message LabelPoolReply {
oneof reply {
Pool pool = 1;
common.ReplyError error = 2;
}
}

message UnlabelPoolRequest {
// Pool identification
string pool_id = 1;
// Pool label key to remove
string label_key = 2;
}

message UnlabelPoolReply {
oneof reply {
Pool pool = 1;
common.ReplyError error = 2;
}
}


// Service for managing storage pools
service PoolGrpc {
rpc CreatePool (CreatePoolRequest) returns (CreatePoolReply) {}
rpc DestroyPool (DestroyPoolRequest) returns (DestroyPoolReply) {}
rpc GetPools (GetPoolsRequest) returns (GetPoolsReply) {}
rpc LabelPool (LabelPoolRequest) returns (LabelPoolReply) {}
rpc UnlabelPool (UnlabelPoolRequest) returns (UnlabelPoolReply) {}
}
Loading

0 comments on commit 5b86344

Please sign in to comment.