Skip to content

Commit

Permalink
[ISSUE #2161]🚀Implement BatchUnregistrationService💫
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Jan 8, 2025
1 parent 4ff2d72 commit 5b276c3
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 0 deletions.
1 change: 1 addition & 0 deletions rocketmq-namesrv/src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
* limitations under the License.
*/

pub(crate) mod batch_unregistration_service;
pub mod route_info_manager;
66 changes: 66 additions & 0 deletions rocketmq-namesrv/src/route/batch_unregistration_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use rocketmq_remoting::protocol::header::namesrv::broker_request::UnRegisterBrokerRequestHeader;
use rocketmq_rust::ArcMut;
use tracing::warn;

use crate::bootstrap::NameServerRuntimeInner;

pub(crate) struct BatchUnregistrationService {
name_server_runtime_inner: ArcMut<NameServerRuntimeInner>,
tx: tokio::sync::mpsc::Sender<UnRegisterBrokerRequestHeader>,
rx: Option<tokio::sync::mpsc::Receiver<UnRegisterBrokerRequestHeader>>,
}

impl BatchUnregistrationService {
pub(crate) fn new(name_server_runtime_inner: ArcMut<NameServerRuntimeInner>) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel::<UnRegisterBrokerRequestHeader>(
name_server_runtime_inner
.name_server_config()
.unregister_broker_queue_capacity as usize,
);
BatchUnregistrationService {
name_server_runtime_inner,
tx,
rx: Some(rx),
}
}

Check warning on line 41 in rocketmq-namesrv/src/route/batch_unregistration_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/batch_unregistration_service.rs#L30-L41

Added lines #L30 - L41 were not covered by tests

pub fn submit(&self, request: UnRegisterBrokerRequestHeader) -> bool {
if let Err(e) = self.tx.try_send(request) {
warn!("submit unregister broker request failed: {:?}", e);
return false;
}
true
}

Check warning on line 49 in rocketmq-namesrv/src/route/batch_unregistration_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/batch_unregistration_service.rs#L43-L49

Added lines #L43 - L49 were not covered by tests

pub fn start(&mut self) {
let mut name_server_runtime_inner = self.name_server_runtime_inner.clone();
let mut rx = self.rx.take().expect("rx is None");
let limit = 10;
tokio::spawn(async move {

Check warning on line 55 in rocketmq-namesrv/src/route/batch_unregistration_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/batch_unregistration_service.rs#L51-L55

Added lines #L51 - L55 were not covered by tests
loop {
let mut unregistration_requests = Vec::with_capacity(limit);
tokio::select! {
_ = rx.recv_many(&mut unregistration_requests,limit) => {
name_server_runtime_inner.route_info_manager_mut().un_register_broker(unregistration_requests);
}

Check warning on line 61 in rocketmq-namesrv/src/route/batch_unregistration_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/batch_unregistration_service.rs#L57-L61

Added lines #L57 - L61 were not covered by tests
}
}
});
}

Check warning on line 65 in rocketmq-namesrv/src/route/batch_unregistration_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/batch_unregistration_service.rs#L64-L65

Added lines #L64 - L65 were not covered by tests
}

0 comments on commit 5b276c3

Please sign in to comment.