Skip to content

Commit

Permalink
- Fixed getReceiverEntry() (https://issues.redhat.com/browse/JGRP-2852)
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Nov 11, 2024
1 parent c9c963f commit 508acac
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 63 deletions.
117 changes: 67 additions & 50 deletions src/org/jgroups/protocols/ReliableUnicast.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ public <T extends Protocol> T setLevel(String level) {
public ReliableUnicast sendAtomically(boolean f) {send_atomically=f; return this;}
public boolean loopback() {return loopback;}
public ReliableUnicast loopback(boolean b) {this.loopback=b; return this;}
public ReliableUnicast timeService(TimeService ts) {this.time_service=ts; return this;} // testing only!
public ReliableUnicast lastSync(ExpiryCache<Address> c) {this.last_sync_sent=c; return this;} // testing only!


@ManagedOperation
Expand Down Expand Up @@ -438,30 +440,6 @@ public void stop() {
msg_cache.clear();
}

public Object up(Message msg) {
Address dest=msg.dest(), sender=msg.src();
if(dest == null || dest.isMulticast() || msg.isFlagSet(NO_RELIABILITY)) // only handle unicast messages
return up_prot.up(msg); // pass up

UnicastHeader hdr=msg.getHeader(this.id);
if(hdr == null)
return up_prot.up(msg);
switch(hdr.type) {
case DATA: // received regular message
if(is_trace)
log.trace("%s <-- %s: DATA(#%d, conn_id=%d%s)", local_addr, sender, hdr.seqno, hdr.conn_id, hdr.first? ", first" : "");
if(Objects.equals(local_addr, sender))
handleDataReceivedFromSelf(sender, hdr.seqno, msg);
else
handleDataReceived(sender, hdr.seqno, hdr.conn_id, hdr.first, msg);
break; // we pass the deliverable message up in handleDataReceived()
default:
handleUpEvent(sender, msg, hdr);
break;
}
return null;
}

protected void handleUpEvent(Address sender, Message msg, UnicastHeader hdr) {
try {
switch(hdr.type) {
Expand Down Expand Up @@ -494,6 +472,30 @@ protected void handleUpEvent(Address sender, Message msg, UnicastHeader hdr) {
}
}

public Object up(Message msg) {
Address dest=msg.dest(), sender=msg.src();
if(dest == null || dest.isMulticast() || msg.isFlagSet(NO_RELIABILITY)) // only handle unicast messages
return up_prot.up(msg); // pass up

UnicastHeader hdr=msg.getHeader(this.id);
if(hdr == null)
return up_prot.up(msg);
switch(hdr.type) {
case DATA: // received regular message
if(is_trace)
log.trace("%s <-- %s: DATA(#%d, conn_id=%d%s)", local_addr, sender, hdr.seqno, hdr.conn_id, hdr.first? ", first" : "");
if(Objects.equals(local_addr, sender))
handleDataReceivedFromSelf(sender, hdr.seqno, msg);
else
handleDataReceived(sender, hdr.seqno, hdr.conn_id, hdr.first, msg);
break; // we pass the deliverable message up in handleDataReceived()
default:
handleUpEvent(sender, msg, hdr);
break;
}
return null;
}

public void up(MessageBatch batch) {
if(batch.dest() == null || batch.dest().isMulticast()) { // not a unicast batch
up_prot.up(batch);
Expand Down Expand Up @@ -553,6 +555,10 @@ else if(entry == null) {
if(queued_msgs != null)
addQueuedMessages(sender, entry, queued_msgs);
}

// the code below removes messages that have a HIGHER conn_id; instead we should replace the
// ReceiverEntry with one with the higher conn_id!

if(msgs.keySet().retainAll(Collections.singletonList(entry.connId()))) // remove all conn-ids that don't match
sendRequestForFirstSeqno(sender, batch.dest());
List<LongTuple<Message>> list=msgs.get(entry.connId());
Expand Down Expand Up @@ -611,8 +617,6 @@ protected void handleBatchFromSelf(MessageBatch batch, Entry entry) {
up_prot.up(batch);
}



public Object down(Event evt) {
switch (evt.getType()) {

Expand Down Expand Up @@ -751,7 +755,6 @@ public void removeReceiveConnection(Address mbr) {
entry.state(State.CLOSED);
}


/**
* This method is public only so it can be invoked by unit testing, but should not otherwise be used !
*/
Expand All @@ -761,7 +764,6 @@ public void removeAllConnections() {
recv_table.clear();
}


/** Sends a retransmit request to the given sender */
protected void retransmit(SeqnoList missing, Address sender, Address real_dest) {
Message xmit_msg=new ObjectMessage(sender, missing).setFlag(OOB, NO_FC)
Expand All @@ -774,7 +776,6 @@ protected void retransmit(SeqnoList missing, Address sender, Address real_dest)
xmit_reqs_sent.add(missing.size());
}


/** Called by the sender to resend messages for which no ACK has been received yet */
protected void retransmit(Message msg) {
if(is_trace) {
Expand Down Expand Up @@ -926,7 +927,6 @@ protected void removeAndDeliver(Entry entry, Address sender, AsciiString cluster
while(mb != null || adders.decrementAndGet() != 0);
}


protected String printMessageList(List<LongTuple<Message>> list) {
StringBuilder sb=new StringBuilder();
int size=list.size();
Expand All @@ -945,41 +945,57 @@ protected String printMessageList(List<LongTuple<Message>> list) {
return sb.toString();
}

protected ReceiverEntry getReceiverEntry(Address sender, long seqno, boolean first, short conn_id, Address dest) {
protected ReceiverEntry getReceiverEntry(Address sender, long seqno, boolean first, short conn_id, Address real_dest) {
ReceiverEntry entry=recv_table.get(sender);
if(entry != null && entry.connId() == conn_id)
return entry;
return _getReceiverEntry(sender, seqno, first, conn_id, real_dest);
}

// public for unit testing - don't use in app code!
public ReceiverEntry _getReceiverEntry(Address sender, long seqno, boolean first, short conn_id, Address real_dest) {
ReceiverEntry entry;
recv_table_lock.lock();
try {
entry=recv_table.get(sender);
if(first) {
if(entry == null) {
entry=createReceiverEntry(sender, seqno, conn_id, dest);
}
else { // entry != null && win != null
if(conn_id != entry.connId()) {
log.trace("%s: conn_id=%d != %d; resetting receiver window", local_addr, conn_id, entry.connId());
recv_table.remove(sender);
entry=createReceiverEntry(sender, seqno, conn_id, dest);
}
}
}
else { // entry == null && win == null OR entry != null && win == null OR entry != null && win != null
if(entry == null || entry.connId() != conn_id) {
if(entry == null) {
if(first)
return createReceiverEntry(sender, seqno, conn_id, real_dest);
else {
recv_table_lock.unlock();
sendRequestForFirstSeqno(sender, dest); // drops the message and returns (see below)
sendRequestForFirstSeqno(sender, real_dest); // drops the message and returns (see below)
return null;
}
}
return entry;
// entry != null
return compareConnIds(conn_id, entry.connId(), first, entry, sender, seqno, real_dest);
}
finally {
if(recv_table_lock.isHeldByCurrentThread())
recv_table_lock.unlock();
}
}

protected ReceiverEntry compareConnIds(short other, short mine, boolean first, ReceiverEntry e,
Address sender, long seqno, Address real_dest) {
if(other == mine)
return e;
if(other < mine)
return null;
// other_conn_id > my_conn_id
if(first) {
log.trace("%s: other conn_id (%d) > mine (%d); creating new receiver window", local_addr, other, mine);
recv_table.remove(sender);
return createReceiverEntry(sender, seqno, other, real_dest);
}
else {
log.trace("%s: other conn_id (%d) > mine (%d) (!first); asking for first message", local_addr, other, mine);
recv_table_lock.unlock();
sendRequestForFirstSeqno(sender, real_dest); // drops the message and returns (see below)
return null;
}
}

protected SenderEntry getSenderEntry(Address dst) {
SenderEntry entry=send_table.get(dst);
if(entry == null || entry.state() == State.CLOSED) {
Expand Down Expand Up @@ -1459,8 +1475,8 @@ protected Entry(short conn_id, Buffer<Message> buf) {
update();
}

protected Buffer<Message> buf() {return buf;}
protected short connId() {return conn_id;}
public Buffer<Message> buf() {return buf;}
public short connId() {return conn_id;}
protected void update() {timestamp.set(getTimestamp());}
protected State state() {return state;}
protected Entry state(State s) {if(this.state != s) {this.state=s; update();} return this;}
Expand Down Expand Up @@ -1513,7 +1529,8 @@ public String toString() {
}
}

protected final class ReceiverEntry extends Entry {
// public for unit testing
public final class ReceiverEntry extends Entry {
private final Address real_dest ; // if real_dest != local_addr (https://issues.redhat.com/browse/JGRP-2729)

public ReceiverEntry(Buffer<Message> received_msgs, short recv_conn_id, Address real_dest) {
Expand Down
47 changes: 34 additions & 13 deletions tests/junit-functional/org/jgroups/protocols/UNICAST_OOB_Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ protected void setup(Class<? extends Protocol> unicast_class) throws Exception {
a.connect("UNICAST_OOB_Test");
b.connect("UNICAST_OOB_Test");
Util.waitUntilAllChannelsHaveSameView(3000, 100, a,b);
System.out.printf("-- cluster formed: %s\n", b.view());
}

@AfterMethod
Expand Down Expand Up @@ -75,6 +76,7 @@ public void testSecondMessageReceivedFirstOOB(Class<? extends Protocol> unicast_
_testSecondMessageReceivedFirst(true, false);
}

// @Test(invocationCount=100)
public void testSecondMessageReceivedFirstOOBBatched(Class<? extends Protocol> unicast_class) throws Exception {
setup(unicast_class);
_testSecondMessageReceivedFirst(true, true);
Expand All @@ -84,14 +86,22 @@ protected void _testSecondMessageReceivedFirst(boolean oob, boolean use_batches)
Address dest=a.getAddress(), src=b.getAddress();
Protocol u_a=a.getProtocolStack().findProtocol(Util.getUnicastProtocols()),
u_b=b.getProtocolStack().findProtocol(Util.getUnicastProtocols());
Util.invoke(u_a, "removeReceiveConnection", src);
Util.invoke(u_a, "removeSendConnection", src);
Util.invoke(u_b, "removeReceiveConnection", dest);
Util.invoke(u_b, "removeSendConnection", dest);
System.out.println("=============== removed connection between A and B ===========");

REVERSE reverse=new REVERSE().numMessagesToReverse(5)
.filter(msg -> msg.getDest() != null && src.equals(msg.getSrc()) && (msg.getFlags(false) == 0 || msg.isFlagSet(Message.Flag.OOB)));
for(int i=0; i < 10; i++) {
Util.invoke(u_a, "removeReceiveConnection", src);
Util.invoke(u_a, "removeSendConnection", src);
Util.invoke(u_b, "removeReceiveConnection", dest);
Util.invoke(u_b, "removeSendConnection", dest);
int num_connections=(int)Util.invoke(u_a, "getNumConnections") +
(int)Util.invoke(u_b, "getNumConnections");
if(num_connections == 0)
break;
Util.sleep(100);
}
System.out.println("=============== removed connections between A and B ===========");

Protocol reverse=new REVERSE().numMessagesToReverse(5)
.filter(msg -> msg.getDest() != null && src.equals(msg.src()) && (msg.getFlags(false) == 0 || msg.isFlagSet(Message.Flag.OOB)));
// REVERSE2 reverse=new REVERSE2().filter(m -> m.dest() != null && m.isFlagSet(Message.Flag.OOB) && src.equals(m.src()));
a.getProtocolStack().insertProtocol(reverse, ProtocolStack.Position.BELOW, UNICAST3.class,UNICAST4.class);

if(use_batches) {
Expand All @@ -100,17 +110,28 @@ protected void _testSecondMessageReceivedFirst(boolean oob, boolean use_batches)
mb.start();
}

MyReceiver<Long> r=new MyReceiver<>();
MyReceiver<Long> r=new MyReceiver<Long>().name(a.getName()).verbose(true);
a.setReceiver(r);

System.out.println("========== B sends messages 1-5 to A ==========");
System.out.printf("========== B sending %s messages 1-5 to A ==========\n", oob? "OOB" : "regular");
//u_a.setLevel("trace"); u_b.setLevel("trace");

long start=System.currentTimeMillis();
for(int i=1; i <= 5; i++) {
Message msg=new ObjectMessage(dest, (long)i);
if(oob) msg.setFlag(Message.Flag.OOB);
b.send(msg);
System.out.printf("-- %s: sent %s, hdrs: %s\n", b.address(), msg, msg.printHeaders());
}
Util.waitUntil(10000, 100, () -> r.size() == 5);
if(reverse instanceof REVERSE2) {
REVERSE2 rr=((REVERSE2)reverse);
Util.waitUntilTrue(2000, 100, () -> rr.size() == 5);
rr.filter(null); // from now on, all msgs are passed up
rr.deliver();
}

Util.waitUntil(5000, 100, () -> r.size() == 5,
() -> String.format("expected 5 messages but got %s", r.list()));
long time=System.currentTimeMillis() - start;
System.out.printf("===== list: %s (in %d ms)\n", r.list(), time);
long expected_time=XMIT_INTERVAL * 10; // increased because times might increase with the increase in parallel tests
Expand Down Expand Up @@ -167,11 +188,11 @@ protected static JChannel createChannel(String name, Class<? extends Protocol> u
Protocol p=unicast_class.getConstructor().newInstance();
Util.invoke(p, "setXmitInterval", XMIT_INTERVAL);
return new JChannel(
new SHARED_LOOPBACK(),
new SHARED_LOOPBACK(), // .bundler("nb"),
new SHARED_LOOPBACK_PING(),
new NAKACK2(),
p,
new GMS())
new GMS().printLocalAddress(false))
.name(name);
}

Expand Down
Loading

0 comments on commit 508acac

Please sign in to comment.