Skip to content

Commit

Permalink
[ISSUE #669]🚀Support pull message consume-3 (#670)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Jun 20, 2024
1 parent 52fd565 commit 070743e
Show file tree
Hide file tree
Showing 34 changed files with 1,154 additions and 157 deletions.
15 changes: 11 additions & 4 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use tracing::info;
use tracing::warn;

use crate::broker::broker_hook::BrokerShutdownHook;
use crate::client::default_consumer_ids_change_listener::DefaultConsumerIdsChangeListener;
use crate::client::manager::consumer_manager::ConsumerManager;
use crate::client::manager::producer_manager::ProducerManager;
use crate::filter::manager::consumer_filter_manager::ConsumerFilterManager;
Expand Down Expand Up @@ -88,6 +89,7 @@ pub(crate) struct BrokerRuntime {

broker_runtime: Option<RocketMQRuntime>,
producer_manager: Arc<ProducerManager>,
consumer_manager: Arc<ConsumerManager>,
drop: Arc<AtomicBool>,
shutdown: Arc<AtomicBool>,
shutdown_hook: Option<BrokerShutdownHook>,
Expand Down Expand Up @@ -117,6 +119,7 @@ impl Clone for BrokerRuntime {
broker_out_api: self.broker_out_api.clone(),
broker_runtime: None,
producer_manager: self.producer_manager.clone(),
consumer_manager: self.consumer_manager.clone(),
drop: self.drop.clone(),
shutdown: self.shutdown.clone(),
shutdown_hook: self.shutdown_hook.clone(),
Expand Down Expand Up @@ -154,17 +157,20 @@ impl BrokerRuntime {
TopicConfigManager::new(broker_config.clone(), broker_runtime_inner);
let mut stats_manager = BrokerStatsManager::new(broker_config.clone());
let producer_manager = Arc::new(ProducerManager::new());
let consumer_manager = Arc::new(ConsumerManager::new_with_broker_stats(
Arc::new(DefaultConsumerIdsChangeListener {}),
broker_config.clone(),
));
stats_manager.set_producer_state_getter(Arc::new(ProducerStateGetter {
topic_config_manager: topic_config_manager.clone(),
producer_manager: producer_manager.clone(),
}));
stats_manager.set_consumer_state_getter(Arc::new(ConsumerStateGetter {
topic_config_manager: topic_config_manager.clone(),
consumer_manager: ConsumerManager {},
consumer_manager: consumer_manager.clone(),
}));

let broker_stats_manager = Arc::new(stats_manager);

consumer_manager.set_broker_stats_manager(Some(Arc::downgrade(&broker_stats_manager)));
Self {
broker_config: broker_config.clone(),
message_store_config,
Expand All @@ -182,6 +188,7 @@ impl BrokerRuntime {
broker_out_api: broker_outer_api,
broker_runtime: Some(runtime),
producer_manager,
consumer_manager,
drop: Arc::new(AtomicBool::new(false)),
shutdown: Arc::new(AtomicBool::new(false)),
shutdown_hook: None,
Expand Down Expand Up @@ -907,7 +914,7 @@ impl StateGetter for ProducerStateGetter {

struct ConsumerStateGetter {
topic_config_manager: TopicConfigManager,
consumer_manager: ConsumerManager,
consumer_manager: Arc<ConsumerManager>,
}
impl StateGetter for ConsumerStateGetter {
fn online(&self, instance_id: &str, group: &str, topic: &str) -> bool {
Expand Down
2 changes: 2 additions & 0 deletions rocketmq-broker/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/

pub(crate) mod client_channel_info;
pub(crate) mod consumer_group_event;
pub(crate) mod consumer_group_info;
pub(crate) mod consumer_ids_change_listener;
pub(crate) mod default_consumer_ids_change_listener;
pub(crate) mod manager;
pub(crate) mod net;
37 changes: 16 additions & 21 deletions rocketmq-broker/src/client/client_channel_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,29 @@
*/

use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::LanguageCode;

#[derive(Default, Debug, Clone, Hash)]
#[derive(Debug, Clone, Hash)]
pub struct ClientChannelInfo {
socket_addr: String,
channel: Channel,
client_id: String,
language: LanguageCode,
version: i32,
last_update_timestamp: i64,
last_update_timestamp: u64,
}

impl ClientChannelInfo {
pub fn new(
socket_addr: String,
client_id: String,
language: LanguageCode,
version: i32,
) -> Self {
pub fn new(channel: Channel, client_id: String, language: LanguageCode, version: i32) -> Self {
Self {
socket_addr,
channel,
client_id,
language,
version,
last_update_timestamp: get_current_millis() as i64,
last_update_timestamp: get_current_millis(),
}
}

pub fn socket_addr(&self) -> &String {
&self.socket_addr
}

pub fn client_id(&self) -> &String {
&self.client_id
}
Expand All @@ -59,14 +51,10 @@ impl ClientChannelInfo {
self.version
}

pub fn last_update_timestamp(&self) -> i64 {
pub fn last_update_timestamp(&self) -> u64 {
self.last_update_timestamp
}

pub fn set_socket_addr(&mut self, socket_addr: String) {
self.socket_addr = socket_addr;
}

pub fn set_client_id(&mut self, client_id: String) {
self.client_id = client_id;
}
Expand All @@ -79,7 +67,14 @@ impl ClientChannelInfo {
self.version = version;
}

pub fn set_last_update_timestamp(&mut self, last_update_timestamp: i64) {
pub fn set_last_update_timestamp(&mut self, last_update_timestamp: u64) {
self.last_update_timestamp = last_update_timestamp;
}

pub fn channel(&self) -> &Channel {
&self.channel
}
pub fn set_channel(&mut self, channel: Channel) {
self.channel = channel;
}
}
67 changes: 67 additions & 0 deletions rocketmq-broker/src/client/consumer_group_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#[derive(Debug, Clone)]
pub enum ConsumerGroupEvent {
/// Some consumers in the group are changed.
Change,
/// The group of consumer is unregistered.
Unregister,
/// The group of consumer is registered.
Register,
/// The client of this consumer is new registered.
ClientRegister,
/// The client of this consumer is unregistered.
ClientUnregister,
}
#[cfg(test)]
mod tests {
use super::*;

#[test]
fn consumer_group_event_variants() {
let change = ConsumerGroupEvent::Change;
let unregister = ConsumerGroupEvent::Unregister;
let register = ConsumerGroupEvent::Register;
let client_register = ConsumerGroupEvent::ClientRegister;
let client_unregister = ConsumerGroupEvent::ClientUnregister;

match change {
ConsumerGroupEvent::Change => assert!(true),
_ => assert!(false),
}

match unregister {
ConsumerGroupEvent::Unregister => assert!(true),
_ => assert!(false),
}

match register {
ConsumerGroupEvent::Register => assert!(true),
_ => assert!(false),
}

match client_register {
ConsumerGroupEvent::ClientRegister => assert!(true),
_ => assert!(false),
}

match client_unregister {
ConsumerGroupEvent::ClientUnregister => assert!(true),
_ => assert!(false),
}
}
}
Loading

0 comments on commit 070743e

Please sign in to comment.