But when the disposeIfNotSubscribed() fires, it is subscribed to without the operator:
public boolean disposeIfNotSubscribed() {
    if (state.casState(State.STATES.UNSUBSCRIBED, State.STATES.DISPOSED)) {
        state.bufferedSubject.subscribe(Subscribers.empty()); // Drain all items so that ByteBuf gets released. // <-----
        if (null != state.onUnsubscribe) {
            state.onUnsubscribe.call(); // Since this is an inline/sync call, if this throws an error, it gets thrown to the caller.
        }
        return true;
    }
    return false;
}
I changed the implementation to
state.bufferedSubject.lift(new AutoReleaseByteBufOperator()).subscribe(Subscribers.empty()); // Drain all items so that ByteBuf gets released.
and now it seems to work. I had to move the operator out one level since it's private on the inner class, but it's not sharing any context anyway.
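To make the difference concrete, here is a minimal, library-free sketch of the two drain paths. It does not use RxJava or Netty: `FakeBuf`, `autoRelease` and `drain` are hypothetical stand-ins for a reference-counted buffer, the auto-release operator and the subscription to the buffered subject, so treat it as an illustration of the idea rather than the actual implementation.

```java
import java.util.Queue;
import java.util.function.Consumer;

final class DrainSketch {
    // Hypothetical stand-in for a reference-counted buffer such as a Netty ByteBuf.
    static final class FakeBuf {
        int refCnt = 1;
        void release() { refCnt--; }
    }

    // Hypothetical stand-in for the auto-release operator: wraps a subscriber
    // so that every item is released after it has been delivered.
    static Consumer<FakeBuf> autoRelease(Consumer<FakeBuf> downstream) {
        return buf -> {
            try {
                downstream.accept(buf);
            } finally {
                buf.release();
            }
        };
    }

    // Hypothetical stand-in for subscribing to the buffered subject:
    // hands every queued item to the subscriber.
    static void drain(Queue<FakeBuf> buffered, Consumer<FakeBuf> subscriber) {
        FakeBuf buf;
        while ((buf = buffered.poll()) != null) {
            subscriber.accept(buf);
        }
    }
}
```

Draining with an empty subscriber alone consumes the items but leaves their reference count untouched, which is the leak described above. Wrapping the same empty subscriber in `autoRelease` brings the count down to zero.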
So:
Do you agree it's an issue?
Do you want me to file a PR with a unit test?
I'm going with the modified version for now... at one point I wonder if we should merge back the subject into RxJava. The only problematic thing is probably how to reference the "releasable" part, but we could come up with an interface that has a retain() and release() method which in the actual impls could be proxied through to netty buffers.
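As a rough sketch of what such an interface could look like: the names `Releasable` and `CountingReleasable` below are hypothetical and exist in neither RxJava nor Netty. The in-memory implementation is only there to make the contract testable; a Netty-backed implementation would presumably delegate to the buffer's own `retain()` and `release()` instead.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical interface; not part of RxJava or Netty.
interface Releasable {
    // Increments the reference count and returns this instance.
    Releasable retain();

    // Decrements the reference count; returns true if the count reached zero.
    boolean release();
}

// Hypothetical in-memory implementation, for illustration only.
final class CountingReleasable implements Releasable {
    private final AtomicInteger refCnt = new AtomicInteger(1);

    @Override
    public Releasable retain() {
        for (;;) {
            int current = refCnt.get();
            if (current <= 0) {
                throw new IllegalStateException("already released");
            }
            if (refCnt.compareAndSet(current, current + 1)) {
                return this;
            }
        }
    }

    @Override
    public boolean release() {
        for (;;) {
            int current = refCnt.get();
            if (current <= 0) {
                throw new IllegalStateException("already released");
            }
            if (refCnt.compareAndSet(current, current - 1)) {
                return current == 1;
            }
        }
    }

    int refCnt() {
        return refCnt.get();
    }
}
```

With something like this, the subject would only depend on `Releasable`, and the netty-specific part would live in the implementation.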
Thanks @daschl for reporting this. I missed the release during timeout in my latest changes (ugghh)
I will review your PR now.
> at one point I wonder if we should merge back the subject into RxJava.
I think there is value in providing a UnicastDisposableSubject in RxJava, which provides:
Only a single subscriber (unicast)
An option to dispose if not subscribed. The signal to dispose can be via an Observable
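A minimal sketch of those two properties, modelled only as a state machine: `UnicastStateSketch` and its methods are hypothetical names, nothing here is RxJava API, and item buffering and delivery are left out entirely. The compare-and-set transitions mirror the `casState` call in `disposeIfNotSubscribed()` above.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; not the RxNetty UnicastContentSubject and not RxJava API.
final class UnicastStateSketch {
    enum State { UNSUBSCRIBED, SUBSCRIBED, DISPOSED }

    private final AtomicReference<State> state =
            new AtomicReference<>(State.UNSUBSCRIBED);

    // Unicast: succeeds only for the first subscriber, and only before disposal.
    boolean trySubscribe() {
        return state.compareAndSet(State.UNSUBSCRIBED, State.SUBSCRIBED);
    }

    // Dispose if not subscribed: runs the cleanup at most once,
    // and only if nobody has subscribed so far.
    boolean disposeIfNotSubscribed(Runnable cleanup) {
        if (state.compareAndSet(State.UNSUBSCRIBED, State.DISPOSED)) {
            cleanup.run();
            return true;
        }
        return false;
    }

    State state() {
        return state.get();
    }
}
```

The dispose signal (a timer, or an Observable as suggested above) would simply call `disposeIfNotSubscribed` with whatever cleanup releases the buffered items.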
> we could come up with an interface that has a retain() and release() method which in the actual impls could be proxied through to netty buffers.
Yeah, I think before/after onNext semantics are what we would need in the subject to support retain and release. I'm not sure what a clean way of doing that would be; we can start a discussion on RxJava for this.
Hi @NiteshKant @benjchristensen,
I think I found a bug in the UnicastContentSubject. I'm currently copying it over to the Couchbase SDK and giving it a try, and I found the following.
When you do not subscribe, the timer fires, but it never actually frees the buffer, and I think this is why:
Normally, when one subscribes you chain in the releasing operator: