Skip to content

Commit

Permalink
Revert "Feature/rate limiter publisher (#672)"
Browse files Browse the repository at this point in the history
This reverts commit 88ee909
  • Loading branch information
mostroverkhov committed Oct 14, 2019
1 parent 33259c2 commit b9ea3eb
Show file tree
Hide file tree
Showing 11 changed files with 23 additions and 412 deletions.
21 changes: 10 additions & 11 deletions rsocket-core/src/main/java/io/rsocket/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.rsocket.exceptions.Exceptions;
import io.rsocket.frame.*;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.RateLimitableRequestPublisher;
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.internal.SynchronizedIntObjectHashMap;
import io.rsocket.internal.UnboundedProcessor;
import io.rsocket.internal.UnicastMonoProcessor;
Expand All @@ -47,7 +47,6 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.*;
import reactor.util.concurrent.Queues;

/**
* Requester Side of a RSocket socket. Sends {@link ByteBuf}s to a {@link RSocketResponder} of peer
Expand All @@ -61,7 +60,7 @@ class RSocketRequester implements RSocket {
private final PayloadDecoder payloadDecoder;
private final Consumer<Throwable> errorConsumer;
private final StreamIdSupplier streamIdSupplier;
private final IntObjectMap<RateLimitableRequestPublisher> senders;
private final IntObjectMap<LimitableRequestPublisher> senders;
private final IntObjectMap<Processor<Payload, Payload>> receivers;
private final UnboundedProcessor<ByteBuf> sendProcessor;
private final RequesterLeaseHandler leaseHandler;
Expand Down Expand Up @@ -132,7 +131,7 @@ private void handleSendProcessorError(Throwable t) {
}
});

senders.values().forEach(RateLimitableRequestPublisher::cancel);
senders.values().forEach(LimitableRequestPublisher::cancel);
}

private void handleSendProcessorCancel(SignalType t) {
Expand All @@ -151,7 +150,7 @@ private void handleSendProcessorCancel(SignalType t) {
}
});

senders.values().forEach(RateLimitableRequestPublisher::cancel);
senders.values().forEach(LimitableRequestPublisher::cancel);
}

@Override
Expand Down Expand Up @@ -344,8 +343,8 @@ public void accept(long n) {
request
.transform(
f -> {
RateLimitableRequestPublisher<Payload> wrapped =
RateLimitableRequestPublisher.wrap(f, Queues.SMALL_BUFFER_SIZE);
LimitableRequestPublisher<Payload> wrapped =
LimitableRequestPublisher.wrap(f);
// Need to set this to one for first the frame
wrapped.request(1);
senders.put(streamId, wrapped);
Expand Down Expand Up @@ -422,7 +421,7 @@ protected void hookOnError(Throwable t) {
.doFinally(
s -> {
receivers.remove(streamId);
RateLimitableRequestPublisher sender = senders.remove(streamId);
LimitableRequestPublisher sender = senders.remove(streamId);
if (sender != null) {
sender.cancel();
}
Expand Down Expand Up @@ -490,7 +489,7 @@ private void setTerminationError(Throwable error) {
}

private synchronized void cleanUpLimitableRequestPublisher(
RateLimitableRequestPublisher<?> limitableRequestPublisher) {
LimitableRequestPublisher<?> limitableRequestPublisher) {
try {
limitableRequestPublisher.cancel();
} catch (Throwable t) {
Expand Down Expand Up @@ -562,7 +561,7 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) {
break;
case CANCEL:
{
RateLimitableRequestPublisher sender = senders.remove(streamId);
LimitableRequestPublisher sender = senders.remove(streamId);
if (sender != null) {
sender.cancel();
}
Expand All @@ -573,7 +572,7 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) {
break;
case REQUEST_N:
{
RateLimitableRequestPublisher sender = senders.get(streamId);
LimitableRequestPublisher sender = senders.get(streamId);
if (sender != null) {
int n = RequestNFrameFlyweight.requestN(frame);
sender.request(n >= Integer.MAX_VALUE ? Long.MAX_VALUE : n);
Expand Down
9 changes: 4 additions & 5 deletions rsocket-core/src/main/java/io/rsocket/RSocketResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.rsocket.exceptions.ApplicationErrorException;
import io.rsocket.frame.*;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.RateLimitableRequestPublisher;
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.internal.SynchronizedIntObjectHashMap;
import io.rsocket.internal.UnboundedProcessor;
import io.rsocket.lease.ResponderLeaseHandler;
Expand All @@ -35,7 +35,6 @@
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.*;
import reactor.util.concurrent.Queues;

/** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */
class RSocketResponder implements ResponderRSocket {
Expand All @@ -47,7 +46,7 @@ class RSocketResponder implements ResponderRSocket {
private final Consumer<Throwable> errorConsumer;
private final ResponderLeaseHandler leaseHandler;

private final IntObjectMap<RateLimitableRequestPublisher> sendingLimitableSubscriptions;
private final IntObjectMap<LimitableRequestPublisher> sendingLimitableSubscriptions;
private final IntObjectMap<Subscription> sendingSubscriptions;
private final IntObjectMap<Processor<Payload, Payload>> channelProcessors;

Expand Down Expand Up @@ -436,8 +435,8 @@ private void handleStream(int streamId, Flux<Payload> response, int initialReque
response
.transform(
frameFlux -> {
RateLimitableRequestPublisher<Payload> payloads =
RateLimitableRequestPublisher.wrap(frameFlux, Queues.SMALL_BUFFER_SIZE);
LimitableRequestPublisher<Payload> payloads =
LimitableRequestPublisher.wrap(frameFlux);
sendingLimitableSubscriptions.put(streamId, payloads);
payloads.request(
initialRequestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : initialRequestN);
Expand Down

This file was deleted.

Loading

0 comments on commit b9ea3eb

Please sign in to comment.