Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peer hashcode fix #194

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion bt-core/src/main/java/bt/data/PeerBitfield.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@
import bt.protocol.Protocols;

import java.util.BitSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.function.BiConsumer;
import java.util.function.IntConsumer;

public class PeerBitfield extends Bitfield {
private final AtomicInteger piecesLeft;

/**
* Create an empty bitfield for a peer
*
* @param piecesTotal the total number of pieces in torrent
*/
public PeerBitfield(int piecesTotal) {
super(piecesTotal);
this.piecesLeft = new AtomicInteger(piecesTotal);
}

/**
Expand All @@ -29,6 +33,7 @@ public PeerBitfield(int piecesTotal) {
*/
public PeerBitfield(byte[] value, BitOrder bitOrder, int piecesTotal) {
super(piecesTotal, createBitmask(value, bitOrder, piecesTotal));
this.piecesLeft = new AtomicInteger(super.getPiecesIncomplete());
}

private static BitSet createBitmask(byte[] bytes, BitOrder bitOrder, int piecesTotal) {
Expand Down Expand Up @@ -59,7 +64,11 @@ private static BitSet createBitmask(byte[] bytes, BitOrder bitOrder, int piecesT
* @since 1.10
*/
public boolean markPeerPieceVerified(int pieceIndex) {
return checkAndMarkVerified(pieceIndex);
boolean newlyVerified = checkAndMarkVerified(pieceIndex);
if (newlyVerified) {
piecesLeft.decrementAndGet();
}
return newlyVerified;
}


Expand All @@ -71,4 +80,9 @@ public void forEachVerifiedPiece(IntConsumer consumer) {
lock.readLock().unlock();
}
}

@Override
public int getPiecesIncomplete() {
return piecesLeft.get();
}
}
52 changes: 33 additions & 19 deletions bt-core/src/main/java/bt/event/EventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
* Basic implementation of event bus, that connects event producers and listeners.
Expand All @@ -39,9 +41,9 @@
public class EventBus implements EventSink, EventSource {
private static final Logger LOGGER = LoggerFactory.getLogger(EventBus.class);

private final ConcurrentMap<Class<? extends BaseEvent>, Collection<Consumer<? extends BaseEvent>>> listeners;
private final ConcurrentMap<Class<? extends BaseEvent>, Collection<Predicate<? extends BaseEvent>>> listeners;

private final ConcurrentMap<TorrentId, ConcurrentMap<Class<? extends BaseEvent>, Collection<Consumer<? extends BaseEvent>>>> listenersOnTorrent;
private final ConcurrentMap<TorrentId, ConcurrentMap<Class<? extends BaseEvent>, Collection<Predicate<? extends BaseEvent>>>> listenersOnTorrent;

private final ReentrantReadWriteLock eventLock;

Expand All @@ -63,12 +65,13 @@ public synchronized void firePeerDiscovered(TorrentId torrentId, Peer peer) {
}

@Override
public synchronized void firePeerConnected(ConnectionKey connectionKey) {
public synchronized boolean firePeerConnected(ConnectionKey connectionKey) {
long timestamp = System.currentTimeMillis();
if (hasListeners(PeerConnectedEvent.class, connectionKey.getTorrentId())) {
long id = nextId();
fireEvent(new PeerConnectedEvent(id, timestamp, connectionKey), connectionKey.getTorrentId());
return fireEvent(new PeerConnectedEvent(id, timestamp, connectionKey), connectionKey.getTorrentId());
}
return true;
}

@Override
Expand Down Expand Up @@ -129,7 +132,7 @@ public void firePieceVerified(TorrentId torrentId, int pieceIndex) {
}

private boolean hasListeners(Class<? extends BaseEvent> eventType, TorrentId torrentId) {
Collection<Consumer<? extends BaseEvent>> listeners = this.listeners.get(eventType);
Collection<Predicate<? extends BaseEvent>> listeners = this.listeners.get(eventType);
if (listeners != null && !listeners.isEmpty()) {
return true;
}
Expand All @@ -138,7 +141,7 @@ private boolean hasListeners(Class<? extends BaseEvent> eventType, TorrentId tor
return false;
}

ConcurrentMap<Class<? extends BaseEvent>, Collection<Consumer<? extends BaseEvent>>> map = this.listenersOnTorrent.get(torrentId);
ConcurrentMap<Class<? extends BaseEvent>, Collection<Predicate<? extends BaseEvent>>> map = this.listenersOnTorrent.get(torrentId);
if (map == null) {
return false;
}
Expand All @@ -151,35 +154,38 @@ private synchronized long nextId() {
return ++idSequence;
}

private <E extends BaseEvent> void fireEvent(E event, TorrentId torrentId) {
private <E extends BaseEvent> boolean fireEvent(E event, TorrentId torrentId) {
eventLock.readLock().lock();
try {
Collection<Consumer<? extends BaseEvent>> listeners = this.listeners.get(event.getClass());
doFireEvent(event, listeners, "Firing event: {}. General Listeners count: {}");
Collection<Predicate<? extends BaseEvent>> listeners = this.listeners.get(event.getClass());
boolean ret = doFireEvent(event, listeners, "Firing event: {}. General Listeners count: {}");
if (torrentId != null) {
ConcurrentMap<Class<? extends BaseEvent>, Collection<Consumer<? extends BaseEvent>>> map = this.listenersOnTorrent.get(torrentId);
ConcurrentMap<Class<? extends BaseEvent>, Collection<Predicate<? extends BaseEvent>>> map = this.listenersOnTorrent.get(torrentId);
if (map != null) {
listeners = map.get(event.getClass());
doFireEvent(event, listeners, "Firing event: {}. Torrent Listeners count: {}");
ret &= doFireEvent(event, listeners, "Firing event: {}. Torrent Listeners count: {}");
}
}
return ret;
} finally {
eventLock.readLock().unlock();
}
}

private <E extends BaseEvent> void doFireEvent(E event, Collection<Consumer<? extends BaseEvent>> listeners, String s) {
private <E extends BaseEvent> boolean doFireEvent(E event, Collection<Predicate<? extends BaseEvent>> listeners, String s) {
if (LOGGER.isTraceEnabled()) {
int count = (listeners == null) ? 0 : listeners.size();
LOGGER.trace(s, event, count);
}
boolean ret = true;
if (listeners != null && !listeners.isEmpty()) {
for (Consumer<? extends BaseEvent> listener : listeners) {
for (Predicate<? extends BaseEvent> listener : listeners) {
@SuppressWarnings("unchecked")
Consumer<E> _listener = (Consumer<E>) listener;
_listener.accept(event);
Predicate<E> _listener = (Predicate<E>) listener;
ret &= _listener.test(event);
}
}
return ret;
}

@Override
Expand All @@ -189,7 +195,7 @@ public EventSource onPeerDiscovered(TorrentId torrentId, Consumer<PeerDiscovered
}

@Override
public EventSource onPeerConnected(TorrentId torrentId, Consumer<PeerConnectedEvent> listener) {
public EventSource onPeerConnected(TorrentId torrentId, Predicate<PeerConnectedEvent> listener) {
addListener(PeerConnectedEvent.class, torrentId, listener);
return this;
}
Expand Down Expand Up @@ -231,7 +237,14 @@ public EventSource onPieceVerified(TorrentId torrentId, Consumer<PieceVerifiedEv
}

private <E extends BaseEvent> void addListener(Class<E> eventType, TorrentId torrentId, Consumer<E> listener) {
Collection<Consumer<? extends BaseEvent>> listeners;
addListener(eventType, torrentId, (e) -> {
listener.accept(e);
return true;
});
}

private <E extends BaseEvent> void addListener(Class<E> eventType, TorrentId torrentId, Predicate<E> listener) {
Collection<Predicate<? extends BaseEvent>> listeners;
if (torrentId == null) {
listeners = this.listeners.computeIfAbsent(eventType, key -> ConcurrentHashMap.newKeySet());
} else {
Expand All @@ -241,11 +254,12 @@ private <E extends BaseEvent> void addListener(Class<E> eventType, TorrentId tor

eventLock.writeLock().lock();
try {
Consumer<E> safeListener = event -> {
Predicate<E> safeListener = event -> {
try {
listener.accept(event);
return listener.test(event);
} catch (Exception ex) {
LOGGER.error("Listener invocation failed", ex);
return false;
}
};
listeners.add(safeListener);
Expand Down
4 changes: 3 additions & 1 deletion bt-core/src/main/java/bt/event/EventSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ public interface EventSink {
/**
* Generate event, that a new connection with some peer has been established.
*
* @param connectionKey the connection key
* @return true if the peer was successfully connected. False otherwise.
* @since 1.9
*/
void firePeerConnected(ConnectionKey connectionKey);
boolean firePeerConnected(ConnectionKey connectionKey);

/**
* Generate event, that a connection with some peer has been terminated.
Expand Down
20 changes: 18 additions & 2 deletions bt-core/src/main/java/bt/event/EventSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import bt.metainfo.TorrentId;

import java.util.function.Consumer;
import java.util.function.Predicate;

/**
* Provides API for hooking into the stream of runtime events.
Expand All @@ -35,11 +36,26 @@ public interface EventSource {
EventSource onPeerDiscovered(TorrentId torrentId, Consumer<PeerDiscoveredEvent> listener);

/**
* Fired, when a new connection with some peer has been established.
* Fired, when a new peer has been discovered for some torrent.
*
* @since 1.5
* @deprecated
*/
default EventSource onPeerConnected(TorrentId torrentId, Consumer<PeerConnectedEvent> listener) {
return onPeerConnected(torrentId, pe -> {
listener.accept(pe);
return true;
});
}

/**
* Fired, when a new connection with some peer has been established.
*
* @param torrentId the torrent id to connect this listener to, or null if it should be run on all torrents
* @param listener the listener to run. Returns true if peer was connected successfully, false on error
* @since 1.10
*/
EventSource onPeerConnected(TorrentId torrentId, Consumer<PeerConnectedEvent> listener);
EventSource onPeerConnected(TorrentId torrentId, Predicate<PeerConnectedEvent> listener);

/**
* Fired, when a connection with some peer has been terminated.
Expand Down
4 changes: 2 additions & 2 deletions bt-core/src/main/java/bt/event/PeerBitfieldUpdatedEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import bt.data.Bitfield;
import bt.metainfo.TorrentId;
import bt.net.ConnectionKey;
import bt.net.Peer;
import bt.net.peer.InetPeer;

/**
* Indicates, that local information about some peer's data has been updated.
Expand All @@ -46,7 +46,7 @@ public TorrentId getTorrentId() {
/**
* @since 1.5
*/
public Peer getPeer() {
public InetPeer getPeer() {
return connectionKey.getPeer();
}

Expand Down
4 changes: 2 additions & 2 deletions bt-core/src/main/java/bt/event/PeerConnectedEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import bt.metainfo.TorrentId;
import bt.net.ConnectionKey;
import bt.net.Peer;
import bt.net.peer.InetPeer;

import java.util.Objects;

Expand All @@ -44,7 +44,7 @@ public TorrentId getTorrentId() {
/**
* @since 1.5
*/
public Peer getPeer() {
public InetPeer getPeer() {
return connectionKey.getPeer();
}

Expand Down
4 changes: 2 additions & 2 deletions bt-core/src/main/java/bt/event/PeerDisconnectedEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import bt.metainfo.TorrentId;
import bt.net.ConnectionKey;
import bt.net.Peer;
import bt.net.peer.InetPeer;

import java.util.Objects;

Expand All @@ -44,7 +44,7 @@ public TorrentId getTorrentId() {
/**
* @since 1.5
*/
public Peer getPeer() {
public InetPeer getPeer() {
return connectionKey.getPeer();
}

Expand Down
3 changes: 2 additions & 1 deletion bt-core/src/main/java/bt/magnet/UtMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package bt.magnet;

import bt.protocol.extended.ExtendedMessage;
import bt.torrent.messaging.MetadataConsumer;

import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -68,7 +69,7 @@ static Type forId(int id) {
}
}

private static final String id = "ut_metadata";
private static final String id = MetadataConsumer.UT_METADATA_EXTENSION;
private static final String messageTypeField = "msg_type";
private static final String pieceIndexField = "piece";
private static final String totalSizeField = "total_size";
Expand Down
5 changes: 3 additions & 2 deletions bt-core/src/main/java/bt/net/BitfieldConnectionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package bt.net;

import bt.data.Bitfield;
import bt.net.peer.InetPeer;
import bt.protocol.BitOrder;
import bt.protocol.Handshake;
import bt.torrent.TorrentDescriptor;
Expand All @@ -33,7 +34,7 @@
*/
public class BitfieldConnectionHandler implements HandshakeHandler {

private TorrentRegistry torrentRegistry;
private final TorrentRegistry torrentRegistry;

@Inject
public BitfieldConnectionHandler(TorrentRegistry torrentRegistry) {
Expand All @@ -48,7 +49,7 @@ public void processIncomingHandshake(PeerConnection connection, Handshake peerHa
Bitfield bitfield = descriptorOptional.get().getDataDescriptor().getBitfield();

if (bitfield.getPiecesComplete() > 0) {
Peer peer = connection.getRemotePeer();
InetPeer peer = connection.getRemotePeer();
bt.protocol.Bitfield bitfieldMessage = new bt.protocol.Bitfield(bitfield.toByteArray(BitOrder.LITTLE_ENDIAN));
try {
connection.postMessage(bitfieldMessage);
Expand Down
7 changes: 4 additions & 3 deletions bt-core/src/main/java/bt/net/ConnectionKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,25 @@
package bt.net;

import bt.metainfo.TorrentId;
import bt.net.peer.InetPeer;
import com.google.common.base.MoreObjects;

import java.util.Objects;

public class ConnectionKey {
private final Peer peer;
private final InetPeer peer;
private final int remotePort;
private final TorrentId torrentId;

public ConnectionKey(Peer peer, int remotePort, TorrentId torrentId) {
public ConnectionKey(InetPeer peer, int remotePort, TorrentId torrentId) {
Objects.requireNonNull(peer);
Objects.requireNonNull(torrentId);
this.peer = peer;
this.remotePort = remotePort;
this.torrentId = torrentId;
}

public Peer getPeer() {
public InetPeer getPeer() {
return peer;
}

Expand Down
8 changes: 8 additions & 0 deletions bt-core/src/main/java/bt/net/ConnectionRoutine.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package bt.net;

import java.net.SocketAddress;
import java.time.Instant;

/**
* Encapsulates a procedure for establishing the connection.
Expand All @@ -43,4 +44,11 @@ public interface ConnectionRoutine {
* @since 1.6
*/
void cancel();

/**
* Get when this connection was established
*
* @return when this connection was established
*/
Instant getConnectionEstablishedTimestamp();
}
Loading