From 748c8cec8e24c0feabc10e8db22284dc3712180f Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Mon, 18 Nov 2024 14:32:52 -0500 Subject: [PATCH] `# TODO: stop dropping tasks on the floor` (#18866) * # TODO: stop dropping tasks on the floor * Update chia/_tests/core/data_layer/test_data_rpc.py * no really, stop --- chia/_tests/core/full_node/test_full_node.py | 2 ++ chia/daemon/client.py | 2 ++ chia/daemon/server.py | 1 + chia/farmer/farmer.py | 2 ++ chia/full_node/full_node.py | 5 +++++ chia/full_node/full_node_api.py | 2 ++ chia/rpc/rpc_server.py | 1 + chia/seeder/dns_server.py | 2 ++ chia/server/server.py | 1 + chia/server/ws_connection.py | 8 ++++++++ chia/wallet/wallet_node.py | 4 ++++ 11 files changed, 30 insertions(+) diff --git a/chia/_tests/core/full_node/test_full_node.py b/chia/_tests/core/full_node/test_full_node.py index 3c1835bdb0e5..08e62ac354f6 100644 --- a/chia/_tests/core/full_node/test_full_node.py +++ b/chia/_tests/core/full_node/test_full_node.py @@ -812,6 +812,7 @@ async def suppress_value_error(coro: Coroutine) -> None: uint32(0), blocks_reorg[-2].reward_chain_block.get_unfinished().get_hash(), ) + # TODO: stop dropping tasks on the floor asyncio.create_task(suppress_value_error(full_node_1.new_peak(new_peak, dummy_peer))) # noqa: RUF006 await time_out_assert(10, time_out_messages(incoming_queue, "request_block", 0)) @@ -823,6 +824,7 @@ async def suppress_value_error(coro: Coroutine) -> None: uint32(0), blocks_reorg[-1].reward_chain_block.get_unfinished().get_hash(), ) + # TODO: stop dropping tasks on the floor asyncio.create_task(suppress_value_error(full_node_1.new_peak(new_peak, dummy_peer))) # noqa: RUF006 await time_out_assert(10, time_out_messages(incoming_queue, "request_block", 1)) diff --git a/chia/daemon/client.py b/chia/daemon/client.py index 38f066b9d1dd..5597a68041b4 100644 --- a/chia/daemon/client.py +++ b/chia/daemon/client.py @@ -67,6 +67,7 @@ async def listener_task() -> None: finally: await self.close() + # TODO: stop dropping tasks on the floor asyncio.create_task(listener_task()) # noqa: RUF006 await asyncio.sleep(1) @@ -91,6 +92,7 @@ async def _get(self, request: WsRpcMessage) -> WsRpcMessage: string = dict_to_json_str(request) if self.websocket is None or self.websocket.closed: raise Exception("Websocket is not connected") + # TODO: stop dropping tasks on the floor asyncio.create_task(self.websocket.send_str(string)) # noqa: RUF006 try: await asyncio.wait_for(self._request_dict[request_id].wait(), timeout=30) diff --git a/chia/daemon/server.py b/chia/daemon/server.py index d537fd7e9436..e7e9709aaed7 100644 --- a/chia/daemon/server.py +++ b/chia/daemon/server.py @@ -1192,6 +1192,7 @@ async def start_plotting(self, websocket: WebSocketResponse, request: dict[str, log.info(f"Plotting will start in {config['delay']} seconds") # TODO: loop gets passed down a lot, review for potential removal loop = asyncio.get_running_loop() + # TODO: stop dropping tasks on the floor loop.create_task(self._start_plotting(id, loop, queue)) # noqa: RUF006 else: log.info("Plotting will start automatically when previous plotting finish") diff --git a/chia/farmer/farmer.py b/chia/farmer/farmer.py index 7037bbc26556..58470b53c30d 100644 --- a/chia/farmer/farmer.py +++ b/chia/farmer/farmer.py @@ -198,8 +198,10 @@ async def start_task() -> None: if sys.getprofile() is not None: self.log.warning("not enabling profiler, getprofile() is already set") else: + # TODO: stop dropping tasks on the floor asyncio.create_task(profile_task(self._root_path, "farmer", self.log)) # noqa: RUF006 + # TODO: stop dropping tasks on the floor asyncio.create_task(start_task()) # noqa: RUF006 try: yield diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index 5a0355e7bef5..2e636c4ce2a5 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -281,6 +281,7 @@ async def manage(self) -> AsyncIterator[None]: self._init_weight_proof = asyncio.create_task(self.initialize_weight_proof()) if self.config.get("enable_profiler", False): + # TODO: stop dropping tasks on the floor asyncio.create_task(profile_task(self.root_path, "node", self.log)) # noqa: RUF006 self.profile_block_validation = self.config.get("profile_block_validation", False) @@ -291,6 +292,7 @@ async def manage(self) -> AsyncIterator[None]: profile_dir.mkdir(parents=True, exist_ok=True) if self.config.get("enable_memory_profiler", False): + # TODO: stop dropping tasks on the floor asyncio.create_task(mem_profile_task(self.root_path, "node", self.log)) # noqa: RUF006 time_taken = time.monotonic() - start_time @@ -338,6 +340,7 @@ async def manage(self) -> AsyncIterator[None]: self.initialized = True if self.full_node_peers is not None: + # TODO: stop dropping tasks on the floor asyncio.create_task(self.full_node_peers.start()) # noqa: RUF006 try: yield @@ -354,6 +357,7 @@ async def manage(self) -> AsyncIterator[None]: self.mempool_manager.shut_down() if self.full_node_peers is not None: + # TODO: stop dropping tasks on the floor asyncio.create_task(self.full_node_peers.close()) # noqa: RUF006 if self.uncompact_task is not None: self.uncompact_task.cancel() @@ -891,6 +895,7 @@ async def on_connect(self, connection: WSChiaConnection) -> None: self._state_changed("add_connection") self._state_changed("sync_mode") if self.full_node_peers is not None: + # TODO: stop dropping tasks on the floor asyncio.create_task(self.full_node_peers.on_connect(connection)) # noqa: RUF006 if self.initialized is False: diff --git a/chia/full_node/full_node_api.py b/chia/full_node/full_node_api.py index aa2721d83c56..6bf09d867d1d 100644 --- a/chia/full_node/full_node_api.py +++ b/chia/full_node/full_node_api.py @@ -459,6 +459,7 @@ async def eventually_clear() -> None: await asyncio.sleep(5) self.full_node.full_node_store.remove_requesting_unfinished_block(block_hash, None) + # TODO: stop dropping tasks on the floor asyncio.create_task(eventually_clear()) # noqa: RUF006 return msg @@ -527,6 +528,7 @@ async def eventually_clear() -> None: await asyncio.sleep(5) self.full_node.full_node_store.remove_requesting_unfinished_block(block_hash, foliage_hash) + # TODO: stop dropping tasks on the floor asyncio.create_task(eventually_clear()) # noqa: RUF006 return msg diff --git a/chia/rpc/rpc_server.py b/chia/rpc/rpc_server.py index e118e8a27c00..a604cad144d3 100644 --- a/chia/rpc/rpc_server.py +++ b/chia/rpc/rpc_server.py @@ -248,6 +248,7 @@ async def _state_changed(self, change: str, change_data: Optional[dict[str, Any] def state_changed(self, change: str, change_data: Optional[dict[str, Any]] = None) -> None: if self.websocket is None or self.websocket.closed: return None + # TODO: stop dropping tasks on the floor asyncio.create_task(self._state_changed(change, change_data)) # noqa: RUF006 @property diff --git a/chia/seeder/dns_server.py b/chia/seeder/dns_server.py index f72c5aeab8fb..b239246517fa 100644 --- a/chia/seeder/dns_server.py +++ b/chia/seeder/dns_server.py @@ -80,6 +80,7 @@ def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None: dns_request: Optional[DNSRecord] = parse_dns_request(data) if dns_request is None: # Invalid Request, we can just drop it and move on. return + # TODO: stop dropping tasks on the floor asyncio.create_task(self.handler(dns_request, addr)) # noqa: RUF006 async def respond(self) -> None: @@ -192,6 +193,7 @@ def eof_received(self) -> Optional[bool]: f"Received incomplete TCP DNS request of length {self.expected_length} from {self.peer_info}, " f"closing connection after dns replies are sent." ) + # TODO: stop dropping tasks on the floor asyncio.create_task(self.wait_for_futures()) # noqa: RUF006 return True # Keep connection open, until the futures are done. log.info(f"Received early EOF from {self.peer_info}, closing connection.") diff --git a/chia/server/server.py b/chia/server/server.py index bfb0720dcf01..1ff72c2214a5 100644 --- a/chia/server/server.py +++ b/chia/server/server.py @@ -502,6 +502,7 @@ async def start_client( self.log.info(f"Connected with {connection_type_str} {target_node}") else: self.log.debug(f"Successful feeler connection with {connection_type_str} {target_node}") + # TODO: stop dropping tasks on the floor asyncio.create_task(connection.close()) # noqa: RUF006 return True except client_exceptions.ClientConnectorError as e: diff --git a/chia/server/ws_connection.py b/chia/server/ws_connection.py index a28865675a6f..a6c4efb441e8 100644 --- a/chia/server/ws_connection.py +++ b/chia/server/ws_connection.py @@ -640,6 +640,7 @@ async def _send_message(self, message: Message) -> None: # TODO: fix this special case. This function has rate limits which are too low. if ProtocolMessageTypes(message.type) != ProtocolMessageTypes.respond_peers: + # TODO: stop dropping tasks on the floor asyncio.create_task(self._wait_and_retry(message)) # noqa: RUF006 return None @@ -668,6 +669,7 @@ async def _read_one_message(self) -> Optional[Message]: f"{self.peer_server_port}/" f"{self.peer_info.port}" ) + # TODO: stop dropping tasks on the floor asyncio.create_task(self.close()) # noqa: RUF006 await asyncio.sleep(3) elif message.type == WSMsgType.CLOSE: @@ -676,10 +678,12 @@ async def _read_one_message(self) -> Optional[Message]: f"{self.peer_server_port}/" f"{self.peer_info.port}" ) + # TODO: stop dropping tasks on the floor asyncio.create_task(self.close()) # noqa: RUF006 await asyncio.sleep(3) elif message.type == WSMsgType.CLOSED: if not self.closed: + # TODO: stop dropping tasks on the floor asyncio.create_task(self.close()) # noqa: RUF006 await asyncio.sleep(3) return None @@ -701,6 +705,7 @@ async def _read_one_message(self) -> Optional[Message]: f"message: {message_type}" ) # Only full node disconnects peers, to prevent abuse and crashing timelords, farmers, etc + # TODO: stop dropping tasks on the floor asyncio.create_task(self.close(300)) # noqa: RUF006 await asyncio.sleep(3) return None @@ -714,13 +719,16 @@ async def _read_one_message(self) -> Optional[Message]: elif message.type == WSMsgType.ERROR: self.log.error(f"WebSocket Error: {message}") if isinstance(message.data, WebSocketError) and message.data.code == WSCloseCode.MESSAGE_TOO_BIG: + # TODO: stop dropping tasks on the floor asyncio.create_task(self.close(300)) # noqa: RUF006 else: + # TODO: stop dropping tasks on the floor asyncio.create_task(self.close()) # noqa: RUF006 await asyncio.sleep(3) else: self.log.error(f"Unexpected WebSocket message type: {message}") + # TODO: stop dropping tasks on the floor asyncio.create_task(self.close()) # noqa: RUF006 await asyncio.sleep(3) return None diff --git a/chia/wallet/wallet_node.py b/chia/wallet/wallet_node.py index f11d12dbcd57..9c14f184a9ab 100644 --- a/chia/wallet/wallet_node.py +++ b/chia/wallet/wallet_node.py @@ -426,9 +426,11 @@ async def _start_with_fingerprint( if sys.getprofile() is not None: self.log.warning("not enabling profiler, getprofile() is already set") else: + # TODO: stop dropping tasks on the floor asyncio.create_task(profile_task(self.root_path, "wallet", self.log)) # noqa: RUF006 if self.config.get("enable_memory_profiler", False): + # TODO: stop dropping tasks on the floor asyncio.create_task(mem_profile_task(self.root_path, "wallet", self.log)) # noqa: RUF006 path: Path = get_wallet_db_path(self.root_path, self.config, str(fingerprint)) @@ -517,6 +519,7 @@ def _set_state_changed_callback(self, callback: StateChangedProtocol) -> None: def _pending_tx_handler(self) -> None: if self._wallet_state_manager is None: return None + # TODO: stop dropping tasks on the floor asyncio.create_task(self._resend_queue()) # noqa: RUF006 async def _resend_queue(self) -> None: @@ -718,6 +721,7 @@ def initialize_wallet_peers(self) -> None: default_port, self.log, ) + # TODO: stop dropping tasks on the floor asyncio.create_task(self.wallet_peers.start()) # noqa: RUF006 async def on_disconnect(self, peer: WSChiaConnection) -> None: