From 5b276c3f7431dcdf647b6eeaa91ba89e1166de54 Mon Sep 17 00:00:00 2001 From: mxsm Date: Wed, 8 Jan 2025 11:11:18 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#2161]=F0=9F=9A=80Implement=20BatchUnr?= =?UTF-8?q?egistrationService=F0=9F=92=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rocketmq-namesrv/src/route.rs | 1 + .../src/route/batch_unregistration_service.rs | 66 +++++++++++++++++++ 2 files changed, 67 insertions(+) create mode 100644 rocketmq-namesrv/src/route/batch_unregistration_service.rs diff --git a/rocketmq-namesrv/src/route.rs b/rocketmq-namesrv/src/route.rs index d079a053..e3dd3305 100644 --- a/rocketmq-namesrv/src/route.rs +++ b/rocketmq-namesrv/src/route.rs @@ -15,4 +15,5 @@ * limitations under the License. */ +pub(crate) mod batch_unregistration_service; pub mod route_info_manager; diff --git a/rocketmq-namesrv/src/route/batch_unregistration_service.rs b/rocketmq-namesrv/src/route/batch_unregistration_service.rs new file mode 100644 index 00000000..7cce91be --- /dev/null +++ b/rocketmq-namesrv/src/route/batch_unregistration_service.rs @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use rocketmq_remoting::protocol::header::namesrv::broker_request::UnRegisterBrokerRequestHeader; +use rocketmq_rust::ArcMut; +use tracing::warn; + +use crate::bootstrap::NameServerRuntimeInner; + +pub(crate) struct BatchUnregistrationService { + name_server_runtime_inner: ArcMut, + tx: tokio::sync::mpsc::Sender, + rx: Option>, +} + +impl BatchUnregistrationService { + pub(crate) fn new(name_server_runtime_inner: ArcMut) -> Self { + let (tx, rx) = tokio::sync::mpsc::channel::( + name_server_runtime_inner + .name_server_config() + .unregister_broker_queue_capacity as usize, + ); + BatchUnregistrationService { + name_server_runtime_inner, + tx, + rx: Some(rx), + } + } + + pub fn submit(&self, request: UnRegisterBrokerRequestHeader) -> bool { + if let Err(e) = self.tx.try_send(request) { + warn!("submit unregister broker request failed: {:?}", e); + return false; + } + true + } + + pub fn start(&mut self) { + let mut name_server_runtime_inner = self.name_server_runtime_inner.clone(); + let mut rx = self.rx.take().expect("rx is None"); + let limit = 10; + tokio::spawn(async move { + loop { + let mut unregistration_requests = Vec::with_capacity(limit); + tokio::select! { + _ = rx.recv_many(&mut unregistration_requests,limit) => { + name_server_runtime_inner.route_info_manager_mut().un_register_broker(unregistration_requests); + } + } + } + }); + } +}