Skip to content

Commit

Permalink
[improve][cli][branch-3.0] PIP-353: Improve transaction message visib…
Browse files Browse the repository at this point in the history
…ility for peek-message (apache#22788)

(cherry picked from commit 3fd59d2)
  • Loading branch information
shibd authored and srinath-ctds committed Jun 7, 2024
1 parent f0c5776 commit b2ccc5f
Show file tree
Hide file tree
Showing 7 changed files with 364 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
Expand Down Expand Up @@ -2854,7 +2855,7 @@ public void readEntryFailed(ManagedLedgerException exception,
@Override
public void readEntryComplete(Entry entry, Object ctx) {
try {
results.complete(generateResponseWithEntry(entry));
results.complete(generateResponseWithEntry(entry, (PersistentTopic) topic));
} catch (IOException exception) {
throw new RestException(exception);
} finally {
Expand Down Expand Up @@ -2968,57 +2969,59 @@ private CompletableFuture<MessageId> findMessageIdByPublishTime(long timestamp,

protected CompletableFuture<Response> internalPeekNthMessageAsync(String subName, int messagePosition,
boolean authoritative) {
CompletableFuture<Void> ret;
// If the topic name is a partition name, no need to get partition topic metadata again
if (!topicName.isPartitioned()) {
ret = getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenCompose(topicMetadata -> {
if (topicMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Peek messages on a partitioned topic is not allowed");
}
return CompletableFuture.completedFuture(null);
});
} else {
ret = CompletableFuture.completedFuture(null);
}
return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES, subName))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
CompletableFuture<Entry> entry;
if (!(topic instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(),
topicName, subName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Peek messages on a non-persistent topic is not allowed");
} else {
if (subName.startsWith(((PersistentTopic) topic).getReplicatorPrefix())) {
PersistentReplicator repl = getReplicatorReference(subName, (PersistentTopic) topic);
entry = repl.peekNthMessage(messagePosition);
} else {
PersistentSubscription sub =
(PersistentSubscription) getSubscriptionReference(subName, (PersistentTopic) topic);
entry = sub.peekNthMessage(messagePosition);
}
}
return entry;
}).thenCompose(entry -> {
try {
Response response = generateResponseWithEntry(entry);
return CompletableFuture.completedFuture(response);
} catch (NullPointerException npe) {
throw new RestException(Status.NOT_FOUND, "Message not found");
} catch (Exception exception) {
log.error("[{}] Failed to peek message at position {} from {} {}", clientAppId(),
messagePosition, topicName, subName, exception);
throw new RestException(exception);
} finally {
if (entry != null) {
entry.release();
}
}
});
CompletableFuture<Void> ret = validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES, subName);
return ret.thenCompose(__ -> {
// If the topic name is a partition name, no need to get partition topic metadata again
if (!topicName.isPartitioned()) {
return getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenCompose(topicMetadata -> {
if (topicMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Peek messages on a partitioned topic is not allowed");
}
return CompletableFuture.completedFuture(null);
});
} else {
return CompletableFuture.completedFuture(null);
}
}).thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
CompletableFuture<Entry> entry;
if (!(topic instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(),
topicName, subName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Peek messages on a non-persistent topic is not allowed");
} else {
if (subName.startsWith(((PersistentTopic) topic).getReplicatorPrefix())) {
PersistentReplicator repl = getReplicatorReference(subName, (PersistentTopic) topic);
entry = repl.peekNthMessage(messagePosition);
} else {
PersistentSubscription sub =
(PersistentSubscription) getSubscriptionReference(subName, (PersistentTopic) topic);
entry = sub.peekNthMessage(messagePosition);
}
}
return entry.thenApply(e -> Pair.of(e, (PersistentTopic) topic));
}).thenCompose(entryTopicPair -> {
Entry entry = entryTopicPair.getLeft();
PersistentTopic persistentTopic = entryTopicPair.getRight();
try {
Response response = generateResponseWithEntry(entry, persistentTopic);
return CompletableFuture.completedFuture(response);
} catch (NullPointerException npe) {
throw new RestException(Status.NOT_FOUND, "Message not found");
} catch (Exception exception) {
log.error("[{}] Failed to peek message at position {} from {} {}", clientAppId(),
messagePosition, topicName, subName, exception);
throw new RestException(exception);
} finally {
if (entry != null) {
entry.release();
}
}
});
}

