Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changes LoadbalanceStrategy to accept List #919

Merged
merged 3 commits into from
Aug 24, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
*/
package io.rsocket.loadbalance;

import java.util.List;
import java.util.function.Supplier;

@FunctionalInterface
public interface LoadbalanceStrategy {

PooledRSocket select(PooledRSocket[] availableRSockets);
PooledRSocket select(List<PooledRSocket> availableRSockets);

default Supplier<Stats> statsSupplier() {
return Stats::noOps;
Expand Down
123 changes: 121 additions & 2 deletions rsocket-core/src/main/java/io/rsocket/loadbalance/RSocketPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
import io.rsocket.RSocket;
import io.rsocket.frame.FrameType;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;
Expand All @@ -34,7 +37,7 @@
import reactor.util.annotation.Nullable;

class RSocketPool extends ResolvingOperator<Void>
implements CoreSubscriber<List<LoadbalanceRSocketSource>> {
implements CoreSubscriber<List<LoadbalanceRSocketSource>>, List<PooledRSocket> {

final DeferredResolutionRSocket deferredResolutionRSocket = new DeferredResolutionRSocket(this);
final LoadbalanceStrategy loadbalanceStrategy;
Expand Down Expand Up @@ -200,7 +203,33 @@ RSocket doSelect() {
return null;
}

return this.loadbalanceStrategy.select(sockets);
return this.loadbalanceStrategy.select(this);
}

@Override
public PooledRSocket get(int index) {
return activeSockets[index];
}

@Override
public int size() {
return activeSockets.length;
}

@Override
public boolean isEmpty() {
return activeSockets.length == 0;
}

@Override
public Object[] toArray() {
return activeSockets;
}

@Override
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
return (T[]) activeSockets;
}

static class DeferredResolutionRSocket implements RSocket {
Expand Down Expand Up @@ -325,4 +354,94 @@ public void accept(Void aVoid, Throwable t) {
}
}
}

@Override
public boolean contains(Object o) {
throw new UnsupportedOperationException();
}

@Override
public Iterator<PooledRSocket> iterator() {
throw new UnsupportedOperationException();
}

@Override
public boolean add(PooledRSocket pooledRSocket) {
throw new UnsupportedOperationException();
}

@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}

@Override
public boolean containsAll(Collection<?> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean addAll(Collection<? extends PooledRSocket> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean addAll(int index, Collection<? extends PooledRSocket> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException();
}

@Override
public void clear() {
throw new UnsupportedOperationException();
}

@Override
public PooledRSocket set(int index, PooledRSocket element) {
throw new UnsupportedOperationException();
}

@Override
public void add(int index, PooledRSocket element) {
throw new UnsupportedOperationException();
}

@Override
public PooledRSocket remove(int index) {
throw new UnsupportedOperationException();
}

@Override
public int indexOf(Object o) {
throw new UnsupportedOperationException();
}

@Override
public int lastIndexOf(Object o) {
throw new UnsupportedOperationException();
}

@Override
public ListIterator<PooledRSocket> listIterator() {
throw new UnsupportedOperationException();
}

@Override
public ListIterator<PooledRSocket> listIterator(int index) {
throw new UnsupportedOperationException();
}

@Override
public List<PooledRSocket> subList(int fromIndex, int toIndex) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.rsocket.loadbalance;

import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class RoundRobinLoadbalanceStrategy implements LoadbalanceStrategy {
Expand All @@ -25,12 +26,11 @@ public class RoundRobinLoadbalanceStrategy implements LoadbalanceStrategy {
AtomicIntegerFieldUpdater.newUpdater(RoundRobinLoadbalanceStrategy.class, "nextIndex");

@Override
public PooledRSocket select(PooledRSocket[] sockets) {
int length = sockets.length;
public PooledRSocket select(List<PooledRSocket> sockets) {
int length = sockets.size();

int indexToUse = Math.abs(NEXT_INDEX.getAndIncrement(this) % length);

final PooledRSocket pooledRSocket = sockets[indexToUse];
return pooledRSocket;
return sockets.get(indexToUse);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.rsocket.loadbalance;

import java.util.List;
import java.util.SplittableRandom;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
Expand Down Expand Up @@ -56,19 +57,19 @@ public Supplier<Stats> statsSupplier() {
}

@Override
public PooledRSocket select(PooledRSocket[] sockets) {
public PooledRSocket select(List<PooledRSocket> sockets) {
final int effort = this.effort;
final int size = sockets.length;
final int size = sockets.size();

PooledRSocket pooledRSocket;
switch (size) {
case 1:
pooledRSocket = sockets[0];
pooledRSocket = sockets.get(0);
break;
case 2:
{
PooledRSocket rsc1 = sockets[0];
PooledRSocket rsc2 = sockets[1];
PooledRSocket rsc1 = sockets.get(0);
PooledRSocket rsc2 = sockets.get(1);

double w1 = algorithmicWeight(rsc1);
double w2 = algorithmicWeight(rsc2);
Expand All @@ -91,8 +92,8 @@ public PooledRSocket select(PooledRSocket[] sockets) {
if (i2 >= i1) {
i2++;
}
rsc1 = sockets[i1];
rsc2 = sockets[i2];
rsc1 = sockets.get(i1);
rsc2 = sockets.get(i2);
if (rsc1.availability() > 0.0 && rsc2.availability() > 0.0) {
break;
}
Expand Down