Skip to content

Commit

Permalink
# TODO: stop dropping tasks on the floor (#18866)
Browse files Browse the repository at this point in the history
* # TODO: stop dropping tasks on the floor

* Update chia/_tests/core/data_layer/test_data_rpc.py

* no really, stop
  • Loading branch information
altendky authored Nov 18, 2024
1 parent 817427c commit 748c8ce
Show file tree
Hide file tree
Showing 11 changed files with 30 additions and 0 deletions.
2 changes: 2 additions & 0 deletions chia/_tests/core/full_node/test_full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,7 @@ async def suppress_value_error(coro: Coroutine) -> None:
uint32(0),
blocks_reorg[-2].reward_chain_block.get_unfinished().get_hash(),
)
# TODO: stop dropping tasks on the floor
asyncio.create_task(suppress_value_error(full_node_1.new_peak(new_peak, dummy_peer))) # noqa: RUF006
await time_out_assert(10, time_out_messages(incoming_queue, "request_block", 0))

Expand All @@ -823,6 +824,7 @@ async def suppress_value_error(coro: Coroutine) -> None:
uint32(0),
blocks_reorg[-1].reward_chain_block.get_unfinished().get_hash(),
)
# TODO: stop dropping tasks on the floor
asyncio.create_task(suppress_value_error(full_node_1.new_peak(new_peak, dummy_peer))) # noqa: RUF006
await time_out_assert(10, time_out_messages(incoming_queue, "request_block", 1))

Expand Down
2 changes: 2 additions & 0 deletions chia/daemon/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ async def listener_task() -> None:
finally:
await self.close()

# TODO: stop dropping tasks on the floor
asyncio.create_task(listener_task()) # noqa: RUF006
await asyncio.sleep(1)

Expand All @@ -91,6 +92,7 @@ async def _get(self, request: WsRpcMessage) -> WsRpcMessage:
string = dict_to_json_str(request)
if self.websocket is None or self.websocket.closed:
raise Exception("Websocket is not connected")
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.websocket.send_str(string)) # noqa: RUF006
try:
await asyncio.wait_for(self._request_dict[request_id].wait(), timeout=30)
Expand Down
1 change: 1 addition & 0 deletions chia/daemon/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,7 @@ async def start_plotting(self, websocket: WebSocketResponse, request: dict[str,
log.info(f"Plotting will start in {config['delay']} seconds")
# TODO: loop gets passed down a lot, review for potential removal
loop = asyncio.get_running_loop()
# TODO: stop dropping tasks on the floor
loop.create_task(self._start_plotting(id, loop, queue)) # noqa: RUF006
else:
log.info("Plotting will start automatically when previous plotting finish")
Expand Down
2 changes: 2 additions & 0 deletions chia/farmer/farmer.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,10 @@ async def start_task() -> None:
if sys.getprofile() is not None:
self.log.warning("not enabling profiler, getprofile() is already set")
else:
# TODO: stop dropping tasks on the floor
asyncio.create_task(profile_task(self._root_path, "farmer", self.log)) # noqa: RUF006

# TODO: stop dropping tasks on the floor
asyncio.create_task(start_task()) # noqa: RUF006
try:
yield
Expand Down
5 changes: 5 additions & 0 deletions chia/full_node/full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ async def manage(self) -> AsyncIterator[None]:
self._init_weight_proof = asyncio.create_task(self.initialize_weight_proof())

if self.config.get("enable_profiler", False):
# TODO: stop dropping tasks on the floor
asyncio.create_task(profile_task(self.root_path, "node", self.log)) # noqa: RUF006

self.profile_block_validation = self.config.get("profile_block_validation", False)
Expand All @@ -291,6 +292,7 @@ async def manage(self) -> AsyncIterator[None]:
profile_dir.mkdir(parents=True, exist_ok=True)

if self.config.get("enable_memory_profiler", False):
# TODO: stop dropping tasks on the floor
asyncio.create_task(mem_profile_task(self.root_path, "node", self.log)) # noqa: RUF006

time_taken = time.monotonic() - start_time
Expand Down Expand Up @@ -338,6 +340,7 @@ async def manage(self) -> AsyncIterator[None]:

self.initialized = True
if self.full_node_peers is not None:
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.full_node_peers.start()) # noqa: RUF006
try:
yield
Expand All @@ -354,6 +357,7 @@ async def manage(self) -> AsyncIterator[None]:
self.mempool_manager.shut_down()

if self.full_node_peers is not None:
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.full_node_peers.close()) # noqa: RUF006
if self.uncompact_task is not None:
self.uncompact_task.cancel()
Expand Down Expand Up @@ -891,6 +895,7 @@ async def on_connect(self, connection: WSChiaConnection) -> None:
self._state_changed("add_connection")
self._state_changed("sync_mode")
if self.full_node_peers is not None:
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.full_node_peers.on_connect(connection)) # noqa: RUF006

if self.initialized is False:
Expand Down
2 changes: 2 additions & 0 deletions chia/full_node/full_node_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ async def eventually_clear() -> None:
await asyncio.sleep(5)
self.full_node.full_node_store.remove_requesting_unfinished_block(block_hash, None)

# TODO: stop dropping tasks on the floor
asyncio.create_task(eventually_clear()) # noqa: RUF006

