Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3821] Support Set for exact buffers in Flux.buffer #3822

Merged
merged 11 commits into from
Aug 22, 2024
Merged

[#3821] Support Set for exact buffers in Flux.buffer #3822

merged 11 commits into from
Aug 22, 2024

Conversation

Sage-Pierce
Copy link
Contributor

If a Set is used as the destination in Flux.buffer, the stream will not hang if/when there are duplicates in a given buffer

Previously, FluxBuffer was not taking the result of adding to the buffer into account. If adding to the buffer does not result in modifying it, an extra request(1) should be issued.

Fixes #3821

@Sage-Pierce Sage-Pierce requested a review from a team as a code owner June 1, 2024 16:04
@chemicL
Copy link
Member

chemicL commented Jun 17, 2024

Hey, @Sage-Pierce !

I apologize it took some time to respond. I reworked the bufferTimeout operator with fair backpressure recently, that's the reason for it.

Considering all existing buffer* implementations I believe their behaviour should be consistent across the offering. I wonder how this requirement would influence other implementations as it feels a bit risky in the face of concurrency. Can you share your thoughts? After looking at the codebase it seems the authors didn't consider this scenario and assumed there's no filtering happening and once an item is added, the collection's size increases. I wonder whether including this check and handling is the way to go or perhaps limiting the possible Collection types is better while combining with another operator to remove the duplicates would be a safer bet.

Let me know, thanks!

@Sage-Pierce
Copy link
Contributor Author

Hi @chemicL 👋

No worries about the delay 😄 I didn't immediately look into the other buffer* permutations, as I precisely wanted to see what the feedback on this change was before undertaking that. I would agree that consistency across all the permutations is desirable. The weird bit is that all the other permutations use some form of sizing and/or either some asynchronous trigger or a predicate. It's not clear to me that this would actually be an issue in all those other permutations, and after some initial poking around, it doesn't seem to be.

it feels a bit risky in the face of concurrency

I don't think that there's any more risk with this change regarding concurrency, since onNext is serialized, and the interaction with the buffer is already synchronized where necessary.

I wonder whether including this check and handling is the way to go or perhaps limiting the possible Collection types is better while combining with another operator to remove the duplicates would be a safer bet.

I don't think that there is currently another way to implement "give me n distinct items at a time", though I initially thought there could be with something like window, but I couldn't/can't figure it out

@chemicL
Copy link
Member

chemicL commented Jun 18, 2024

I don't think that there is currently another way to implement "give me n distinct items at a time", though I initially thought there could be with something like window, but I couldn't/can't figure it out

I came up with this:

public static void main(String[] args) {
	Sinks.Many<Integer> makeACut = Sinks.many().unicast().onBackpressureBuffer();
	Flux.just(1, 1, 2, 3, 4, 4, 4, 4, 5, 6, 6, 7)
			.window(makeACut.asFlux())
			.concatMap(w -> w.reduceWith(HashSet::new, (set, i) -> {
				set.add(i);
				if (set.size() == 2) {
					makeACut.tryEmitNext(0);
				}
				return set;
			}))
			.filter(set -> !set.isEmpty())
			.doOnNext(System.out::println)
			.blockLast();
}

Just as a conversation starter :) I do imagine this doesn't look as nice and the performance would be incomparable.

I'll try to digest the rest of the comments and review the other implementations next. For now, can you also prepare a few sample {input, output} sets so that we know what the end goal is? I mean a sequence 1, 2, 1, 3 would yield [1, 2] and [1, 3] for n == 2, but would yield [1, 2, 3] for n == 3. Is that desired? Can you share some real world scenarios that come to mind that this would benefit? I tend to first try to understand the need and then try to work towards a solution that matches the expectations. This potential mismatch regarding expected supplied aggregator types is puzzling and it would be neat if we could comprehensively address this.

@Sage-Pierce
Copy link
Contributor Author

Ah nice, that was abstractly what I had in my head, but I couldn't come up with that Sink feedback loop that you used 😄

can you also prepare a few sample {input, output} sets so that we know what the end goal is? I mean a sequence 1, 2, 1, 3 would yield [1, 2] and [1, 3] for n == 2, but would yield [1, 2, 3] for n == 3. Is that desired?

The test I wrote for this changeset covers the basic expectation I think, and it looks like you already understand my intent quite well. I'll just format those and a few more below:

Given flux.buffer(2, LinkedHashSet::new):

  • [1, 2] -> [1, 2]
  • [1, 1, 2] -> [1, 2]
  • [1, 2, 1] -> [1, 2], [1]
  • [1, 2, 1, 3] -> [1, 2], [1, 3]
  • [1, 1, 2, 3] -> [1, 2], [3]
  • [2, 1, 1, 3] -> [2, 1], [1, 3]

Can you share some real world scenarios that come to mind that this would benefit?

In my use case, I am iterating over time-bucketed data elements (from a database) and executing an I/O-bound process on them (a service call). That service call is maximally efficient when passed n distinct elements. The "buckets" are large, usually much larger than n, so the optimization I'm driving toward is iterating over n distinct elements at a time from each "bucket" and executing the I/O bound process for every n elements.

@chemicL
Copy link
Member

chemicL commented Jun 19, 2024

Thanks.

For bufferTimeout:

Replace

.<Set<Integer>>buffer(2, HashSet::new)

with

.<Set<Integer>>bufferTimeout(2, Duration.ofDays(30), HashSet::new)

and you can observe the same outcome.

Out of the others, bufferWhen also works with a Supplier of Collection, but there's no specific size so this wouldn't be an issue for that variant.

@Sage-Pierce
Copy link
Contributor Author

Sage-Pierce commented Jun 19, 2024

For the bufferTimeout behavior, that issue is slightly different than the issue being fixed here. In bufferTimeout, the amount of buffered items is tracked independently of the buffer size. This leads to FluxBufferTimeout not having the same issue of hanging as FluxBuffer. However, it does make the size-triggered emission slightly different:

Given flux.bufferTimeout(2, Duration.ofSeconds(1), LinkedHashSet::new), the current behavior is (differences marked with *):

  • [1, 2] -> [1, 2]
  • [1, 1, 2] -> [1], [2]*
  • [1, 2, 1] -> [1, 2], [1]
  • [1, 2, 1, 3] -> [1, 2], [1, 3]
  • [1, 1, 2, 3] -> [1], [2, 3]*
  • [2, 1, 1, 3] -> [2, 1], [1, 3]

Due to that significant difference in how FluxBufferTimeout keeps track of the number of buffered elements, I'm not convinced this particular behavior can be made consistent with FluxBuffer without significant refactoring of either operator.

Given that there isn't actually a "hanging" issue with bufferTimeout and the usage of a Set buffer supplier, and given that FluxBuffer's upstream request count is decoupled from the number of elements actually contained in emitted buffers, I'd be inclined to treat this behavioral difference as a separate issue, and one which may not be a practical issue at all, since bufferTimeout users inherently have to be prepared to handle buffers with sizes less than the max buffer size.

Thoughts?

@chemicL
Copy link
Member

chemicL commented Jun 21, 2024

Thanks for following up. I agree that bufferTimeout can be treated differently, thanks for sharing that view. Let's consider what would be a consistent approach to this that wouldn't surprise users of the API once we make changes around supporting Set. From the Javadoc perspective of bufferTimeout:

(...) buffers that will be emitted by the returned {@link Flux} each time the buffer reaches a maximum size OR the maxTime {@link Duration} elapses (...)

In my view, the current behaviour when presented with a Collection that can return false upon add() doesn't deliver what the Javadoc's promises. For one, this PR sounds justifiable from that perspective and thank you for starting the conversation :)

I think in order to merge something we'd need to cover all buffer* cases that accept a Supplier<C> where C is a Collection for the upstream type.

As this currently doesn't work correctly nor consistently we should make an effort to bring more clarity in the docs and tests.

For bufferTimeout:

  1. I'd suggest adding a test which validates the Set against an upstream with N consecutive duplicate values, where N is higher than the prefetch size. If there is no issue here, let's proceed with next steps, otherwise we should seek on explaining the failures (just like some operators state that an accepted Publisher needs to be finite -> we are sometimes not at liberty to limit implementations by type but only by the specification).
  2. Please document in the Javadoc that for Collections that are capable of returning false from add() the buffers can be smaller than the expected buffer size.

For buffer:

  1. Please also address the other Subscriber implementations. You touched BufferExactSubscriber, but we also have BufferSkipSubscriber and BufferOverlappingSubscriber.
  2. We need more test cases.

I understand this requires more work so please let me know if you're still keen to contribute. I'd just like us to have a consistent UX across similar operators and that requires a holistic approach. I'll be away for a week but if you make any progress, please do commit and I'll review the changes when I'm back.

