From c85fe69695e2b357d50786b723517d50c100aaa0 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Sat, 22 Aug 2020 11:06:05 +0300 Subject: [PATCH] renames PooledRSocket to WeightedRSocket Signed-off-by: Oleh Dokuka --- .../loadbalance/LoadbalanceStrategy.java | 2 +- ...Socket.java => PooledWeightedRSocket.java} | 39 +++++++-------- .../io/rsocket/loadbalance/RSocketPool.java | 50 +++++++++---------- .../RoundRobinLoadbalanceStrategy.java | 2 +- .../WeightedLoadbalanceStrategy.java | 36 ++++++------- ...ooledRSocket.java => WeightedRSocket.java} | 4 +- 6 files changed, 65 insertions(+), 68 deletions(-) rename rsocket-core/src/main/java/io/rsocket/loadbalance/{DefaultPooledRSocket.java => PooledWeightedRSocket.java} (87%) rename rsocket-core/src/main/java/io/rsocket/loadbalance/{PooledRSocket.java => WeightedRSocket.java} (88%) diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceStrategy.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceStrategy.java index 8220d1f2a..2bcf4455b 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceStrategy.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceStrategy.java @@ -21,7 +21,7 @@ @FunctionalInterface public interface LoadbalanceStrategy { - PooledRSocket select(List availableRSockets); + WeightedRSocket select(List availableRSockets); default Supplier statsSupplier() { return Stats::noOps; diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/DefaultPooledRSocket.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/PooledWeightedRSocket.java similarity index 87% rename from rsocket-core/src/main/java/io/rsocket/loadbalance/DefaultPooledRSocket.java rename to rsocket-core/src/main/java/io/rsocket/loadbalance/PooledWeightedRSocket.java index a7ba9dfef..0cd5952a2 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/DefaultPooledRSocket.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/PooledWeightedRSocket.java @@ -28,9 +28,9 @@ import reactor.core.publisher.Operators; import reactor.util.context.Context; -/** Default implementation of {@link PooledRSocket} stored in {@link RSocketPool} */ -final class DefaultPooledRSocket extends ResolvingOperator - implements CoreSubscriber, PooledRSocket { +/** Default implementation of {@link WeightedRSocket} stored in {@link RSocketPool} */ +final class PooledWeightedRSocket extends ResolvingOperator + implements CoreSubscriber, WeightedRSocket { final RSocketPool parent; final LoadbalanceRSocketSource loadbalanceRSocketSource; @@ -38,10 +38,10 @@ final class DefaultPooledRSocket extends ResolvingOperator volatile Subscription s; - static final AtomicReferenceFieldUpdater S = - AtomicReferenceFieldUpdater.newUpdater(DefaultPooledRSocket.class, Subscription.class, "s"); + static final AtomicReferenceFieldUpdater S = + AtomicReferenceFieldUpdater.newUpdater(PooledWeightedRSocket.class, Subscription.class, "s"); - DefaultPooledRSocket( + PooledWeightedRSocket( RSocketPool parent, LoadbalanceRSocketSource loadbalanceRSocketSource, Stats stats) { this.parent = parent; this.stats = stats; @@ -128,7 +128,7 @@ public void dispose() { protected void doOnDispose() { final RSocketPool parent = this.parent; for (; ; ) { - final PooledRSocket[] sockets = parent.activeSockets; + final PooledWeightedRSocket[] sockets = parent.activeSockets; final int activeSocketsCount = sockets.length; int index = -1; @@ -144,7 +144,7 @@ protected void doOnDispose() { } final int lastIndex = activeSocketsCount - 1; - final PooledRSocket[] newSockets = new PooledRSocket[lastIndex]; + final PooledWeightedRSocket[] newSockets = new PooledWeightedRSocket[lastIndex]; if (index != 0) { System.arraycopy(sockets, 0, newSockets, 0, index); } @@ -196,8 +196,7 @@ public Stats stats() { return stats; } - @Override - public LoadbalanceRSocketSource source() { + LoadbalanceRSocketSource source() { return loadbalanceRSocketSource; } @@ -211,7 +210,7 @@ static final class RequestTrackingMonoInner long startTime; - RequestTrackingMonoInner(DefaultPooledRSocket parent, Payload payload, FrameType requestType) { + RequestTrackingMonoInner(PooledWeightedRSocket parent, Payload payload, FrameType requestType) { super(parent, payload, requestType); } @@ -245,7 +244,7 @@ public void accept(RSocket rSocket, Throwable t) { return; } - startTime = ((DefaultPooledRSocket) parent).stats.startRequest(); + startTime = ((PooledWeightedRSocket) parent).stats.startRequest(); source.subscribe((CoreSubscriber) this); } else { @@ -257,7 +256,7 @@ public void accept(RSocket rSocket, Throwable t) { public void onComplete() { final long state = this.requested; if (state != TERMINATED_STATE && REQUESTED.compareAndSet(this, state, TERMINATED_STATE)) { - final Stats stats = ((DefaultPooledRSocket) parent).stats; + final Stats stats = ((PooledWeightedRSocket) parent).stats; final long now = stats.stopRequest(startTime); stats.record(now - startTime); super.onComplete(); @@ -268,7 +267,7 @@ public void onComplete() { public void onError(Throwable t) { final long state = this.requested; if (state != TERMINATED_STATE && REQUESTED.compareAndSet(this, state, TERMINATED_STATE)) { - Stats stats = ((DefaultPooledRSocket) parent).stats; + Stats stats = ((PooledWeightedRSocket) parent).stats; stats.stopRequest(startTime); stats.recordError(0.0); super.onError(t); @@ -284,7 +283,7 @@ public void cancel() { if (state == STATE_SUBSCRIBED) { this.s.cancel(); - ((DefaultPooledRSocket) parent).stats.stopRequest(startTime); + ((PooledWeightedRSocket) parent).stats.stopRequest(startTime); } else { this.parent.remove(this); ReferenceCountUtil.safeRelease(this.payload); @@ -296,7 +295,7 @@ static final class RequestTrackingFluxInner extends FluxDeferredResolution { RequestTrackingFluxInner( - DefaultPooledRSocket parent, INPUT fluxOrPayload, FrameType requestType) { + PooledWeightedRSocket parent, INPUT fluxOrPayload, FrameType requestType) { super(parent, fluxOrPayload, requestType); } @@ -329,7 +328,7 @@ public void accept(RSocket rSocket, Throwable t) { return; } - ((DefaultPooledRSocket) parent).stats.startStream(); + ((PooledWeightedRSocket) parent).stats.startStream(); source.subscribe(this); } else { @@ -341,7 +340,7 @@ public void accept(RSocket rSocket, Throwable t) { public void onComplete() { final long state = this.requested; if (state != TERMINATED_STATE && REQUESTED.compareAndSet(this, state, TERMINATED_STATE)) { - ((DefaultPooledRSocket) parent).stats.stopStream(); + ((PooledWeightedRSocket) parent).stats.stopStream(); super.onComplete(); } } @@ -350,7 +349,7 @@ public void onComplete() { public void onError(Throwable t) { final long state = this.requested; if (state != TERMINATED_STATE && REQUESTED.compareAndSet(this, state, TERMINATED_STATE)) { - ((DefaultPooledRSocket) parent).stats.stopStream(); + ((PooledWeightedRSocket) parent).stats.stopStream(); super.onError(t); } } @@ -364,7 +363,7 @@ public void cancel() { if (state == STATE_SUBSCRIBED) { this.s.cancel(); - ((DefaultPooledRSocket) parent).stats.stopStream(); + ((PooledWeightedRSocket) parent).stats.stopStream(); } else { this.parent.remove(this); if (requestType == FrameType.REQUEST_STREAM) { diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/RSocketPool.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/RSocketPool.java index cc2ce1a73..35a38f3b4 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/RSocketPool.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/RSocketPool.java @@ -37,20 +37,20 @@ import reactor.util.annotation.Nullable; class RSocketPool extends ResolvingOperator - implements CoreSubscriber>, List { + implements CoreSubscriber>, List { final DeferredResolutionRSocket deferredResolutionRSocket = new DeferredResolutionRSocket(this); final LoadbalanceStrategy loadbalanceStrategy; final Supplier statsSupplier; - volatile PooledRSocket[] activeSockets; + volatile PooledWeightedRSocket[] activeSockets; - static final AtomicReferenceFieldUpdater ACTIVE_SOCKETS = + static final AtomicReferenceFieldUpdater ACTIVE_SOCKETS = AtomicReferenceFieldUpdater.newUpdater( - RSocketPool.class, PooledRSocket[].class, "activeSockets"); + RSocketPool.class, PooledWeightedRSocket[].class, "activeSockets"); - static final PooledRSocket[] EMPTY = new PooledRSocket[0]; - static final PooledRSocket[] TERMINATED = new PooledRSocket[0]; + static final PooledWeightedRSocket[] EMPTY = new PooledWeightedRSocket[0]; + static final PooledWeightedRSocket[] TERMINATED = new PooledWeightedRSocket[0]; volatile Subscription s; static final AtomicReferenceFieldUpdater S = @@ -96,8 +96,8 @@ public void onNext(List loadbalanceRSocketSources) { return; } - PooledRSocket[] previouslyActiveSockets; - PooledRSocket[] activeSockets; + PooledWeightedRSocket[] previouslyActiveSockets; + PooledWeightedRSocket[] activeSockets; for (; ; ) { HashMap rSocketSuppliersCopy = new HashMap<>(); @@ -108,11 +108,11 @@ public void onNext(List loadbalanceRSocketSources) { // checking intersection of active RSocket with the newly received set previouslyActiveSockets = this.activeSockets; - PooledRSocket[] nextActiveSockets = - new PooledRSocket[previouslyActiveSockets.length + rSocketSuppliersCopy.size()]; + PooledWeightedRSocket[] nextActiveSockets = + new PooledWeightedRSocket[previouslyActiveSockets.length + rSocketSuppliersCopy.size()]; int position = 0; for (int i = 0; i < previouslyActiveSockets.length; i++) { - PooledRSocket rSocket = previouslyActiveSockets[i]; + PooledWeightedRSocket rSocket = previouslyActiveSockets[i]; Integer index = rSocketSuppliersCopy.remove(rSocket.source()); if (index == null) { @@ -130,7 +130,7 @@ public void onNext(List loadbalanceRSocketSources) { } else { // put newly create RSocket instance nextActiveSockets[position++] = - new DefaultPooledRSocket( + new PooledWeightedRSocket( this, loadbalanceRSocketSources.get(index), this.statsSupplier.get()); } } @@ -139,7 +139,7 @@ public void onNext(List loadbalanceRSocketSources) { // going though brightly new rsocket for (LoadbalanceRSocketSource newLoadbalanceRSocketSource : rSocketSuppliersCopy.keySet()) { nextActiveSockets[position++] = - new DefaultPooledRSocket(this, newLoadbalanceRSocketSource, this.statsSupplier.get()); + new PooledWeightedRSocket(this, newLoadbalanceRSocketSource, this.statsSupplier.get()); } // shrank to actual length @@ -198,7 +198,7 @@ RSocket select() { @Nullable RSocket doSelect() { - PooledRSocket[] sockets = this.activeSockets; + WeightedRSocket[] sockets = this.activeSockets; if (sockets == EMPTY) { return null; } @@ -207,7 +207,7 @@ RSocket doSelect() { } @Override - public PooledRSocket get(int index) { + public WeightedRSocket get(int index) { return activeSockets[index]; } @@ -361,12 +361,12 @@ public boolean contains(Object o) { } @Override - public Iterator iterator() { + public Iterator iterator() { throw new UnsupportedOperationException(); } @Override - public boolean add(PooledRSocket pooledRSocket) { + public boolean add(WeightedRSocket weightedRSocket) { throw new UnsupportedOperationException(); } @@ -381,12 +381,12 @@ public boolean containsAll(Collection c) { } @Override - public boolean addAll(Collection c) { + public boolean addAll(Collection c) { throw new UnsupportedOperationException(); } @Override - public boolean addAll(int index, Collection c) { + public boolean addAll(int index, Collection c) { throw new UnsupportedOperationException(); } @@ -406,17 +406,17 @@ public void clear() { } @Override - public PooledRSocket set(int index, PooledRSocket element) { + public WeightedRSocket set(int index, WeightedRSocket element) { throw new UnsupportedOperationException(); } @Override - public void add(int index, PooledRSocket element) { + public void add(int index, WeightedRSocket element) { throw new UnsupportedOperationException(); } @Override - public PooledRSocket remove(int index) { + public WeightedRSocket remove(int index) { throw new UnsupportedOperationException(); } @@ -431,17 +431,17 @@ public int lastIndexOf(Object o) { } @Override - public ListIterator listIterator() { + public ListIterator listIterator() { throw new UnsupportedOperationException(); } @Override - public ListIterator listIterator(int index) { + public ListIterator listIterator(int index) { throw new UnsupportedOperationException(); } @Override - public List subList(int fromIndex, int toIndex) { + public List subList(int fromIndex, int toIndex) { throw new UnsupportedOperationException(); } } diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/RoundRobinLoadbalanceStrategy.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/RoundRobinLoadbalanceStrategy.java index 13bf96e1a..60227f9ac 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/RoundRobinLoadbalanceStrategy.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/RoundRobinLoadbalanceStrategy.java @@ -26,7 +26,7 @@ public class RoundRobinLoadbalanceStrategy implements LoadbalanceStrategy { AtomicIntegerFieldUpdater.newUpdater(RoundRobinLoadbalanceStrategy.class, "nextIndex"); @Override - public PooledRSocket select(List sockets) { + public WeightedRSocket select(List sockets) { int length = sockets.size(); int indexToUse = Math.abs(NEXT_INDEX.getAndIncrement(this) % length); diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/WeightedLoadbalanceStrategy.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/WeightedLoadbalanceStrategy.java index 0a5195568..590da3ded 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/WeightedLoadbalanceStrategy.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/WeightedLoadbalanceStrategy.java @@ -57,33 +57,33 @@ public Supplier statsSupplier() { } @Override - public PooledRSocket select(List sockets) { + public WeightedRSocket select(List sockets) { final int effort = this.effort; final int size = sockets.size(); - PooledRSocket pooledRSocket; + WeightedRSocket weightedRSocket; switch (size) { case 1: - pooledRSocket = sockets.get(0); + weightedRSocket = sockets.get(0); break; case 2: { - PooledRSocket rsc1 = sockets.get(0); - PooledRSocket rsc2 = sockets.get(1); + WeightedRSocket rsc1 = sockets.get(0); + WeightedRSocket rsc2 = sockets.get(1); double w1 = algorithmicWeight(rsc1); double w2 = algorithmicWeight(rsc2); if (w1 < w2) { - pooledRSocket = rsc2; + weightedRSocket = rsc2; } else { - pooledRSocket = rsc1; + weightedRSocket = rsc1; } } break; default: { - PooledRSocket rsc1 = null; - PooledRSocket rsc2 = null; + WeightedRSocket rsc1 = null; + WeightedRSocket rsc2 = null; for (int i = 0; i < effort; i++) { int i1 = ThreadLocalRandom.current().nextInt(size); @@ -102,23 +102,23 @@ public PooledRSocket select(List sockets) { double w1 = algorithmicWeight(rsc1); double w2 = algorithmicWeight(rsc2); if (w1 < w2) { - pooledRSocket = rsc2; + weightedRSocket = rsc2; } else { - pooledRSocket = rsc1; + weightedRSocket = rsc1; } } } - return pooledRSocket; + return weightedRSocket; } - private static double algorithmicWeight(@Nullable final PooledRSocket pooledRSocket) { - if (pooledRSocket == null - || pooledRSocket.isDisposed() - || pooledRSocket.availability() == 0.0) { + private static double algorithmicWeight(@Nullable final WeightedRSocket weightedRSocket) { + if (weightedRSocket == null + || weightedRSocket.isDisposed() + || weightedRSocket.availability() == 0.0) { return 0.0; } - final Stats stats = pooledRSocket.stats(); + final Stats stats = weightedRSocket.stats(); final int pending = stats.pending(); double latency = stats.predictedLatency(); @@ -135,7 +135,7 @@ private static double algorithmicWeight(@Nullable final PooledRSocket pooledRSoc latency *= calculateFactor(latency, high, bandWidth); } - return pooledRSocket.availability() * 1.0 / (1.0 + latency * (pending + 1)); + return weightedRSocket.availability() * 1.0 / (1.0 + latency * (pending + 1)); } private static double calculateFactor(final double u, final double l, final double bandWidth) { diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/PooledRSocket.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/WeightedRSocket.java similarity index 88% rename from rsocket-core/src/main/java/io/rsocket/loadbalance/PooledRSocket.java rename to rsocket-core/src/main/java/io/rsocket/loadbalance/WeightedRSocket.java index 825a8ac88..de9e56fa4 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/PooledRSocket.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/WeightedRSocket.java @@ -17,9 +17,7 @@ import io.rsocket.RSocket; -public interface PooledRSocket extends RSocket { +public interface WeightedRSocket extends RSocket { Stats stats(); - - LoadbalanceRSocketSource source(); }