protected CompletableFuture<Response> internalExamineMessageAsync(String initialPosition, long messagePosition,
Expand Down Expand Up @@ -3082,17 +3085,19 @@ public String toString() {
PersistentTopicsBase.this.topicName);
}
}, null);
return future;
return future.thenApply(entry -> Pair.of(entry, (PersistentTopic) topic));
} catch (ManagedLedgerException exception) {
log.error("[{}] Failed to examine message at position {} from {} due to {}", clientAppId(),
messagePosition,
topicName, exception);
throw new RestException(exception);
}

}).thenApply(entry -> {
}).thenApply(entryTopicPair -> {
Entry entry = entryTopicPair.getLeft();
PersistentTopic persistentTopic = entryTopicPair.getRight();
try {
return generateResponseWithEntry(entry);
return generateResponseWithEntry(entry, persistentTopic);
} catch (IOException exception) {
throw new RestException(exception);
} finally {
Expand All @@ -3103,7 +3108,7 @@ public String toString() {
});
}

private Response generateResponseWithEntry(Entry entry) throws IOException {
private Response generateResponseWithEntry(Entry entry, PersistentTopic persistentTopic) throws IOException {
checkNotNull(entry);
PositionImpl pos = (PositionImpl) entry.getPosition();
ByteBuf metadataAndPayload = entry.getDataBuffer();
Expand Down Expand Up @@ -3221,6 +3226,14 @@ private Response generateResponseWithEntry(Entry entry) throws IOException {
if (metadata.hasNullPartitionKey()) {
responseBuilder.header("X-Pulsar-null-partition-key", metadata.isNullPartitionKey());
}
if (metadata.hasTxnidMostBits() && metadata.hasTxnidLeastBits()) {
TxnID txnID = new TxnID(metadata.getTxnidMostBits(), metadata.getTxnidLeastBits());
boolean isTxnAborted = persistentTopic.isTxnAborted(txnID, (PositionImpl) entry.getPosition());
responseBuilder.header("X-Pulsar-txn-aborted", isTxnAborted);
}
boolean isTxnUncommitted = ((PositionImpl) entry.getPosition())
.compareTo(persistentTopic.getMaxReadPosition()) > 0;
responseBuilder.header("X-Pulsar-txn-uncommitted", isTxnUncommitted);

// Decode if needed
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.http.HttpStatus;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
Expand All @@ -48,12 +49,16 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TransactionIsolationLevel;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.MarkerType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
Expand Down Expand Up @@ -894,6 +899,127 @@ public void testGetPositionStatsInPendingAckStatsFroBatch() throws Exception {

}

@Test
public void testPeekMessageForSkipTxnMarker() throws Exception {
initTransaction(1);

final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_marker");

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
int n = 10;
for (int i = 0; i < n; i++) {
Transaction txn = pulsarClient.newTransaction().build().get();
producer.newMessage(txn).value("msg").send();
txn.commit().get();
}

List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n,
false, TransactionIsolationLevel.READ_UNCOMMITTED);
assertEquals(peekMsgs.size(), n);
for (Message<byte[]> peekMsg : peekMsgs) {
assertEquals(new String(peekMsg.getValue()), "msg");
}
}

@Test
public void testPeekMessageFoReadCommittedMessages() throws Exception {
initTransaction(1);

final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_txn");

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
int n = 10;
// Alternately sends `n` committed transactional messages and `n` abort transactional messages.
for (int i = 0; i < 2 * n; i++) {
Transaction txn = pulsarClient.newTransaction().build().get();
if (i % 2 == 0) {
producer.newMessage(txn).value("msg").send();
txn.commit().get();
} else {
producer.newMessage(txn).value("msg-aborted").send();
txn.abort();
}
}
// Then sends 1 uncommitted transactional messages.
Transaction txn = pulsarClient.newTransaction().build().get();
producer.newMessage(txn).value("msg-uncommitted").send();
// Then sends n-1 no transaction messages.
for (int i = 0; i < n - 1; i++) {
producer.newMessage().value("msg-after-uncommitted").send();
}

// peek n message, all messages value should be "msg"
{
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n,
false, TransactionIsolationLevel.READ_COMMITTED);
assertEquals(peekMsgs.size(), n);
for (Message<byte[]> peekMsg : peekMsgs) {
assertEquals(new String(peekMsg.getValue()), "msg");
}
}

// peek 3 * n message, and still get n message, all messages value should be "msg"
{
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 2 * n,
false, TransactionIsolationLevel.READ_COMMITTED);
assertEquals(peekMsgs.size(), n);
for (Message<byte[]> peekMsg : peekMsgs) {
assertEquals(new String(peekMsg.getValue()), "msg");
}
}
}

@Test
public void testPeekMessageForShowAllMessages() throws Exception {
initTransaction(1);

final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_all");

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
int n = 10;
// Alternately sends `n` committed transactional messages and `n` abort transactional messages.
for (int i = 0; i < 2 * n; i++) {
Transaction txn = pulsarClient.newTransaction().build().get();
if (i % 2 == 0) {
producer.newMessage(txn).value("msg").send();
txn.commit().get();
} else {
producer.newMessage(txn).value("msg-aborted").send();
txn.abort();
}
}
// Then sends `n` uncommitted transactional messages.
Transaction txn = pulsarClient.newTransaction().build().get();
for (int i = 0; i < n; i++) {
producer.newMessage(txn).value("msg-uncommitted").send();
}

// peek 5 * n message, will get 5 * n msg.
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 5 * n,
true, TransactionIsolationLevel.READ_UNCOMMITTED);
assertEquals(peekMsgs.size(), 5 * n);

for (int i = 0; i < 4 * n; i++) {
Message<byte[]> peekMsg = peekMsgs.get(i);
MessageImpl peekMsgImpl = (MessageImpl) peekMsg;
MessageMetadata metadata = peekMsgImpl.getMessageBuilder();
if (metadata.hasMarkerType()) {
assertTrue(metadata.getMarkerType() == MarkerType.TXN_COMMIT_VALUE ||
metadata.getMarkerType() == MarkerType.TXN_ABORT_VALUE);
} else {
String value = new String(peekMsg.getValue());
assertTrue(value.equals("msg") || value.equals("msg-aborted"));
}
}
for (int i = 4 * n; i < peekMsgs.size(); i++) {
Message<byte[]> peekMsg = peekMsgs.get(i);
assertEquals(new String(peekMsg.getValue()), "msg-uncommitted");
}
}

private static void verifyCoordinatorStats(String state,
long sequenceId, long lowWaterMark) {
assertEquals(state, "Ready");
Expand Down
Loading

0 comments on commit b2ccc5f

Please sign in to comment.