Skip to content

Commit

Permalink
Fix Jms drop record (#30218)
Browse files Browse the repository at this point in the history
* Fix JmsIO read drop message

* Close consumer before finalizing

* Close last consumer and session on finalizeCheckpoint

* Acknowlege single message in a session

* Remove unnecessary write lock in JmsCheckpointMakr (it's immutable)

* Add unit test

* Add message when test fail

* remove test leftover

* address comments

* enable integration test on amqp
  • Loading branch information
Abacn authored Feb 7, 2024
1 parent 2c5d153 commit c72a9f8
Show file tree
Hide file tree
Showing 4 changed files with 301 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -39,87 +40,67 @@ class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable

private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);

private Instant oldestMessageTimestamp = Instant.now();
private transient List<Message> messages = new ArrayList<>();
private Instant oldestMessageTimestamp;
private transient @Nullable Message lastMessage;
private transient @Nullable MessageConsumer consumer;
private transient @Nullable Session session;

@VisibleForTesting transient boolean discarded = false;

@VisibleForTesting final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

JmsCheckpointMark() {}
private JmsCheckpointMark(
Instant oldestMessageTimestamp,
@Nullable Message lastMessage,
@Nullable MessageConsumer consumer,
@Nullable Session session) {
this.oldestMessageTimestamp = oldestMessageTimestamp;
this.lastMessage = lastMessage;
this.consumer = consumer;
this.session = session;
}

void add(Message message) throws Exception {
lock.writeLock().lock();
/** Acknowledge all outstanding message. */
@Override
public void finalizeCheckpoint() {
try {
if (discarded) {
throw new IllegalStateException(
String.format(
"Attempting to add message %s to checkpoint that is discarded.", message));
}
Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
if (currentMessageTimestamp.isBefore(oldestMessageTimestamp)) {
oldestMessageTimestamp = currentMessageTimestamp;
// Jms spec will implicitly acknowledge _all_ messaged already received by the same
// session if one message in this session is being acknowledged.
if (lastMessage != null) {
lastMessage.acknowledge();
}
messages.add(message);
} finally {
lock.writeLock().unlock();
} catch (JMSException e) {
// The effect of this is message not get acknowledged and thus will be redelivered. It is
// not fatal, so we just raise error log. Similar below.
LOG.error(
"Failed to acknowledge the message. Will redeliver and might cause duplication.", e);
}
}

Instant getOldestMessageTimestamp() {
lock.readLock().lock();
try {
return this.oldestMessageTimestamp;
} finally {
lock.readLock().unlock();
// session is closed after message acknowledged otherwise other consumer may receive duplicate
// messages.
if (consumer != null) {
try {
consumer.close();
consumer = null;
} catch (JMSException e) {
LOG.info("Error closing JMS consumer. It may have already been closed.");
}
}
}

void discard() {
lock.writeLock().lock();
try {
this.discarded = true;
} finally {
lock.writeLock().unlock();
}
}

/**
* Acknowledge all outstanding message. Since we believe that messages will be delivered in
* timestamp order, and acknowledged messages will not be retried, the newest message in this
* batch is a good bound for future messages.
*/
@Override
public void finalizeCheckpoint() {
lock.writeLock().lock();
try {
if (discarded) {
messages.clear();
return;
// session needs to be closed after message acknowledged because the latter needs session remain
// active.
if (session != null) {
try {
session.close();
session = null;
} catch (JMSException e) {
LOG.info("Error closing JMS session. It may have already been closed.");
}
for (Message message : messages) {
try {
message.acknowledge();
Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
oldestMessageTimestamp = currentMessageTimestamp;
}
} catch (Exception e) {
LOG.error("Exception while finalizing message: ", e);
}
}
messages.clear();
} finally {
lock.writeLock().unlock();
}
}

// set an empty list to messages when deserialize
private void readObject(java.io.ObjectInputStream stream)
throws IOException, ClassNotFoundException {
stream.defaultReadObject();
messages = new ArrayList<>();
discarded = false;
lastMessage = null;
session = null;
}

@Override
Expand All @@ -138,4 +119,90 @@ public boolean equals(@Nullable Object o) {
public int hashCode() {
return Objects.hash(oldestMessageTimestamp);
}

static Preparer newPreparer() {
return new Preparer();
}

/**
* A class preparing the immutable checkpoint. It is mutable so that new messages can be added.
*/
static class Preparer {
private Instant oldestMessageTimestamp = Instant.now();
private transient @Nullable Message lastMessage = null;

@VisibleForTesting transient boolean discarded = false;

@VisibleForTesting final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

private Preparer() {}

void add(Message message) throws Exception {
lock.writeLock().lock();
try {
if (discarded) {
throw new IllegalStateException(
String.format(
"Attempting to add message %s to checkpoint that is discarded.", message));
}
Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
if (currentMessageTimestamp.isBefore(oldestMessageTimestamp)) {
oldestMessageTimestamp = currentMessageTimestamp;
}
lastMessage = message;
} finally {
lock.writeLock().unlock();
}
}

Instant getOldestMessageTimestamp() {
lock.readLock().lock();
try {
return this.oldestMessageTimestamp;
} finally {
lock.readLock().unlock();
}
}

void discard() {
lock.writeLock().lock();
try {
this.discarded = true;
} finally {
lock.writeLock().unlock();
}
}

/**
* Create a new checkpoint mark based on the current preparer. This will reset the messages held
* by the preparer, and the owner of the preparer is responsible to create a new Jms session
* after this call.
*/
JmsCheckpointMark newCheckpoint(@Nullable MessageConsumer consumer, @Nullable Session session) {
JmsCheckpointMark checkpointMark;
lock.writeLock().lock();
try {
if (discarded) {
lastMessage = null;
checkpointMark = this.emptyCheckpoint();
} else {
checkpointMark =
new JmsCheckpointMark(oldestMessageTimestamp, lastMessage, consumer, session);
lastMessage = null;
oldestMessageTimestamp = Instant.now();
}
} finally {
lock.writeLock().unlock();
}
return checkpointMark;
}

JmsCheckpointMark emptyCheckpoint() {
return new JmsCheckpointMark(oldestMessageTimestamp, null, null, null);
}

boolean isEmpty() {
return lastMessage == null;
}
}
}
Loading

0 comments on commit c72a9f8

Please sign in to comment.