Use pooled direct memory allocator when decoding Pulsar entry to Kafka records #673

Merged
DecodeResult.java
@@ -14,8 +14,10 @@
 package io.streamnative.pulsar.handlers.kop.format;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NonNull;
 import org.apache.kafka.common.record.MemoryRecords;
 
 /**
@@ -25,10 +27,10 @@
 @AllArgsConstructor
 public class DecodeResult {
 
-    private MemoryRecords records;
+    private @NonNull MemoryRecords records;
     private ByteBuf releasedByteBuf;
 
-    public DecodeResult(MemoryRecords records) {
+    public DecodeResult(@NonNull MemoryRecords records) {
         this.records = records;
     }
 
@@ -37,4 +39,12 @@ public void release() {
             releasedByteBuf.release();
         }
     }
+
+    public @NonNull ByteBuf getOrCreateByteBuf() {
+        if (releasedByteBuf != null) {
+            return releasedByteBuf;
+        } else {
+            return Unpooled.wrappedBuffer(records.buffer());
+        }
+    }
 }
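DecodeResult now pairs the decoded MemoryRecords with the pooled direct ByteBuf that backs them, so the buffer can be returned to Netty's pool instead of being left to the GC. A minimal caller-side sketch of that contract (only DecodeResult's own API is from this diff; the formatter and sink names are placeholders):

```java
import io.netty.buffer.ByteBuf;
import java.util.List;
import org.apache.bookkeeper.mledger.Entry;

// Hypothetical caller; only DecodeResult's API (getRecords, getOrCreateByteBuf,
// release) comes from this PR.
class DecodeResultUsageSketch {

    void replyWithEntries(EntryFormatter formatter, List<Entry> entries, byte magic) {
        final DecodeResult decodeResult = formatter.decode(entries, magic);
        try {
            // Returns the pooled direct buffer when one backs the records;
            // otherwise wraps records.buffer() in an unpooled view (no copy).
            final ByteBuf kafkaBuffer = decodeResult.getOrCreateByteBuf();
            writeToClient(kafkaBuffer); // placeholder for the network write
        } finally {
            decodeResult.release(); // return the pooled buffer, if any, to the pool
        }
    }

    void writeToClient(ByteBuf buffer) {
        // placeholder sink
    }
}
```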
DirectBufferOutputStream.java (new file)
@@ -0,0 +1,85 @@
/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.streamnative.pulsar.handlers.kop.format;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.nio.ByteBuffer;
import lombok.Getter;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;

/**
 * An OutputStream that uses a direct buffer from Netty's buffer allocator as its underlying buffer.
 *
 * All methods that may be called by `MemoryRecordsBuilder` are overridden.
 */
public class DirectBufferOutputStream extends ByteBufferOutputStream {

    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
    private static final ByteBufAllocator ALLOCATOR = PulsarByteBufAllocator.DEFAULT;

    private final int initialCapacity;
    @Getter
    private final ByteBuf byteBuf;

    public DirectBufferOutputStream(int initialCapacity) {
        super(EMPTY_BUFFER);
        this.initialCapacity = initialCapacity;
        this.byteBuf = ALLOCATOR.directBuffer(initialCapacity);
    }

    @Override
    public void write(int b) {
        byteBuf.writeByte(b);
    }

    @Override
    public void write(byte[] bytes, int off, int len) {
        byteBuf.writeBytes(bytes, off, len);
    }

    @Override
    public void write(ByteBuffer sourceBuffer) {
        byteBuf.writeBytes(sourceBuffer);
    }

    @Override
    public ByteBuffer buffer() {
        // When this method is called, the internal NIO ByteBuffer should be treated as a buffer that has only been
        // written to. Its position is therefore set to the limit, because the caller will usually call
        // `ByteBuffer#flip()` to reset the position and limit.
        final ByteBuffer byteBuffer = byteBuf.nioBuffer();
        byteBuffer.position(byteBuffer.limit());
        return byteBuffer;
    }

    @Override
    public int position() {
        return byteBuf.writerIndex();
    }

    @Override
    public void position(int position) {
        if (position > byteBuf.capacity()) {
            byteBuf.capacity(position);
        }
        byteBuf.writerIndex(position);
    }

    @Override
    public int initialCapacity() {
        return initialCapacity;
    }
}
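Below is a minimal, hedged usage sketch of this stream with Kafka's MemoryRecordsBuilder; the builder arguments mirror those used elsewhere in this PR, and the wrapper class is illustrative. The full comparison against the heap-backed ByteBufferOutputStream is in the test at the end of this diff.

```java
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;

// Illustrative wrapper; only DirectBufferOutputStream comes from this PR.
public class DirectBufferExample {

    public static void main(String[] args) {
        final DirectBufferOutputStream stream = new DirectBufferOutputStream(1024);
        final MemoryRecordsBuilder builder = new MemoryRecordsBuilder(stream,
                RecordBatch.MAGIC_VALUE_V2,
                CompressionType.NONE,
                TimestampType.CREATE_TIME,
                0L,                                    // base offset
                System.currentTimeMillis(),            // log append time
                RecordBatch.NO_PRODUCER_ID,
                RecordBatch.NO_PRODUCER_EPOCH,
                RecordBatch.NO_SEQUENCE,
                false,                                 // isTransactional
                false,                                 // isControlBatch
                RecordBatch.NO_PARTITION_LEADER_EPOCH,
                100 * 1024 * 1024);                    // write limit
        builder.appendWithOffset(0L, System.currentTimeMillis(), null,
                ByteBuffer.wrap("value".getBytes(StandardCharsets.UTF_8)));
        final MemoryRecords records = builder.build();
        System.out.println("built " + records.sizeInBytes() + " bytes on a direct buffer");

        // Unlike the heap-based ByteBufferOutputStream, the caller owns the
        // pooled ByteBuf and must release it once the records are consumed.
        final ByteBuf byteBuf = stream.getByteBuf();
        byteBuf.release();
    }
}
```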
EntryFormatter implementation (file name not shown)
@@ -102,9 +102,9 @@ public DecodeResult decode(List<Entry> entries, byte magic) {
                 orderedByteBuf.add(byteBuf.slice(byteBuf.readerIndex(), byteBuf.readableBytes()));
             }
         } else {
-            final MemoryRecords records =
+            final DecodeResult decodeResult =
                     ByteBufUtils.decodePulsarEntryToKafkaRecords(metadata, byteBuf, startOffset, magic);
-            final ByteBuf kafkaBuffer = Unpooled.wrappedBuffer(records.buffer());
+            final ByteBuf kafkaBuffer = decodeResult.getOrCreateByteBuf();
             orderedByteBuf.add(kafkaBuffer);
             if (!optionalByteBufs.isPresent()) {
                 optionalByteBufs = Optional.of(new ArrayList<>());
PulsarEntryFormatter.java
@@ -48,6 +48,9 @@ public class PulsarEntryFormatter implements EntryFormatter {
     private static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
     private static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024;
 
+    private static final DecodeResult EMPTY_DECODE_RESULT = new DecodeResult(
+            MemoryRecords.readableRecords(ByteBuffer.allocate(0)));
+
     @Override
     public ByteBuf encode(final MemoryRecords records, final int numMessages) {
         long currentBatchSizeBytes = 0;
@@ -100,7 +103,7 @@ public ByteBuf encode(final MemoryRecords records, final int numMessages) {
 
     @Override
     public DecodeResult decode(final List<Entry> entries, final byte magic) {
-        final List<MemoryRecords> recordsList = new ArrayList<>();
+        final List<DecodeResult> decodeResults = new ArrayList<>();
 
         entries.parallelStream().forEachOrdered(entry -> {
             try {
@@ -111,7 +114,7 @@ public DecodeResult decode(final List<Entry> entries, final byte magic) {
                 Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
                 MessageMetadata msgMetadata = Commands.parseMessageMetadata(metadataAndPayload);
 
-                recordsList.add(ByteBufUtils.decodePulsarEntryToKafkaRecords(
+                decodeResults.add(ByteBufUtils.decodePulsarEntryToKafkaRecords(
                         msgMetadata, metadataAndPayload, baseOffset, magic));
             } catch (KoPMessageMetadataNotFoundException | IOException e) { // skip failed decode entry
                 log.error("[{}:{}] Failed to decode entry", entry.getLedgerId(), entry.getEntryId());
@@ -120,14 +123,17 @@
             }
         });
 
-        if (recordsList.isEmpty()) {
-            return new DecodeResult(MemoryRecords.readableRecords(ByteBuffer.allocate(0)));
-        } else if (recordsList.size() == 1) {
-            return new DecodeResult(recordsList.get(0));
+        if (decodeResults.isEmpty()) {
+            return EMPTY_DECODE_RESULT;
+        } else if (decodeResults.size() == 1) {
+            return decodeResults.get(0);
         } else {
-            final int totalSize = recordsList.stream().mapToInt(MemoryRecords::sizeInBytes).sum();
+            final int totalSize = decodeResults.stream()
+                    .mapToInt(decodeResult -> decodeResult.getRecords().sizeInBytes())
+                    .sum();
             final ByteBuf mergedBuffer = PulsarByteBufAllocator.DEFAULT.directBuffer(totalSize);
-            recordsList.forEach(records -> mergedBuffer.writeBytes(records.buffer()));
+            decodeResults.forEach(decodeResult -> mergedBuffer.writeBytes(decodeResult.getRecords().buffer()));
+            decodeResults.forEach(DecodeResult::release);
             return new DecodeResult(MemoryRecords.readableRecords(mergedBuffer.nioBuffer()), mergedBuffer);
         }
     }
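The merge branch above relies on ByteBuf#writeBytes copying the bytes before each per-entry result is released. A sketch of that lifecycle with the ownership spelled out (the wrapper class and method are illustrative and assume the same package as DecodeResult; the PR itself writes and releases in two passes, which is equivalent since the bytes are already copied):

```java
import io.netty.buffer.ByteBuf;
import java.util.List;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;

// Illustrative wrapper; the logic mirrors the merge branch above.
class MergeSketch {

    static DecodeResult merge(List<DecodeResult> decodeResults) {
        final int totalSize = decodeResults.stream()
                .mapToInt(result -> result.getRecords().sizeInBytes())
                .sum();
        // One pooled direct buffer, sized up front; refCnt == 1 and ownership
        // passes to the DecodeResult returned below.
        final ByteBuf mergedBuffer = PulsarByteBufAllocator.DEFAULT.directBuffer(totalSize);
        for (DecodeResult result : decodeResults) {
            // writeBytes copies the readable bytes, so the per-entry pooled
            // buffer can be released immediately afterwards.
            mergedBuffer.writeBytes(result.getRecords().buffer());
            result.release();
        }
        // The merged DecodeResult keeps mergedBuffer as its releasable buffer,
        // so the caller's eventual release() frees the last pooled buffer.
        return new DecodeResult(MemoryRecords.readableRecords(mergedBuffer.nioBuffer()), mergedBuffer);
    }
}
```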
ByteBufUtils.java
@@ -16,6 +16,8 @@
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import io.netty.buffer.ByteBuf;
+import io.streamnative.pulsar.handlers.kop.format.DecodeResult;
+import io.streamnative.pulsar.handlers.kop.format.DirectBufferOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Base64;
@@ -93,28 +95,28 @@ public static ByteBuffer getNioBuffer(ByteBuf buffer) {
         return ByteBuffer.wrap(bytes);
     }
 
-    public static MemoryRecords decodePulsarEntryToKafkaRecords(final MessageMetadata metadata,
-                                                                final ByteBuf payload,
-                                                                final long baseOffset,
-                                                                final byte magic) throws IOException {
+    public static DecodeResult decodePulsarEntryToKafkaRecords(final MessageMetadata metadata,
+                                                               final ByteBuf payload,
+                                                               final long baseOffset,
+                                                               final byte magic) throws IOException {
         if (metadata.hasMarkerType()
                 && (metadata.getMarkerType() == MarkerType.TXN_COMMIT_VALUE
                 || metadata.getMarkerType() == MarkerType.TXN_ABORT_VALUE)) {
-            return MemoryRecords.withEndTransactionMarker(
+            return new DecodeResult(MemoryRecords.withEndTransactionMarker(
                     baseOffset,
                     metadata.getPublishTime(),
                     0,
                     metadata.getTxnidMostBits(),
                     (short) metadata.getTxnidLeastBits(),
                     new EndTransactionMarker(metadata.getMarkerType() == MarkerType.TXN_COMMIT_VALUE
-                            ? ControlRecordType.COMMIT : ControlRecordType.ABORT, 0));
+                            ? ControlRecordType.COMMIT : ControlRecordType.ABORT, 0)));
         }
         final int uncompressedSize = metadata.getUncompressedSize();
         final CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
         final ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);
 
-        final ByteBuffer byteBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
-        final MemoryRecordsBuilder builder = new MemoryRecordsBuilder(byteBuffer,
+        final DirectBufferOutputStream directBufferOutputStream = new DirectBufferOutputStream(DEFAULT_BUFFER_SIZE);
+        final MemoryRecordsBuilder builder = new MemoryRecordsBuilder(directBufferOutputStream,
                 magic,
                 CompressionType.NONE,
                 TimestampType.CREATE_TIME,
@@ -181,8 +183,7 @@ public static MemoryRecords decodePulsarEntryToKafkaRecords(final MessageMetadata
 
         final MemoryRecords records = builder.build();
         uncompressedPayload.release();
-        byteBuffer.flip();
-        return records;
+        return new DecodeResult(records, directBufferOutputStream.getByteBuf());
     }
 
     @NonNull
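Both return paths above now carry an ownership rule: the transaction-marker branch yields a heap-backed DecodeResult whose release() is a no-op, while the data branch hands the pooled direct buffer from DirectBufferOutputStream to the DecodeResult. A hedged caller-side fragment with placeholder names:

```java
// Placeholder caller names; only the decode/release pairing is the point.
final DecodeResult result = ByteBufUtils.decodePulsarEntryToKafkaRecords(
        msgMetadata, metadataAndPayload, baseOffset, magic);
try {
    consume(result.getRecords()); // placeholder consumer
} finally {
    // No-op on the marker branch (no pooled buffer to free); on the data
    // branch this returns the DirectBufferOutputStream's buffer to the pool.
    result.release();
}
```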
DirectBufferOutputStreamTest.java (new file)
@@ -0,0 +1,81 @@
/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.streamnative.pulsar.handlers.kop.format;

import java.nio.ByteBuffer;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Test for {@link DirectBufferOutputStream}.
 */
public class DirectBufferOutputStreamTest {

    private static final int WRITE_LIMIT = 100 * 1024 * 1024;
    private static final long LOG_APPEND_TIME = System.currentTimeMillis();

    @DataProvider
    public static Object[][] initialCapacityAndNumRecords() {
        return new Object[][]{
                { 1, 10 },
                { 1024 * 1024, 10 },
                { 1024 * 1024, 1024 },
        };
    }

    @Test(dataProvider = "initialCapacityAndNumRecords")
    public void testBuildMemoryRecords(int initialCapacity, int numRecords) {
        final MemoryRecordsBuilder heapMemoryRecordsBuilder =
                newMemoryRecordsBuilder(new ByteBufferOutputStream(initialCapacity));
        // We must expose the DirectBufferOutputStream because we need to release the internal ByteBuf later
        final DirectBufferOutputStream directBufferOutputStream = new DirectBufferOutputStream(initialCapacity);
        final MemoryRecordsBuilder directMemoryRecordsBuilder = newMemoryRecordsBuilder(directBufferOutputStream);
        final ByteBuffer valueBuffer = ByteBuffer.allocate(1024);

        for (int i = 0; i < numRecords; i++) {
            heapMemoryRecordsBuilder.appendWithOffset(i, LOG_APPEND_TIME + i, null, valueBuffer.duplicate());
            directMemoryRecordsBuilder.appendWithOffset(i, LOG_APPEND_TIME + i, null, valueBuffer.duplicate());
        }

        final ByteBuffer heapBuffer = heapMemoryRecordsBuilder.build().buffer();
        final ByteBuffer directBuffer = directMemoryRecordsBuilder.build().buffer();
        System.out.println("heapBuffer size: " + heapBuffer.limit() + ", directBuffer size: " + directBuffer.limit());
        Assert.assertEquals(heapBuffer, directBuffer);

        Assert.assertEquals(directBufferOutputStream.getByteBuf().refCnt(), 1);
        directBufferOutputStream.getByteBuf().release();
    }

    private static MemoryRecordsBuilder newMemoryRecordsBuilder(final ByteBufferOutputStream bufferStream) {
        return new MemoryRecordsBuilder(bufferStream,
                RecordBatch.MAGIC_VALUE_V2,
                CompressionType.NONE,
                TimestampType.CREATE_TIME,
                0,
                LOG_APPEND_TIME,
                RecordBatch.NO_PRODUCER_ID,
                RecordBatch.NO_PRODUCER_EPOCH,
                RecordBatch.NO_SEQUENCE,
                false,
                false,
                RecordBatch.NO_PARTITION_LEADER_EPOCH,
                WRITE_LIMIT);
    }
}