Skip to content

Commit

Permalink
2.x: fix buffer(time, maxSize) duplicating buffers on time-size race (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Jun 20, 2017
1 parent 73a85c1 commit f82a197
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -460,12 +460,12 @@ public void onNext(T t) {
if (b.size() < maxSize) {
return;
}
}

if (restartTimerOnMaxSize) {
buffer = null;
producerIndex++;
}

if (restartTimerOnMaxSize) {
timer.dispose();
}

Expand All @@ -480,17 +480,12 @@ public void onNext(T t) {
return;
}

synchronized (this) {
buffer = b;
consumerIndex++;
}
if (restartTimerOnMaxSize) {
synchronized (this) {
buffer = b;
consumerIndex++;
}

timer = w.schedulePeriodically(this, timespan, timespan, unit);
} else {
synchronized (this) {
buffer = b;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,12 +458,11 @@ public void onNext(T t) {
if (b.size() < maxSize) {
return;
}
}

if (restartTimerOnMaxSize) {
buffer = null;
producerIndex++;
}

if (restartTimerOnMaxSize) {
timer.dispose();
}

Expand All @@ -478,17 +477,12 @@ public void onNext(T t) {
return;
}

synchronized (this) {
buffer = b;
consumerIndex++;
}
if (restartTimerOnMaxSize) {
synchronized (this) {
buffer = b;
consumerIndex++;
}

timer = w.schedulePeriodically(this, timespan, timespan, unit);
} else {
synchronized (this) {
buffer = b;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1973,4 +1973,45 @@ public void skipBackpressure() {
.test()
.assertResult(Arrays.asList(1, 2), Arrays.asList(4, 5), Arrays.asList(7, 8), Arrays.asList(10));
}

@Test
public void withTimeAndSizeCapacityRace() {
for (int i = 0; i < 1000; i++) {
final TestScheduler scheduler = new TestScheduler();

final PublishProcessor<Object> ps = PublishProcessor.create();

TestSubscriber<List<Object>> ts = ps.buffer(1, TimeUnit.SECONDS, scheduler, 5).test();

ps.onNext(1);
ps.onNext(2);
ps.onNext(3);
ps.onNext(4);

Runnable r1 = new Runnable() {
@Override
public void run() {
ps.onNext(5);
}
};

Runnable r2 = new Runnable() {
@Override
public void run() {
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
}
};

TestHelper.race(r1, r2);

ps.onComplete();

int items = 0;
for (List<Object> o : ts.values()) {
items += o.size();
}

assertEquals("Round: " + i, 5, items);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1398,4 +1398,45 @@ public void bufferTimedExactBoundedError() {
to
.assertFailure(TestException.class);
}

@Test
public void withTimeAndSizeCapacityRace() {
for (int i = 0; i < 1000; i++) {
final TestScheduler scheduler = new TestScheduler();

final PublishSubject<Object> ps = PublishSubject.create();

TestObserver<List<Object>> ts = ps.buffer(1, TimeUnit.SECONDS, scheduler, 5).test();

ps.onNext(1);
ps.onNext(2);
ps.onNext(3);
ps.onNext(4);

Runnable r1 = new Runnable() {
@Override
public void run() {
ps.onNext(5);
}
};

Runnable r2 = new Runnable() {
@Override
public void run() {
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
}
};

TestHelper.race(r1, r2);

ps.onComplete();

int items = 0;
for (List<Object> o : ts.values()) {
items += o.size();
}

assertEquals("Round: " + i, 5, items);
}
}
}

0 comments on commit f82a197

Please sign in to comment.