Thanks again @Sage-Pierce and I look forward to where this discussion leads us :)

@Sage-Pierce
Copy link
Contributor Author

@chemicL I don't mind taking a stab at all of that 😄 May take some time, but may have an updated review next week.

spierce added 3 commits June 24, 2024 07:55
- Make `Collection` behavior consistent among all FluxBuffer* operators
- Added several more tests for all FluxBuffer* operators covering usage of `Set`
@Sage-Pierce
Copy link
Contributor Author

@chemicL I believe I have addressed your feedback, and I look forward to your re-review when you return. I will be on vacation for the first half of July, so it may take me a bit to follow up on further feedback.

Copy link
Member

@chemicL chemicL left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the follow-up. Please have a look at my comments. For the skip != maxSize case I have different expectations for the results in the test cases after reading the javadoc. These should:

  • a) Either comply with the existing specification (new buffer after skip items were emitted by the source).
  • b) Or adjust the spec to the special case.

With a) it would be necessary to emit the current buffers despite their size being smaller than max but according to considered emitted items so far that fall into the window observed by a particular buffer... And maxSize == skip should not be a special case. So this would lead to an inefficiency in just emitting smaller buffers and that's not your goal.

I think we're left with b) and I guess that in such a case the overlapping buffers would need to be smaller in size once they fall out of scope. For the disjoint case (skip >= maxSize) we can just pretend that the discarded items were never emitted.

Additionally, please note the "Discard support" section in the relevant Javadocs -> they should also explain what's happening here.

Let me know your thoughts and thanks for the effort so far.

