833 to v2 alien partitions api #865

Open · wants to merge 5 commits into base: release_v2.0.0
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@ Bob versions changelog

## [Unreleased]
#### Added
- Partitions API for aliens (#833)


#### Changed
4 changes: 2 additions & 2 deletions bob-backend/src/pearl/disk_controller.rs
@@ -533,7 +533,7 @@ impl DiskController {
for holder in holders.iter() {
let storage = holder.storage().read().await;
let storage = storage.storage().clone();
let id = holder.get_id();
let id = holder.get_id().to_owned();
futures.push(async move {
match storage.close().await {
Ok(_) => debug!("holder {} closed", id),
@@ -599,7 +599,7 @@ impl DiskController {
.await
}

pub(crate) fn groups(&self) -> Arc<RwLock<Vec<Group>>> {
pub fn groups(&self) -> Arc<RwLock<Vec<Group>>> {
self.groups.clone()
}
}
21 changes: 21 additions & 0 deletions bob-backend/src/pearl/group.rs
@@ -385,6 +385,27 @@ impl Group {
Ok(removed)
}

pub async fn detach_by_id(&self, id: &str) -> BackendResult<Holder> {
let mut holders = self.holders.write().await;
debug!("write lock acquired");
let ts = get_current_timestamp();
for ind in 0..holders.len() {
if let Some(holder) = holders.get_child(ind) {
if holder.data.get_id() == id {
if !holder.data.gets_into_interval(ts) {
let removed = holders.remove(ind).expect("should be present");
removed.close_storage().await;
return Ok(removed);
} else {
return Err(Error::pearl_change_state(format!("Cannot detach active partition (pearl:{})", id)));
}
}
}
}
Err(Error::pearl_change_state(format!("pearl:{} not found", id)))
}

pub async fn detach_all(&self) -> BackendResult<()> {
let mut holders_lock = self.holders.write().await;
let holders: Vec<_> = holders_lock.clear_and_get_values();
3 changes: 1 addition & 2 deletions bob-backend/src/pearl/holder.rs
@@ -58,12 +58,11 @@ impl Holder {
self.end_timestamp
}

pub fn get_id(&self) -> String {
pub fn get_id(&self) -> &str {
self.disk_path
.file_name()
.and_then(std::ffi::OsStr::to_str)
.unwrap_or("unparsable string")
.to_owned()
}

pub fn storage(&self) -> &RwLock<PearlSync> {
157 changes: 149 additions & 8 deletions bob/src/api/http.rs
@@ -70,6 +70,12 @@ pub(crate) struct Partition {
records_count: usize,
}

#[derive(Debug, Serialize, Clone)]
pub(crate) struct PartitionSlim {
id: String,
timestamp: u64
}

#[derive(Debug)]
pub(crate) struct StatusExt {
status: Status,
@@ -142,7 +148,11 @@ pub(crate) fn spawn(bob: BobServer, address: IpAddr, port: u16) {
distribution_function,
get_data,
put_data,
metrics
metrics,
partitions_by_disk_vdisk,
alien_partitions_by_node_vdisk,
delete_partition_by_id,
alien_delete_partition_by_id
];
info!("API server started");
let mut config = Config::release_default();
@@ -445,7 +455,7 @@ async fn partitions(
debug!("get pearl holders: OK");
let mut partitions = vec![];
for pearl in pearls.iter() {
partitions.push(pearl.get_id());
partitions.push(pearl.get_id().to_owned());
}
let ps = VDiskPartitions {
node_name: group.node_name().to_owned(),
@@ -544,7 +554,7 @@ async fn delete_partition(
let group = find_group(bob, vdisk_id).await?;
let pearls = group.detach(timestamp).await;
if let Ok(holders) = pearls {
drop_directories(holders, timestamp, vdisk_id).await
drop_directories(holders, format!("timestamp {} on vdisk {}", timestamp, vdisk_id)).await
} else {
let msg = format!(
"partitions with timestamp {} not found on vdisk {} or it is active",
@@ -556,20 +566,19 @@

async fn drop_directories(
holders: Vec<Holder>,
timestamp: u64,
vdisk_id: u32,
partitions_description: String
) -> Result<StatusExt, StatusExt> {
let mut result = String::new();
let mut error = false;
for holder in holders {
let msg = if let Err(e) = holder.drop_directory().await {
error = true;
format!(
"partitions with timestamp {} delete failed on vdisk {}, error: {}",
timestamp, vdisk_id, e
"partitions {} delete failed, error: {}",
partitions_description, e
)
} else {
format!("partitions deleted with timestamp {}", timestamp)
format!("partitions {} deleted", partitions_description)
};
result.push_str(&msg);
result.push('\n');
@@ -705,6 +714,138 @@ impl FromParam<'_> for DataKey {
}
}

#[get("/disks/<disk_name>/vdisks/<vdisk_id>/partitions")]
async fn partitions_by_disk_vdisk(
bob: &State<BobServer>,
disk_name: String,
vdisk_id: u32,
) -> Result<Json<Vec<PartitionSlim>>, StatusExt> {
let group = find_group_on_disk(&bob, &disk_name, vdisk_id).await?;
let partitions = create_slim_partitions(group).await;
Ok(Json(partitions))
}

#[get("/alien/nodes/<node_name>/vdisks/<vdisk_id>/partitions")]
async fn alien_partitions_by_node_vdisk(
bob: &State<BobServer>,
node_name: String,
vdisk_id: u32,
) -> Result<Json<Vec<PartitionSlim>>, StatusExt> {
let group = find_alien_group_on_disk(&bob, &node_name, vdisk_id).await?;
let partitions = create_slim_partitions(group).await;
Ok(Json(partitions))
}

#[delete("/disks/<disk_name>/vdisks/<vdisk_id>/partitions/<partition_id>")]
async fn delete_partition_by_id(
bob: &State<BobServer>,
disk_name: String,
vdisk_id: u32,
partition_id: String,
) -> Result<StatusExt, StatusExt> {
let group = find_group_on_disk(&bob, &disk_name, vdisk_id).await?;
let pearl = group.detach_by_id(&partition_id).await.ok();
if let Some(holder) = pearl {
drop_directories(vec![holder], format!("id {} in vdisk {} on disk {}", partition_id, vdisk_id, disk_name)).await
} else {
let msg = format!(
"partition {} not found on vdisk {} on disk {} or it is active",
partition_id, vdisk_id, disk_name
);
Err(StatusExt::new(Status::NotFound, true, msg))
}
}

// DELETE /alien/nodes/:node_name/vdisks/:vdisk_id/partitions/:partition_id
#[delete("/alien/nodes/<node_name>/vdisks/<vdisk_id>/partitions/<partition_id>")]
async fn alien_delete_partition_by_id(
bob: &State<BobServer>,
node_name: String,
vdisk_id: u32,
partition_id: String,
) -> Result<StatusExt, StatusExt> {
let group = find_alien_group_on_disk(&bob, &node_name, vdisk_id).await?;
let pearl = group.detach_by_id(&partition_id).await.ok();
if let Some(holder) = pearl {
drop_directories(vec![holder], format!("id {} in vdisk {} for node {}", partition_id, vdisk_id, node_name)).await
} else {
let msg = format!(
"alien partition {} not found on vdisk {} for node {} or it is active",
partition_id, vdisk_id, node_name
);
Err(StatusExt::new(Status::NotFound, true, msg))
}
}

async fn find_group_on_disk(
bob: &BobServer,
disk_name: &str,
vdisk_id: u32
) -> Result<PearlGroup, StatusExt> {
let backend = bob.grinder().backend().inner();
let (dcs, _) = backend
.disk_controllers()
.ok_or_else(not_acceptable_backend)?;
find_disk_vdisk_group(dcs, disk_name, vdisk_id).await
}

async fn find_alien_group_on_disk(
bob: &BobServer,
node_name: &str,
vdisk_id: u32
) -> Result<PearlGroup, StatusExt> {
let backend = bob.grinder().backend().inner();
let (_, adc) = backend
.disk_controllers()
.ok_or_else(not_acceptable_backend)?;
let groups = adc.groups();
let pearls = groups.read().await;
pearls.iter()
.find(|g| g.node_name() == node_name && g.vdisk_id() == vdisk_id)
.cloned()
.ok_or_else(|| {
let msg = format!("Alien vdisk group for node {} vdisk {} not found", node_name, vdisk_id);
StatusExt::new(Status::NotFound, false, msg)
})
}

async fn find_disk_vdisk_group(
dcs: &[std::sync::Arc<bob_backend::pearl::DiskController>],
disk_name: &str,
vdisk_id: u32
) -> Result<PearlGroup, StatusExt> {
let needed_dc = dcs
.iter()
.find(|dc| dc.disk().name() == disk_name)
.ok_or_else(|| {
let dcs = dcs.iter()
.map(|dc| format!("DC: {}, vdisks: {}",
dc.disk().name(),
dc.vdisks().iter().map(|v| format!("#{}", v)).collect::<Vec<_>>().join(", ")))
.collect::<Vec<_>>()
.join(", ");
let err = format!("Disk Controller {} with vdisk #{} not found, available dcs: {}", disk_name, vdisk_id, dcs);
warn!("{}", err);
StatusExt::new(Status::NotFound, false, err)
})?;
needed_dc.vdisk_group(vdisk_id).await.map_err(|e| {
let err = format!("VDiskGroup #{} is missing on disk controller '{}', available vdisks: {}",
vdisk_id,
needed_dc.disk().name(),
needed_dc.vdisks().iter().map(|v| format!("#{}", v)).collect::<Vec<_>>().join(", "));
warn!("{}. Error: {:?}", err, e);
StatusExt::new(Status::NotFound, false, err)
})
}

async fn create_slim_partitions(group: PearlGroup) -> Vec<PartitionSlim> {
let holders = group.holders();
let pearls = holders.read().await;
pearls.iter()
.map(|h| PartitionSlim { id: h.get_id().to_owned(), timestamp: h.start_timestamp() })
.collect()
}

fn internal(message: String) -> StatusExt {
StatusExt::new(Status::InternalServerError, false, message)
}
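
For reviewers who want to exercise the new alien partitions endpoints by hand, here is a minimal client-side sketch (not part of this PR). It assumes the routes above are mounted at the server root on localhost:8000, uses a hypothetical node name `node1` and vdisk id 3, and relies on the `reqwest` crate (with the `blocking` and `json` features) plus `serde` in the client:

```rust
// Illustrative only: not part of this diff. Endpoint paths are taken from the
// routes added in bob/src/api/http.rs above; host, node name, and vdisk id are
// placeholders.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct PartitionSlim {
    id: String,
    timestamp: u64,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let base = "http://localhost:8000";
    let client = reqwest::blocking::Client::new();

    // List alien partitions held for node "node1" and vdisk 3.
    let partitions: Vec<PartitionSlim> = client
        .get(format!("{}/alien/nodes/node1/vdisks/3/partitions", base))
        .send()?
        .json()?;
    println!("alien partitions: {:?}", partitions);

    // Delete one of them by id.
    if let Some(p) = partitions.first() {
        let resp = client
            .delete(format!(
                "{}/alien/nodes/node1/vdisks/3/partitions/{}",
                base, p.id
            ))
            .send()?;
        println!("delete status: {}", resp.status());
    }
    Ok(())
}
```

The DELETE call mirrors `alien_delete_partition_by_id` above: it answers 404 when the partition is missing or still active, and otherwise detaches the holder and drops its directory.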