Skip to content

Commit

Permalink
[ISSUE #777]🔥Fix Broker can not started🐛 (#778)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Jul 14, 2024
1 parent b635421 commit 9d756e4
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 5 deletions.
6 changes: 3 additions & 3 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::any::Any;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
Expand Down Expand Up @@ -416,10 +415,11 @@ impl BrokerRuntime {
self.broker_config.clone(),
));

let pull_message_result_handler = pull_message_result_handler.as_mut() as &mut dyn Any;
let pull_message_result_handler = pull_message_result_handler.as_mut().as_mut();
pull_message_result_handler
.as_any_mut()
.downcast_mut::<DefaultPullMessageResultHandler>()
.unwrap()
.expect("downcast DefaultPullMessageResultHandler failed")
.set_pull_request_hold_service(Some(Arc::new(
self.pull_request_hold_service.clone().unwrap(),
)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use std::any::Any;
use std::net::SocketAddr;
use std::sync::Arc;

Expand Down Expand Up @@ -231,6 +231,14 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
_ => None,
}
}

fn as_any_mut(&mut self) -> &mut dyn Any {
self
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl DefaultPullMessageResultHandler {
Expand Down
43 changes: 42 additions & 1 deletion rocketmq-broker/src/processor/pull_message_result_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::any::Any;

use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessageRequestHeader;
Expand All @@ -25,7 +26,37 @@ use rocketmq_remoting::runtime::server::ConnectionHandlerContext;
use rocketmq_store::base::get_message_result::GetMessageResult;
use rocketmq_store::filter::MessageFilter;

pub trait PullMessageResultHandler: Sync + Send + 'static {
/// Trait defining the behavior for handling the result of a pull message request.
///
/// This trait is designed to be implemented by types that handle the result of a pull message
/// request in a RocketMQ broker. It provides a method for processing the result of a message
/// retrieval operation, along with various parameters related to the request and the broker's
/// state.
pub trait PullMessageResultHandler: Sync + Send + Any + 'static {
/// Handles the result of a pull message request.
///
/// This method processes the result of a message retrieval operation (`get_message_result`),
/// using the provided request information, channel, context, subscription data, and other
/// parameters to generate an appropriate response.
///
/// # Parameters
/// - `get_message_result`: The result of the message retrieval operation.
/// - `request`: The original remoting command representing the pull message request.
/// - `request_header`: The header of the pull message request, containing request-specific
/// information.
/// - `channel`: The channel through which the request was received.
/// - `ctx`: The connection handler context associated with the request.
/// - `subscription_data`: Subscription data for the consumer making the request.
/// - `subscription_group_config`: Configuration for the subscription group of the consumer.
/// - `broker_allow_suspend`: Flag indicating whether the broker allows suspending the request.
/// - `message_filter`: The message filter to apply to the retrieved messages.
/// - `response`: The initial response remoting command to be potentially modified and returned.
/// - `mapping_context`: Context for topic-queue mapping.
/// - `begin_time_mills`: The timestamp (in milliseconds) when the request began processing.
///
/// # Returns
/// An optional `RemotingCommand` representing the response to the pull message request.
/// If `None`, it indicates that no response should be sent back to the client.
fn handle(
&self,
get_message_result: GetMessageResult,
Expand All @@ -41,4 +72,14 @@ pub trait PullMessageResultHandler: Sync + Send + 'static {
mapping_context: TopicQueueMappingContext,
begin_time_mills: u64,
) -> Option<RemotingCommand>;

/// Returns a mutable reference to `self` as a trait object of type `Any`.
///
/// This method is useful for downcasting the trait object to its concrete type.
fn as_any_mut(&mut self) -> &mut dyn Any;

/// Returns a reference to `self` as a trait object of type `Any`.
///
/// This method is useful for downcasting the trait object to its concrete type.
fn as_any(&self) -> &dyn Any;
}

0 comments on commit 9d756e4

Please sign in to comment.