From e08f5b26040dffaae0c708af5c40cc1ad225a5b4 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Thu, 8 Feb 2024 19:22:26 +0100 Subject: [PATCH] Prevent overriding of message flags with RequestOption flags (https://issues.redhat.com/browse/JGRP-2763) --- src/org/jgroups/BaseMessage.java | 28 +++++++++++++------ src/org/jgroups/Message.java | 15 ++++++++-- src/org/jgroups/blocks/RequestCorrelator.java | 9 +++--- 3 files changed, 37 insertions(+), 15 deletions(-) diff --git a/src/org/jgroups/BaseMessage.java b/src/org/jgroups/BaseMessage.java index 6c685e13f8..61ca49edab 100644 --- a/src/org/jgroups/BaseMessage.java +++ b/src/org/jgroups/BaseMessage.java @@ -60,7 +60,7 @@ public BaseMessage(Address dest) { public Message setFlag(Flag... flags) { if(flags != null) { short tmp=this.flags; - for(Flag flag : flags) { + for(Flag flag: flags) { if(flag != null) tmp|=flag.value(); } @@ -84,12 +84,24 @@ public Message setFlag(TransientFlag... flags) { return this; } - - public Message setFlag(short flag, boolean transient_flags) { - if(transient_flags) - this.transient_flags=(byte)flag; - else - this.flags=flag; + @Override + public Message setFlag(short flag, boolean transient_flags, boolean xor) { + if(transient_flags) { + if(xor) { + byte tmp=this.transient_flags; + this.transient_flags=(byte)(tmp | (byte)flag); + } + else + this.transient_flags=(byte)flag; + } + else { + if(xor) { + short tmp=this.flags; + this.flags=(short)(tmp | flag); + } + else + this.flags=flag; + } return this; } @@ -131,7 +143,7 @@ public Message clearFlag(TransientFlag... flags) { /** * Checks if a given flag is set * @param flag The flag - * @return Whether or not the flag is currently set + * @return Whether the flag is currently set */ public boolean isFlagSet(Flag flag) { return Util.isFlagSet(flags, flag); diff --git a/src/org/jgroups/Message.java b/src/org/jgroups/Message.java index 8a21f47858..155cb7384a 100644 --- a/src/org/jgroups/Message.java +++ b/src/org/jgroups/Message.java @@ -67,7 +67,7 @@ public interface Message extends SizeStreamable, Constructable { /** Removes all headers: use carefully! */ Message clearHeaders(); - /** Sets one or more flags */ + /** Sets one or more flags (xor-ing existing flags) */ Message setFlag(Flag... flags); @@ -75,9 +75,18 @@ public interface Message extends SizeStreamable, Constructable { * @param flag The flag to be set (as a short). Overrides existing flags (no xor-ing) * @param transient_flags True if the flag is transient, false otherwise */ - Message setFlag(short flag, boolean transient_flags); + default Message setFlag(short flag, boolean transient_flags) { + return setFlag(flag, transient_flags, false); + } + + /** Sets the flags as a short; this way, multiple flags can be set in one operation + * @param flag The flag to be set (as a short). Overrides existing flags (no xor-ing) + * @param transient_flags True if the flag is transient, false otherwise + * @param xor When true, existing flags will be xor-ed with flag, otherwise not + */ + Message setFlag(short flag, boolean transient_flags, boolean xor); - /** Sets one or more transient flags. Transient flags are not marshalled */ + /** Sets one or more transient flags (xor-ing). Transient flags are not marshalled */ Message setFlag(TransientFlag... flags); /** Atomically sets a transient flag if not set. Returns true if the flags was set, else false (already set) */ diff --git a/src/org/jgroups/blocks/RequestCorrelator.java b/src/org/jgroups/blocks/RequestCorrelator.java index b16a8f5823..bbcdbad21b 100644 --- a/src/org/jgroups/blocks/RequestCorrelator.java +++ b/src/org/jgroups/blocks/RequestCorrelator.java @@ -120,7 +120,8 @@ public void sendMulticastRequest(Collection
dest_mbrs, Message msg, : new Header(Header.REQ, 0, this.corr_id); msg.putHeader(this.corr_id, hdr) - .setFlag(opts.flags(), false).setFlag(opts.transientFlags(), true); + .setFlag(opts.flags(), false, true) + .setFlag(opts.transientFlags(), true, true); if(req != null) // sync addEntry(req, hdr, false); @@ -141,8 +142,8 @@ public void sendMulticastRequest(Collection
dest_mbrs, Message msg, public void sendUnicastRequest(Message msg, Request req, RequestOptions opts) throws Exception { Address dest=msg.getDest(); Header hdr=new Header(Header.REQ, 0, this.corr_id); - msg.putHeader(this.corr_id, hdr).setFlag(opts.flags(), false) - .setFlag(opts.transientFlags(), true); + msg.putHeader(this.corr_id, hdr).setFlag(opts.flags(), false, true) + .setFlag(opts.transientFlags(), true, true); if(req != null) // sync RPC addEntry(req, hdr, true); @@ -424,7 +425,7 @@ protected void handleResponse(Message rsp, Header hdr) { protected void sendReply(final Message req, final long req_id, Object reply, boolean is_exception) { - Message rsp=makeReply(req).setFlag(req.getFlags(false), false) + Message rsp=makeReply(req).setFlag(req.getFlags(false), false, true) .setPayload(reply) .clearFlag(Message.Flag.RSVP); // JGRP-1940 sendResponse(rsp, req_id, is_exception);