Skip to content

Commit

Permalink
ensures InMemoryResumableFramesStore does not retain not resumable fr…
Browse files Browse the repository at this point in the history
…ames (#1009)

Signed-off-by: Oleh Dokuka <[email protected]>
  • Loading branch information
OlegDokuka committed May 24, 2021
1 parent 2816a79 commit 627f590
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,12 @@ public void onComplete() {
public void onNext(ByteBuf frame) {
final int state;
final boolean isResumable = isResumableFrame(frame);
boolean canBeStore = isResumable;
if (isResumable) {
final ArrayList<ByteBuf> frames = cachedFrames;
int incomingFrameSize = frame.readableBytes();
final int incomingFrameSize = frame.readableBytes();
final int cacheLimit = this.cacheLimit;

if (cacheLimit != Integer.MAX_VALUE) {
long availableSize = cacheLimit - cacheSize;
if (availableSize < incomingFrameSize) {
Expand All @@ -256,26 +258,36 @@ public void onNext(ByteBuf frame) {
}
}
CACHE_SIZE.addAndGet(this, -removedBytes);
POSITION.addAndGet(this, removedBytes);

canBeStore = availableSize >= incomingFrameSize;
POSITION.addAndGet(this, removedBytes + (canBeStore ? 0 : incomingFrameSize));
} else {
canBeStore = true;
}
} else {
canBeStore = true;
}
synchronized (this) {
state = this.state;
if (state != 2) {
frames.add(frame);

state = this.state;
if (canBeStore) {
synchronized (this) {
if (state != 2) {
frames.add(frame);
}
}

if (cacheLimit != Integer.MAX_VALUE) {
CACHE_SIZE.addAndGet(this, incomingFrameSize);
}
}
if (cacheLimit != Integer.MAX_VALUE) {
CACHE_SIZE.addAndGet(this, incomingFrameSize);
}
} else {
state = this.state;
}

final CoreSubscriber<? super ByteBuf> actual = this.actual;
if (state == 1) {
actual.onNext(frame.retain());
} else if (!isResumable || state == 2) {
actual.onNext(isResumable && canBeStore ? frame.retainedSlice() : frame);
} else if (!isResumable || !canBeStore || state == 2) {
frame.release();
}
}
Expand All @@ -302,7 +314,7 @@ public void subscribe(CoreSubscriber<? super ByteBuf> actual) {
actual.onSubscribe(this);
synchronized (this) {
for (final ByteBuf frame : cachedFrames) {
actual.onNext(frame.retain());
actual.onNext(frame.retainedSlice());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,93 +1,125 @@
// package io.rsocket.resume;
//
// import io.netty.buffer.ByteBuf;
// import io.netty.buffer.Unpooled;
// import java.util.Arrays;
// import org.junit.Assert;
// import org.junit.jupiter.api.Test;
// import reactor.core.publisher.Flux;
//
// public class InMemoryResumeStoreTest {
//
// @Test
// void saveWithoutTailRemoval() {
// InMemoryResumableFramesStore store = inMemoryStore(25);
// ByteBuf frame = frameMock(10);
// store.saveFrames(Flux.just(frame)).block();
// Assert.assertEquals(1, store.cachedFrames.size());
// Assert.assertEquals(frame.readableBytes(), store.cacheSize);
// Assert.assertEquals(0, store.position);
// }
//
// @Test
// void saveRemoveOneFromTail() {
// InMemoryResumableFramesStore store = inMemoryStore(25);
// ByteBuf frame1 = frameMock(20);
// ByteBuf frame2 = frameMock(10);
// store.saveFrames(Flux.just(frame1, frame2)).block();
// Assert.assertEquals(1, store.cachedFrames.size());
// Assert.assertEquals(frame2.readableBytes(), store.cacheSize);
// Assert.assertEquals(frame1.readableBytes(), store.position);
// }
//
// @Test
// void saveRemoveTwoFromTail() {
// InMemoryResumableFramesStore store = inMemoryStore(25);
// ByteBuf frame1 = frameMock(10);
// ByteBuf frame2 = frameMock(10);
// ByteBuf frame3 = frameMock(20);
// store.saveFrames(Flux.just(frame1, frame2, frame3)).block();
// Assert.assertEquals(1, store.cachedFrames.size());
// Assert.assertEquals(frame3.readableBytes(), store.cacheSize);
// Assert.assertEquals(size(frame1, frame2), store.position);
// }
//
// @Test
// void saveBiggerThanStore() {
// InMemoryResumableFramesStore store = inMemoryStore(25);
// ByteBuf frame1 = frameMock(10);
// ByteBuf frame2 = frameMock(10);
// ByteBuf frame3 = frameMock(30);
// store.saveFrames(Flux.just(frame1, frame2, frame3)).block();
// Assert.assertEquals(0, store.cachedFrames.size());
// Assert.assertEquals(0, store.cacheSize);
// Assert.assertEquals(size(frame1, frame2, frame3), store.position);
// }
//
// @Test
// void releaseFrames() {
// InMemoryResumableFramesStore store = inMemoryStore(100);
// ByteBuf frame1 = frameMock(10);
// ByteBuf frame2 = frameMock(10);
// ByteBuf frame3 = frameMock(30);
// store.saveFrames(Flux.just(frame1, frame2, frame3)).block();
// store.releaseFrames(20);
// Assert.assertEquals(1, store.cachedFrames.size());
// Assert.assertEquals(frame3.readableBytes(), store.cacheSize);
// Assert.assertEquals(size(frame1, frame2), store.position);
// }
//
// @Test
// void receiveImpliedPosition() {
// InMemoryResumableFramesStore store = inMemoryStore(100);
// ByteBuf frame1 = frameMock(10);
// ByteBuf frame2 = frameMock(30);
// store.resumableFrameReceived(frame1);
// store.resumableFrameReceived(frame2);
// Assert.assertEquals(size(frame1, frame2), store.frameImpliedPosition());
// }
//
// private int size(ByteBuf... byteBufs) {
// return Arrays.stream(byteBufs).mapToInt(ByteBuf::readableBytes).sum();
// }
//
// private static InMemoryResumableFramesStore inMemoryStore(int size) {
// return new InMemoryResumableFramesStore("test", size);
// }
//
// private static ByteBuf frameMock(int size) {
// byte[] bytes = new byte[size];
// Arrays.fill(bytes, (byte) 7);
// return Unpooled.wrappedBuffer(bytes);
// }
// }
package io.rsocket.resume;

import static org.assertj.core.api.Assertions.assertThat;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.Arrays;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;

public class InMemoryResumeStoreTest {

@Test
void saveNonResumableFrame() {
InMemoryResumableFramesStore store = inMemoryStore(25);
ByteBuf frame1 = fakeConnectionFrame(10);
ByteBuf frame2 = fakeConnectionFrame(35);
store.saveFrames(Flux.just(frame1, frame2)).block();
assertThat(store.cachedFrames.size()).isZero();
assertThat(store.cacheSize).isZero();
assertThat(store.position).isZero();
assertThat(frame1.refCnt()).isZero();
assertThat(frame2.refCnt()).isZero();
}

@Test
void saveWithoutTailRemoval() {
InMemoryResumableFramesStore store = inMemoryStore(25);
ByteBuf frame = fakeResumableFrame(10);
store.saveFrames(Flux.just(frame)).block();
assertThat(store.cachedFrames.size()).isEqualTo(1);
assertThat(store.cacheSize).isEqualTo(frame.readableBytes());
assertThat(store.position).isZero();
assertThat(frame.refCnt()).isOne();
}

@Test
void saveRemoveOneFromTail() {
InMemoryResumableFramesStore store = inMemoryStore(25);
ByteBuf frame1 = fakeResumableFrame(20);
ByteBuf frame2 = fakeResumableFrame(10);
store.saveFrames(Flux.just(frame1, frame2)).block();
assertThat(store.cachedFrames.size()).isOne();
assertThat(store.cacheSize).isEqualTo(frame2.readableBytes());
assertThat(store.position).isEqualTo(frame1.readableBytes());
assertThat(frame1.refCnt()).isZero();
assertThat(frame2.refCnt()).isOne();
}

@Test
void saveRemoveTwoFromTail() {
InMemoryResumableFramesStore store = inMemoryStore(25);
ByteBuf frame1 = fakeResumableFrame(10);
ByteBuf frame2 = fakeResumableFrame(10);
ByteBuf frame3 = fakeResumableFrame(20);
store.saveFrames(Flux.just(frame1, frame2, frame3)).block();
assertThat(store.cachedFrames.size()).isOne();
assertThat(store.cacheSize).isEqualTo(frame3.readableBytes());
assertThat(store.position).isEqualTo(size(frame1, frame2));
assertThat(frame1.refCnt()).isZero();
assertThat(frame2.refCnt()).isZero();
assertThat(frame3.refCnt()).isOne();
}

@Test
void saveBiggerThanStore() {
InMemoryResumableFramesStore store = inMemoryStore(25);
ByteBuf frame1 = fakeResumableFrame(10);
ByteBuf frame2 = fakeResumableFrame(10);
ByteBuf frame3 = fakeResumableFrame(30);
store.saveFrames(Flux.just(frame1, frame2, frame3)).block();
assertThat(store.cachedFrames.size()).isZero();
assertThat(store.cacheSize).isZero();
assertThat(store.position).isEqualTo(size(frame1, frame2, frame3));
assertThat(frame1.refCnt()).isZero();
assertThat(frame2.refCnt()).isZero();
assertThat(frame3.refCnt()).isZero();
}

@Test
void releaseFrames() {
InMemoryResumableFramesStore store = inMemoryStore(100);
ByteBuf frame1 = fakeResumableFrame(10);
ByteBuf frame2 = fakeResumableFrame(10);
ByteBuf frame3 = fakeResumableFrame(30);
store.saveFrames(Flux.just(frame1, frame2, frame3)).block();
store.releaseFrames(20);
assertThat(store.cachedFrames.size()).isOne();
assertThat(store.cacheSize).isEqualTo(frame3.readableBytes());
assertThat(store.position).isEqualTo(size(frame1, frame2));
assertThat(frame1.refCnt()).isZero();
assertThat(frame2.refCnt()).isZero();
assertThat(frame3.refCnt()).isOne();
}

@Test
void receiveImpliedPosition() {
InMemoryResumableFramesStore store = inMemoryStore(100);
ByteBuf frame1 = fakeResumableFrame(10);
ByteBuf frame2 = fakeResumableFrame(30);
store.resumableFrameReceived(frame1);
store.resumableFrameReceived(frame2);
assertThat(store.frameImpliedPosition()).isEqualTo(size(frame1, frame2));
}

private int size(ByteBuf... byteBufs) {
return Arrays.stream(byteBufs).mapToInt(ByteBuf::readableBytes).sum();
}

private static InMemoryResumableFramesStore inMemoryStore(int size) {
return new InMemoryResumableFramesStore("test", size);
}

private static ByteBuf fakeResumableFrame(int size) {
byte[] bytes = new byte[size];
Arrays.fill(bytes, (byte) 7);
return Unpooled.wrappedBuffer(bytes);
}

private static ByteBuf fakeConnectionFrame(int size) {
byte[] bytes = new byte[size];
Arrays.fill(bytes, (byte) 0);
return Unpooled.wrappedBuffer(bytes);
}
}

0 comments on commit 627f590

Please sign in to comment.