return msg
Expand Down Expand Up @@ -527,6 +528,7 @@ async def eventually_clear() -> None:
await asyncio.sleep(5)
self.full_node.full_node_store.remove_requesting_unfinished_block(block_hash, foliage_hash)

# TODO: stop dropping tasks on the floor
asyncio.create_task(eventually_clear()) # noqa: RUF006

return msg
Expand Down
1 change: 1 addition & 0 deletions chia/rpc/rpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ async def _state_changed(self, change: str, change_data: Optional[dict[str, Any]
def state_changed(self, change: str, change_data: Optional[dict[str, Any]] = None) -> None:
if self.websocket is None or self.websocket.closed:
return None
# TODO: stop dropping tasks on the floor
asyncio.create_task(self._state_changed(change, change_data)) # noqa: RUF006

@property
Expand Down
2 changes: 2 additions & 0 deletions chia/seeder/dns_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
dns_request: Optional[DNSRecord] = parse_dns_request(data)
if dns_request is None: # Invalid Request, we can just drop it and move on.
return
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.handler(dns_request, addr)) # noqa: RUF006

async def respond(self) -> None:
Expand Down Expand Up @@ -192,6 +193,7 @@ def eof_received(self) -> Optional[bool]:
f"Received incomplete TCP DNS request of length {self.expected_length} from {self.peer_info}, "
f"closing connection after dns replies are sent."
)
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.wait_for_futures()) # noqa: RUF006
return True # Keep connection open, until the futures are done.
log.info(f"Received early EOF from {self.peer_info}, closing connection.")
Expand Down
1 change: 1 addition & 0 deletions chia/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ async def start_client(
self.log.info(f"Connected with {connection_type_str} {target_node}")
else:
self.log.debug(f"Successful feeler connection with {connection_type_str} {target_node}")
# TODO: stop dropping tasks on the floor
asyncio.create_task(connection.close()) # noqa: RUF006
return True
except client_exceptions.ClientConnectorError as e:
Expand Down
8 changes: 8 additions & 0 deletions chia/server/ws_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ async def _send_message(self, message: Message) -> None:

# TODO: fix this special case. This function has rate limits which are too low.
if ProtocolMessageTypes(message.type) != ProtocolMessageTypes.respond_peers:
# TODO: stop dropping tasks on the floor
asyncio.create_task(self._wait_and_retry(message)) # noqa: RUF006

return None
Expand Down Expand Up @@ -668,6 +669,7 @@ async def _read_one_message(self) -> Optional[Message]:
f"{self.peer_server_port}/"
f"{self.peer_info.port}"
)
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.close()) # noqa: RUF006
await asyncio.sleep(3)
elif message.type == WSMsgType.CLOSE:
Expand All @@ -676,10 +678,12 @@ async def _read_one_message(self) -> Optional[Message]:
f"{self.peer_server_port}/"
f"{self.peer_info.port}"
)
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.close()) # noqa: RUF006
await asyncio.sleep(3)
elif message.type == WSMsgType.CLOSED:
if not self.closed:
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.close()) # noqa: RUF006
await asyncio.sleep(3)
return None
Expand All @@ -701,6 +705,7 @@ async def _read_one_message(self) -> Optional[Message]:
f"message: {message_type}"
)
# Only full node disconnects peers, to prevent abuse and crashing timelords, farmers, etc
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.close(300)) # noqa: RUF006
await asyncio.sleep(3)
return None
Expand All @@ -714,13 +719,16 @@ async def _read_one_message(self) -> Optional[Message]:
elif message.type == WSMsgType.ERROR:
self.log.error(f"WebSocket Error: {message}")
if isinstance(message.data, WebSocketError) and message.data.code == WSCloseCode.MESSAGE_TOO_BIG:
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.close(300)) # noqa: RUF006
else:
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.close()) # noqa: RUF006
await asyncio.sleep(3)

else:
self.log.error(f"Unexpected WebSocket message type: {message}")
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.close()) # noqa: RUF006
await asyncio.sleep(3)
return None
Expand Down
4 changes: 4 additions & 0 deletions chia/wallet/wallet_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,11 @@ async def _start_with_fingerprint(
if sys.getprofile() is not None:
self.log.warning("not enabling profiler, getprofile() is already set")
else:
# TODO: stop dropping tasks on the floor
asyncio.create_task(profile_task(self.root_path, "wallet", self.log)) # noqa: RUF006

if self.config.get("enable_memory_profiler", False):
# TODO: stop dropping tasks on the floor
asyncio.create_task(mem_profile_task(self.root_path, "wallet", self.log)) # noqa: RUF006

path: Path = get_wallet_db_path(self.root_path, self.config, str(fingerprint))
Expand Down Expand Up @@ -517,6 +519,7 @@ def _set_state_changed_callback(self, callback: StateChangedProtocol) -> None:
def _pending_tx_handler(self) -> None:
if self._wallet_state_manager is None:
return None
# TODO: stop dropping tasks on the floor
asyncio.create_task(self._resend_queue()) # noqa: RUF006

async def _resend_queue(self) -> None:
Expand Down Expand Up @@ -718,6 +721,7 @@ def initialize_wallet_peers(self) -> None:
default_port,
self.log,
)
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.wallet_peers.start()) # noqa: RUF006

async def on_disconnect(self, peer: WSChiaConnection) -> None:
Expand Down

0 comments on commit 748c8ce

Please sign in to comment.