Skip to content

Commit

Permalink
Ensure chunks released on cancel in StringDecoder
Browse files Browse the repository at this point in the history
The current test were not catching the issue because they request 1
via StepVerifier, wait for it, and then cancel. In the case of
StringDecoder it means all chunks are used up to produce that first
String and so the cancel doesn't catch any cached chunks.

Closes gh-30299
  • Loading branch information
rstoyanchev committed May 9, 2023
1 parent 93fa010 commit e737980
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public Flux<String> decode(Publisher<DataBuffer> input, ResolvableType elementTy
chunks.clear();
return Mono.just(lastBuffer);
}))
.doOnTerminate(chunks::releaseAndClear)
.doFinally(signalType -> chunks.releaseAndClear())
.doOnDiscard(DataBuffer.class, DataBufferUtils::release)
.map(buffer -> decode(buffer, elementType, mimeType, hints));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,12 +18,15 @@

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

Expand Down Expand Up @@ -75,14 +78,15 @@ public void decode() {
String s = String.format("%s\n%s\n%s", u, e, o);
Flux<DataBuffer> input = toDataBuffers(s, 1, UTF_8);

// TODO: temporarily replace testDecodeAll with explicit decode/cancel/empty
// see https://github.com/reactor/reactor-core/issues/2041
testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null);
}

// testDecode(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null);
// testDecodeCancel(input, TYPE, null, null);
// testDecodeEmpty(TYPE, null, null);
@Test // gh-30299
public void decodeAndCancelWithPendingChunks() {
Flux<DataBuffer> input = toDataBuffers("abc", 1, UTF_8).concatWith(Flux.never());
Flux<String> result = this.decoder.decode(input, TYPE, null, null);

testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null);
StepVerifier.create(result).thenAwait(Duration.ofMillis(100)).thenCancel().verify();
}

@Test
Expand Down Expand Up @@ -264,4 +268,13 @@ private DataBuffer stringBuffer(String value) {
return buffer;
}


private static class SingleRequestSubscriber extends BaseSubscriber<String> {

@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(1);
}
}

}

0 comments on commit e737980

Please sign in to comment.