@@ -3123,6 +3129,10 @@ public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime) {
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* less than the specified max size.
* less than the specified max size. The element will be discarded in such a case.

@@ -3163,6 +3173,10 @@ public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, Schedule
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses, as measured on the provided {@link Scheduler}.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same suggestion as above.

@@ -3230,6 +3244,10 @@ public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime,
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same suggestion as above.

@@ -3254,6 +3272,10 @@ public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSiz
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses, as measured on the provided {@link Scheduler}.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the same suggestion as above.

Comment on lines 487 to 494
// It should never be the case that an element can be added to the first open
// buffer and not all of them. Otherwise, the buffer behavior is non-deterministic,
// and this operator's behavior is undefined.
if (!b.add(t) && b == b0) {
Operators.onDiscard(t, actual.currentContext());
s.request(1);
return;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine it can be the case. In a recent buffer the item is not a duplicate, but in the older buffers it can appear to be a duplicate. Therefore, an item can be discarded only provided that it is a duplicate in all currently tracked buffers. That also necessitates that the old buffers get emitted despite being smaller in size, otherwise there is a risk of leaking memory. Consider the following:

maxSize = 3
skip = 1
input: 1|2|1|2|1|2

step1: 1
  buf1 = 1     // ok
step2: 2
  buf1 = 1|2   // ok
  buf2 = 2     // ok
step3: 1
  buf1 = 1|2   // fail(1)
  buf2 = 2|1   // ok
  buf3 = 1     // ok
step4: 2
  buf1 = 1|2   // fail(2)
  buf2 = 2|1   // fail(2)
  buf3 = 1|2   // ok
  buf4 = 2     // ok
and so on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is a good catch. I rather implemented this as-is to satisfy an invariant of, "an item is discarded if it is not add-able to all in-flight buffers" (hence the comment). However, I can see how this should rather be, "an item is discarded if it is not add-able to any in-flight buffers".

@chemicL
Copy link
Member

chemicL commented Jul 3, 2024

Ah and regarding the build failure -> make sure to run ./gradlew spotlessApply before committing.

@chemicL
Copy link
Member

chemicL commented Jul 3, 2024

After giving it some though, I wonder whether it makes sense to introduce a new operator for this particular purpose, like

public final <C extends Collection<? super T>> Flux<C> accumulate(Supplier<C> bufferSupplier, Predicate<C> emitPredicate)

which would emit the buffer once the condition is met and then start a new buffer.

Usage:

flux.accumulate(() -> new HashSet(), set -> set.size() == N);

And for the existing nuance with Collections that can return false, just document that it's not supported for buffer operators.

WDYT?

@chemicL
Copy link
Member

chemicL commented Jul 4, 2024

The more I think about it the more doubts I have :)
As a generic operator it feels that any attempt with Sets is spawning a set of problems. Considering the example of
1|2|1|2|1|2... stream, any strong choice of the size parameter would never emit anything until the source completes. That sounds like a risk for more complaints than the gain of addressing the particular need that you have. Your examples make some implicit assumptions about the source stream, like the global approximation of monotonicity over time with possible local duplicates that you wish to eliminate with "best-effort" approach. However, as a generic set of operators, we can't make such assumptions.

I consulted RxJava's codebase, which shares the implementation of the buffer operator with reactor. There is also no handling of the return value of C.add() in the implementation. In Akka, similar operators, called grouped*, just work with List and don't run into such an issue.

To add to that, the suggestion I made with using window sounds feasible in the face of your use case, where you actually intend to send the aggregations of unique items over the network. In such a case, the overhead of using window instead of buffer should be diminished. Considering that particular use case, I imagine a case where a situation might happen where a time-related trigger can also be necessary. E.g. in case you have aggregated N-1 items in the buffer and now there is a stall in the source emissions for e.g. 1s. It would probably be reasonable to send the incomplete buffer after a smaller delay according to the SLA of your business component.

Consider the following idea for your use case (with artificial delays introduced):

	int maxSize = 3;
	Duration timeBoundary = Duration.ofMillis(100);
	Duration itemDelay = Duration.ofMillis(60);

	Sinks.Many<Object> makeACut = Sinks.many()
	                               .unicast()
	                               .onBackpressureBuffer();
	Flux.just(1, 2, 1, 2, 1, 2)
			.delayElements(itemDelay)
			.window(makeACut.asFlux())
			.concatMap(items -> items.distinct()
			                         .<Integer>zipWith(Flux.fromStream(IntStream.iterate(1, i -> i + 1).boxed())
			                                               .timeout(timeBoundary)
			                                               .doOnError(e -> makeACut.tryEmitNext("TIME_SLICE"))
			                                               .onErrorComplete())
			                         .map(t -> {
				                         if (t.getT2() == maxSize) {
					                         makeACut.tryEmitNext("SIZE_SLICE");
				                         }
										 return t.getT1();
			                         })
			                         .collectList()
			)
		.doOnNext(buf -> System.out.println(Instant.now() + ": " + buf))
        .blockLast();

@chemicL
Copy link
Member

chemicL commented Jul 12, 2024

I feel I've re-invented the bufferTimeout operator in the above comment 😅 I'm looking forward to your comments @Sage-Pierce .

@chemicL chemicL self-assigned this Jul 17, 2024
@chemicL chemicL added the status/need-user-input This needs user input to proceed label Jul 17, 2024
@Sage-Pierce
Copy link
Contributor Author

Sage-Pierce commented Jul 20, 2024

Heya @chemicL, I am back from vacation and catching up on your feedback 😄

As a generic operator it feels that any attempt with Sets is spawning a set of problems. Considering the example of
1|2|1|2|1|2... stream, any strong choice of the size parameter would never emit anything until the source completes.

This is true, and I think this makes sense, if you have any client that is choosing to use a Set instead of a List. This particular corner-case usage of Set is something that clients would have to explicitly opt themselves into by providing a Set in the first place, and I think it stands to reason that such clients will/would have thought through the idiosyncrasies of configuring such usage.

Your examples make some implicit assumptions about the source stream, like the global approximation of monotonicity over time with possible local duplicates that you wish to eliminate with "best-effort" approach. However, as a generic set of operators, we can't make such assumptions.

True, the examples I've provided so far imply some form of monotonically increasing emission, but I don't know that it's an assumption that affects the desired behavior. Even if the emission is more random, I think the desired generic behavior still makes sense, and again, I think the onus would be on the client to understand that providing a Set to the buffering operator would cause a bit of odd, yet expected behavior.

To add to that, the suggestion I made with using window sounds feasible in the face of your use case, where you actually intend to send the aggregations of unique items over the network. In such a case, the overhead of using window instead of buffer should be diminished. Considering that particular use case, I imagine a case where a situation might happen where a time-related trigger can also be necessary. E.g. in case you have aggregated N-1 items in the buffer and now there is a stall in the source emissions for e.g. 1s. It would probably be reasonable to send the incomplete buffer after a smaller delay according to the SLA of your business component.

I certainly see how this approach could be used as a workaround, but in all honesty, if that's the only way to accomplish what I'm looking for, I would be inclined to abandon my goal, since, IMO the resulting code complexity doesn't warrant the benefit. I would feel a bit awkward trying to explain to my teammates that this much code was responsible for implementing "give me N distinct items at a time". And as you are also pointing out, this proposed workaround may have some missing considerations.

I wonder whether it makes sense to introduce a new operator for this particular purpose

I'm not immediately opposed to a new operator, though I know there is a low appetite for new operators to maintain in Reactor.

After I update the code to incorporate your latest feedback, I'd like to see how you're feeling about these thoughts. At the very least, I think something should be changed to address the cases where a Collection may be provided that returns false, because the failure that happens right now is, IMO, the worst possible type of failure, which is hanging, and this is incredibly difficult to debug (trying to find where a requested element is sliently dropped in a pipeline can be a nightmare). I think there are a few options:

  1. Continue with these changes, ensuring that the cases where a Collection returns false are called out in the documentation
  2. Emit an error if a Collection returns false
  3. Update the buffer signatures to only accept Supplier<? extends List<? super T>>
  4. Remove the buffer signatures that accept a buffer supplier
  5. New operator

@chemicL
Copy link
Member

chemicL commented Jul 22, 2024

Hey, thanks for getting back on this. Can you also reflect on the notes I made about the potential use of bufferTimeout for your use case? This seemed to me to be the justified choice to deal with stale buffers. Would you be able to explain the use of bufferTimeout with Set to your colleagues to address the concern of "give me N distinct items at a time" with a footnote of "or less if waiting for more than M"?

@Sage-Pierce
Copy link
Contributor Author

Been a busy week 😓

Can you also reflect on the notes I made about the potential use of bufferTimeout for your use case?

I do believe this could work for my me, though I think we both agree it's not obvious that this would be the correct operator to use for the more generic version of my use case. In particular, there isn't really a notion of a "stale" buffer for what I'm working on. I just care that there's "N distinct items", and not that it could take a long time to emit any given buffer because there are a bunch of duplicates. In fact, this would be great for my use in order to absolutely minimize load.

Copy link
Member

@chemicL chemicL left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After a few rounds I'm quite convinced that the exact buffer case (maxSize == skip) is something that can be handled and makes sense. For maxSize != skip I find inconsistencies in

  • breaking semantics from existing implementation about opening the buffer every skip item emitted from the source
  • risks of keeping incomplete buffers in memory
  • risks of stalling processing

Do you agree that bufferTimeout is a good fit and can be documented as one that supports this type of Collection together with buffer with exact size buffers where this is an exceptional case where the skip argument is ignored as buffers are created upon completion of the current buffer?

If so:

  • I think we can discard changes in the BufferSkipSubscriber and BufferOverlappingSubscriber
  • I wondered if we should implement abrupt onError signalled to downstream in case Collection#add returns false and document the requirement for Collection to accept all signals, but I'm hesitant to do so. I think it might be better to leave the gate open in case we need to reconsider. For now the undefined behaviour should not impact the performance in any case with additional conditional statements.
  • Javadoc for buffer and bufferWhen can explain the "exact" case exception which allows Collection to return false and will request more from upstream.


if (!added) {
Operators.onDiscard(t, actual.currentContext());
s.request(1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do have doubts here - the demand flow without Set is that every buffer that is initiated has a chance to be fulfilled. If the downstream requests 1 it means a full buffer should be completed. As the buffers are delivered, the downstream can request more. With potentially non-deterministic implementations of Collection which can unpredictably return false some assumptions might not hold. There are a few assumptions here that dictate the flow that follows. I'd probably prefer to back away from implementations for skip != maxSize as they become complicated to reason about and explain to users what are the risks (e.g. keeping a growing list of undelivered buffers). I think we could terminate early instead of stalling and deliver an error to the downstream and cancel the source in case the buffer is unable to accept the item.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the early termination, I clarified in the general comment - better to leave the undefined behaviour in such case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the demand flow without Set is that every buffer that is initiated has a chance to be fulfilled. If the downstream requests 1 it means a full buffer should be completed. As the buffers are delivered, the downstream can request more. With potentially non-deterministic implementations of Collection which can unpredictably return false some assumptions might not hold.

Indeed, this is one of the reasons I originally implemented this with "if the data can't be added to all in-flight buffers, then discard it", which might avoid some of this weirdness.

I'd probably prefer to back away from implementations for skip != maxSize as they become complicated to reason about and explain to users what are the risks ... better to leave the undefined behaviour in such case.

I agree 😄

@Sage-Pierce
Copy link
Contributor Author

After a few rounds I'm quite convinced that the exact buffer case (maxSize == skip) is something that can be handled and makes sense. For maxSize != skip I find inconsistencies ... Do you agree that bufferTimeout is a good fit and can be documented as one that supports this type of Collection

Yep, at this point, I am in agreement. For overlapping or skipping buffers that use collections that might return false, the guidance can be to instead use bufferTimeout

with exact size buffers where this is an exceptional case where the skip argument is ignored as buffers are created upon completion of the current buffer?

Agreed

I think we can discard changes in the BufferSkipSubscriber and BufferOverlappingSubscriber

Will do!

I wondered if we should implement abrupt onError signalled to downstream in case Collection#add returns false and document the requirement for Collection to accept all signals, but I'm hesitant to do so. I think it might be better to leave the gate open in case we need to reconsider. For now the undefined behaviour should not impact the performance in any case with additional conditional statements.

I'm on board with not doing anything about this for the cases where max != skip, and just leaving the changes I currently have for overlap and skip reverted.

Javadoc for buffer and bufferWhen can explain the "exact" case exception which allows Collection to return false and will request more from upstream.

I will include this in my updates. However, I don't think anything should change about bufferWhen?

Copy link
Member

@chemicL chemicL left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the end I'm not sure if it makes sense to touch bufferTimeout in this PR as per comment in the test suite and the fact that discard is not implemented. The same can be achieved with just eliminating duplicates on each buffer emission.

Sorry to have dragged you all the way and now we're back to square one.

I think that if we work on #3774 it could be possible to rework the bufferTimeout operator in such a way to make this work too. For now, Let's just make an exceptional case for buffer with maxSize == skip and merge your changes.

@@ -3123,6 +3126,10 @@ public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime) {
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size. The element will be discarded in such a case.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is false.

@@ -3163,6 +3170,10 @@ public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, Schedule
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses, as measured on the provided {@link Scheduler}.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size. The element will be discarded in such a case.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is false as well.

@@ -3230,6 +3241,10 @@ public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime,
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size. The element will be discarded in such a case.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this one is also false. Discard support is not present in bufferTimeout for buffer.add returning false.

@@ -3254,6 +3269,10 @@ public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSiz
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses, as measured on the provided {@link Scheduler}.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size. The element will be discarded in such a case.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And also false here.

Flux.just(1, 1, 1, 1, 1, 1, 1)
.<Set<Object>>bufferTimeout(3, Duration.ofSeconds(2), HashSet::new)
.as(it -> StepVerifier.create(it, 3))
.expectNext(Collections.singleton(1), Collections.singleton(1), Collections.singleton(1))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it of little practical use in the end -> the result is that a buffer is emitted every time maxSize items are consumed from the source, not when the buffer size is at the maxSize limit.

@Sage-Pierce
Copy link
Contributor Author

In the end I'm not sure if it makes sense to touch bufferTimeout in this PR as per comment in the test suite and the fact that discard is not implemented.

Sounds good to me! I'll revert the changes related to bufferTimeout

Let's just make an exceptional case for buffer with maxSize == skip and merge your changes.

And also sounds good 😄 I will go ahead and update this PR with your recommendations!

@chemicL chemicL merged commit e58658d into reactor:main Aug 22, 2024
7 checks passed
@chemicL chemicL added type/enhancement A general enhancement and removed status/need-user-input This needs user input to proceed labels Aug 22, 2024
@chemicL
Copy link
Member

chemicL commented Aug 22, 2024

Thanks for the contribution @Sage-Pierce :) I labelled it as an enhancement. This actually has a "It's a feature, not a bug" kind of vibe to me and I feel we are not really addressing a common flaw but added support for a special case as demonstrated by our lengthy discussion. It is a grey area, unfortunately. Anyways, thanks for all the back and forth, glad we arrived at the final destination 🚢

@chemicL chemicL changed the title [#3821] Fix FluxBuffer to request 1 when buffer is not modified [#3821] Support Set for exact buffers in Flux.buffer Aug 22, 2024
@chemicL chemicL added this to the 3.7.0-M6 milestone Aug 22, 2024
@Sage-Pierce Sage-Pierce deleted the #3821 branch August 22, 2024 17:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement A general enhancement
Projects
None yet
Development

Successfully merging this pull request may close these issues.

FluxBuffer hangs when buffer Supplier returns a Set
2 participants