From dd3a67c7ab6b0417e5ed7a66ab97ee59c10edf85 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Wed, 6 Dec 2023 14:24:18 +0100 Subject: [PATCH] Process tokens after each feed in Jackson2Tokenizer This commit ensures that we process after each fed buffer in Jackson2Tokenizer, instead of after all fed buffers. See gh-31747 Closes gh-31772 --- .../http/codec/json/Jackson2Tokenizer.java | 21 ++++---- .../codec/json/Jackson2TokenizerTests.java | 50 +++++++++++++++---- 2 files changed, 51 insertions(+), 20 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java b/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java index 9b56435503a8..427d8025a7ad 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java +++ b/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java @@ -91,10 +91,12 @@ private Jackson2Tokenizer(JsonParser parser, DeserializationContext deserializat private List tokenize(DataBuffer dataBuffer) { try { int bufferSize = dataBuffer.readableByteCount(); + List tokens = new ArrayList<>(); if (this.inputFeeder instanceof ByteBufferFeeder byteBufferFeeder) { try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) { while (iterator.hasNext()) { byteBufferFeeder.feedInput(iterator.next()); + parseTokens(tokens); } } } @@ -102,10 +104,10 @@ else if (this.inputFeeder instanceof ByteArrayFeeder byteArrayFeeder) { byte[] bytes = new byte[bufferSize]; dataBuffer.read(bytes); byteArrayFeeder.feedInput(bytes, 0, bufferSize); + parseTokens(tokens); } - List result = parseTokenBufferFlux(); - assertInMemorySize(bufferSize, result); - return result; + assertInMemorySize(bufferSize, tokens); + return tokens; } catch (JsonProcessingException ex) { throw new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex); @@ -122,7 +124,9 @@ private Flux endOfInput() { return Flux.defer(() -> { this.inputFeeder.endOfInput(); try { - return Flux.fromIterable(parseTokenBufferFlux()); + List tokens = new ArrayList<>(); + parseTokens(tokens); + return Flux.fromIterable(tokens); } catch (JsonProcessingException ex) { throw new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex); @@ -133,9 +137,7 @@ private Flux endOfInput() { }); } - private List parseTokenBufferFlux() throws IOException { - List result = new ArrayList<>(); - + private void parseTokens(List tokens) throws IOException { // SPR-16151: Smile data format uses null to separate documents boolean previousNull = false; while (!this.parser.isClosed()) { @@ -153,13 +155,12 @@ else if (token == null ) { // !previousNull } updateDepth(token); if (!this.tokenizeArrayElements) { - processTokenNormal(token, result); + processTokenNormal(token, tokens); } else { - processTokenArray(token, result); + processTokenArray(token, tokens); } } - return result; } private void updateDepth(JsonToken token) { diff --git a/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2TokenizerTests.java b/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2TokenizerTests.java index 31641b9347d2..ab9820fa83ba 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2TokenizerTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2TokenizerTests.java @@ -27,6 +27,10 @@ import com.fasterxml.jackson.core.TreeNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.util.TokenBuffer; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; import org.json.JSONException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -39,6 +43,7 @@ import org.springframework.core.codec.DecodingException; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferLimitException; +import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.core.testfixture.io.buffer.AbstractLeakCheckingTests; import static java.util.Arrays.asList; @@ -345,22 +350,47 @@ public void useBigDecimalForFloats(boolean useBigDecimalForFloats) { .verifyComplete(); } + // gh-31747 + @Test + public void compositeNettyBuffer() { + ByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT; + ByteBuf firstByteBuf = allocator.buffer(); + firstByteBuf.writeBytes("{\"foo\": \"foofoo\"".getBytes(StandardCharsets.UTF_8)); + ByteBuf secondBuf = allocator.buffer(); + secondBuf.writeBytes(", \"bar\": \"barbar\"}".getBytes(StandardCharsets.UTF_8)); + CompositeByteBuf composite = allocator.compositeBuffer(); + composite.addComponent(true, firstByteBuf); + composite.addComponent(true, secondBuf); + + NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(allocator); + Flux source = Flux.just(bufferFactory.wrap(composite)); + Flux tokens = Jackson2Tokenizer.tokenize(source, this.jsonFactory, this.objectMapper, false, false, -1); + + Flux strings = tokens.map(this::tokenToString); + + StepVerifier.create(strings) + .assertNext(s -> assertThat(s).isEqualTo("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}")) + .verifyComplete(); + } + + private Flux decode(List source, boolean tokenize, int maxInMemorySize) { Flux tokens = Jackson2Tokenizer.tokenize( Flux.fromIterable(source).map(this::stringBuffer), this.jsonFactory, this.objectMapper, tokenize, false, maxInMemorySize); - return tokens - .map(tokenBuffer -> { - try { - TreeNode root = this.objectMapper.readTree(tokenBuffer.asParser()); - return this.objectMapper.writeValueAsString(root); - } - catch (IOException ex) { - throw new UncheckedIOException(ex); - } - }); + return tokens.map(this::tokenToString); + } + + private String tokenToString(TokenBuffer tokenBuffer) { + try { + TreeNode root = this.objectMapper.readTree(tokenBuffer.asParser()); + return this.objectMapper.writeValueAsString(root); + } + catch (IOException ex) { + throw new UncheckedIOException(ex); + } } private DataBuffer stringBuffer(String value) {