Process tokens after each feed in Jackson2Tokenizer
This commit ensures that we process tokens after each fed buffer in
Jackson2Tokenizer, instead of only once after all fed buffers.

Closes gh-31747
poutsma committed Dec 6, 2023
1 parent 1afea0b commit 0e6c17f
Showing 2 changed files with 51 additions and 20 deletions.
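For background, the pattern this commit adopts can be sketched directly against Jackson's non-blocking parser API. The following is a minimal, hypothetical standalone demo (not part of the commit; the class and method names are illustrative, and it assumes jackson-core 2.9+, which introduced the async parser and createNonBlockingByteArrayParser()). It feeds one JSON document in two chunks, mirroring the composite-buffer case, and drains tokens after each feed rather than after all feeds:

	import java.nio.charset.StandardCharsets;

	import com.fasterxml.jackson.core.JsonFactory;
	import com.fasterxml.jackson.core.JsonParser;
	import com.fasterxml.jackson.core.JsonToken;
	import com.fasterxml.jackson.core.async.ByteArrayFeeder;

	public class FeedThenParseDemo {

		public static void main(String[] args) throws Exception {
			JsonFactory factory = new JsonFactory();
			JsonParser parser = factory.createNonBlockingByteArrayParser();
			ByteArrayFeeder feeder = (ByteArrayFeeder) parser.getNonBlockingInputFeeder();

			// One JSON document arriving in two chunks, as with a composite buffer.
			byte[][] chunks = {
					"{\"foo\": \"foofoo\"".getBytes(StandardCharsets.UTF_8),
					", \"bar\": \"barbar\"}".getBytes(StandardCharsets.UTF_8)
			};

			for (byte[] chunk : chunks) {
				feeder.feedInput(chunk, 0, chunk.length);
				drainTokens(parser);  // parse after *each* feed, not after all feeds
			}
			feeder.endOfInput();
			drainTokens(parser);  // flush any tokens the parser was still holding back
		}

		private static void drainTokens(JsonParser parser) throws Exception {
			JsonToken token;
			// NOT_AVAILABLE means the parser needs more input; stop and feed again.
			while ((token = parser.nextToken()) != null && token != JsonToken.NOT_AVAILABLE) {
				System.out.println(token);
			}
		}
	}

Draining between feeds matters because the non-blocking feeder only accepts new input once the previously fed chunk has been consumed; parsing only after the final feed is exactly the ordering this commit removes from Jackson2Tokenizer.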
spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java
@@ -91,21 +91,23 @@ private Jackson2Tokenizer(JsonParser parser, DeserializationContext deserializat
 	private List<TokenBuffer> tokenize(DataBuffer dataBuffer) {
 		try {
 			int bufferSize = dataBuffer.readableByteCount();
+			List<TokenBuffer> tokens = new ArrayList<>();
 			if (this.inputFeeder instanceof ByteBufferFeeder byteBufferFeeder) {
 				try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) {
 					while (iterator.hasNext()) {
 						byteBufferFeeder.feedInput(iterator.next());
+						parseTokens(tokens);
 					}
 				}
 			}
 			else if (this.inputFeeder instanceof ByteArrayFeeder byteArrayFeeder) {
 				byte[] bytes = new byte[bufferSize];
 				dataBuffer.read(bytes);
 				byteArrayFeeder.feedInput(bytes, 0, bufferSize);
+				parseTokens(tokens);
 			}
-			List<TokenBuffer> result = parseTokenBufferFlux();
-			assertInMemorySize(bufferSize, result);
-			return result;
+			assertInMemorySize(bufferSize, tokens);
+			return tokens;
 		}
 		catch (JsonProcessingException ex) {
 			throw new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex);
@@ -122,7 +124,9 @@ private Flux<TokenBuffer> endOfInput() {
 		return Flux.defer(() -> {
 			this.inputFeeder.endOfInput();
 			try {
-				return Flux.fromIterable(parseTokenBufferFlux());
+				List<TokenBuffer> tokens = new ArrayList<>();
+				parseTokens(tokens);
+				return Flux.fromIterable(tokens);
 			}
 			catch (JsonProcessingException ex) {
 				throw new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex);
@@ -133,9 +137,7 @@ private Flux<TokenBuffer> endOfInput() {
 		});
 	}
 
-	private List<TokenBuffer> parseTokenBufferFlux() throws IOException {
-		List<TokenBuffer> result = new ArrayList<>();
-
+	private void parseTokens(List<TokenBuffer> tokens) throws IOException {
 		// SPR-16151: Smile data format uses null to separate documents
 		boolean previousNull = false;
 		while (!this.parser.isClosed()) {
@@ -153,13 +155,12 @@ else if (token == null ) { // !previousNull
 			}
 			updateDepth(token);
 			if (!this.tokenizeArrayElements) {
-				processTokenNormal(token, result);
+				processTokenNormal(token, tokens);
 			}
 			else {
-				processTokenArray(token, result);
+				processTokenArray(token, tokens);
 			}
 		}
-		return result;
 	}
 
 	private void updateDepth(JsonToken token) {
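Note the shape of the refactoring: parseTokenBufferFlux() used to create and return its own list, so tokens could be collected only once per DataBuffer. parseTokens(List<TokenBuffer>) instead appends into a caller-owned accumulator, which is what lets tokenize() invoke it once per fed ByteBuffer and still apply assertInMemorySize to, and return, a single combined list.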
spring-web/src/test/java/org/springframework/http/codec/json/Jackson2TokenizerTests.java
@@ -27,6 +27,10 @@
 import com.fasterxml.jackson.core.TreeNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.util.TokenBuffer;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
 import org.json.JSONException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -37,6 +41,7 @@
 import org.springframework.core.codec.DecodingException;
 import org.springframework.core.io.buffer.DataBuffer;
 import org.springframework.core.io.buffer.DataBufferLimitException;
+import org.springframework.core.io.buffer.NettyDataBufferFactory;
 import org.springframework.core.testfixture.io.buffer.AbstractLeakCheckingTests;
 
 import static java.util.Arrays.asList;
@@ -337,22 +342,47 @@ public void useBigDecimalForFloats() {
 				.verifyComplete();
 	}
 
+	// gh-31747
+	@Test
+	public void compositeNettyBuffer() {
+		ByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT;
+		ByteBuf firstByteBuf = allocator.buffer();
+		firstByteBuf.writeBytes("{\"foo\": \"foofoo\"".getBytes(StandardCharsets.UTF_8));
+		ByteBuf secondBuf = allocator.buffer();
+		secondBuf.writeBytes(", \"bar\": \"barbar\"}".getBytes(StandardCharsets.UTF_8));
+		CompositeByteBuf composite = allocator.compositeBuffer();
+		composite.addComponent(true, firstByteBuf);
+		composite.addComponent(true, secondBuf);
+
+		NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(allocator);
+		Flux<DataBuffer> source = Flux.just(bufferFactory.wrap(composite));
+		Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize(source, this.jsonFactory, this.objectMapper, false, false, -1);
+
+		Flux<String> strings = tokens.map(this::tokenToString);
+
+		StepVerifier.create(strings)
+				.assertNext(s -> assertThat(s).isEqualTo("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}"))
+				.verifyComplete();
+	}
+
 
 	private Flux<String> decode(List<String> source, boolean tokenize, int maxInMemorySize) {
 
 		Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize(
 				Flux.fromIterable(source).map(this::stringBuffer),
 				this.jsonFactory, this.objectMapper, tokenize, false, maxInMemorySize);
 
-		return tokens
-				.map(tokenBuffer -> {
-					try {
-						TreeNode root = this.objectMapper.readTree(tokenBuffer.asParser());
-						return this.objectMapper.writeValueAsString(root);
-					}
-					catch (IOException ex) {
-						throw new UncheckedIOException(ex);
-					}
-				});
+		return tokens.map(this::tokenToString);
 	}
 
+	private String tokenToString(TokenBuffer tokenBuffer) {
+		try {
+			TreeNode root = this.objectMapper.readTree(tokenBuffer.asParser());
+			return this.objectMapper.writeValueAsString(root);
+		}
+		catch (IOException ex) {
+			throw new UncheckedIOException(ex);
+		}
+	}
+
 	private DataBuffer stringBuffer(String value) {
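The new test is a minimal reproduction of gh-31747: a CompositeByteBuf with two components means DataBuffer.readableByteBuffers() exposes one ByteBuffer per component, so ByteBufferFeeder.feedInput runs twice for a single DataBuffer, splitting the document mid-object. Feeding a second buffer while tokens from the first were still unparsed is what tripped up the non-blocking parser; with a single contiguous buffer the loop body runs once and the bug never surfaces, hence the per-feed parseTokens call.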
