Skip to content

Commit

Permalink
changes from review
Browse files Browse the repository at this point in the history
  • Loading branch information
shyba committed May 12, 2022
1 parent e54cc88 commit 6298123
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
10 changes: 5 additions & 5 deletions lbry/extras/daemon/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,14 +733,14 @@ def running(self):

async def announce_forever(self):
while True:
to_sleep = 60.0
to_announce = []
sleep_seconds = 60.0
announce_sd_hashes = []
for file in self.file_manager.get_filtered():
if not file.downloader:
continue
to_announce.append(bytes.fromhex(file.sd_hash))
await self.tracker_client.announce_many(*to_announce)
await asyncio.sleep(to_sleep)
announce_sd_hashes.append(bytes.fromhex(file.sd_hash))
await self.tracker_client.announce_many(*announce_sd_hashes)
await asyncio.sleep(sleep_seconds)

async def start(self):
node = self.component_manager.get_component(DHT_COMPONENT) \
Expand Down
26 changes: 13 additions & 13 deletions lbry/torrent/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
ScrapeResponse = namedtuple("ScrapeResponse", ["action", "transaction_id", "items"])
ScrapeResponseItem = namedtuple("ScrapeResponseItem", ["seeders", "completed", "leechers"])
ErrorResponse = namedtuple("ErrorResponse", ["action", "transaction_id", "message"])
STRUCTS = {
structs = {
ConnectRequest: struct.Struct(">QII"),
ConnectResponse: struct.Struct(">IIQ"),
AnnounceRequest: struct.Struct(">QII20s20sQQQIIIiH"),
Expand All @@ -45,26 +45,26 @@


def decode(cls, data, offset=0):
decoder = STRUCTS[cls]
if cls == AnnounceResponse:
decoder = structs[cls]
if cls is AnnounceResponse:
return AnnounceResponse(*decoder.unpack_from(data, offset),
peers=[decode(CompactIPv4Peer, data, index) for index in range(20, len(data), 6)])
elif cls == ScrapeResponse:
elif cls is ScrapeResponse:
return ScrapeResponse(*decoder.unpack_from(data, offset),
items=[decode(ScrapeResponseItem, data, index) for index in range(8, len(data), 12)])
elif cls == ErrorResponse:
elif cls is ErrorResponse:
return ErrorResponse(*decoder.unpack_from(data, offset), data[decoder.size:])
return cls(*decoder.unpack_from(data, offset))


def encode(obj):
if isinstance(obj, ScrapeRequest):
return STRUCTS[ScrapeRequest].pack(*obj[:-1]) + b''.join(obj.infohashes)
return structs[ScrapeRequest].pack(*obj[:-1]) + b''.join(obj.infohashes)
elif isinstance(obj, ErrorResponse):
return STRUCTS[ErrorResponse].pack(*obj[:-1]) + obj.message
return structs[ErrorResponse].pack(*obj[:-1]) + obj.message
elif isinstance(obj, AnnounceResponse):
return STRUCTS[AnnounceResponse].pack(*obj[:-1]) + b''.join([encode(peer) for peer in obj.peers])
return STRUCTS[type(obj)].pack(*obj)
return structs[AnnounceResponse].pack(*obj[:-1]) + b''.join([encode(peer) for peer in obj.peers])
return structs[type(obj)].pack(*obj)


def make_peer_id(random_part: Optional[str] = None) -> bytes:
Expand Down Expand Up @@ -136,7 +136,7 @@ def connection_lost(self, exc: Exception = None) -> None:


class TrackerClient:
EVENT_CONTROLLER = StreamController()
event_controller = StreamController()

def __init__(self, node_id, announce_port, get_servers, timeout=10.0):
self.client = UDPTrackerClientProtocol(timeout=timeout)
Expand All @@ -150,7 +150,7 @@ def __init__(self, node_id, announce_port, get_servers, timeout=10.0):
async def start(self):
self.transport, _ = await asyncio.get_running_loop().create_datagram_endpoint(
lambda: self.client, local_addr=("0.0.0.0", 0))
self.EVENT_CONTROLLER.stream.listen(
self.event_controller.stream.listen(
lambda request: self.on_hash(request[1], request[2]) if request[0] == 'search' else None)

def stop(self):
Expand All @@ -160,7 +160,7 @@ def stop(self):
self.transport.close()
self.client = None
self.transport = None
self.EVENT_CONTROLLER.close()
self.event_controller.close()

def on_hash(self, info_hash, on_announcement=None):
if info_hash not in self.tasks:
Expand Down Expand Up @@ -230,7 +230,7 @@ async def on_announcement(announcement: AnnounceResponse):
peers = await announcement_to_kademlia_peers(announcement)
log.info("Found %d peers from tracker for %s", len(peers), info_hash.hex()[:8])
peer_q.put_nowait(peers)
TrackerClient.EVENT_CONTROLLER.add(('search', info_hash, on_announcement))
TrackerClient.event_controller.add(('search', info_hash, on_announcement))


def announcement_to_kademlia_peers(*announcements: AnnounceResponse):
Expand Down

0 comments on commit 6298123

Please sign in to comment.