Skip to content

Commit

Permalink
improve logging of rate limits
Browse files: browse the repository at this point in the history
  • Loading branch information
arvidn committed Nov 20, 2024
1 parent 3bc2fbf commit 4eea3d7
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 17 deletions.
40 changes: 31 additions & 9 deletions chia/server/rate_limits.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def __init__(self, incoming: bool, reset_seconds: int = 60, percentage_of_limit:

def process_msg_and_check(
self, message: Message, our_capabilities: list[Capability], peer_capabilities: list[Capability]
) -> bool:
) -> tuple[bool, str]:
"""
Returns True if message can be processed successfully, false if a rate limit is passed.
"""
Expand All @@ -59,7 +59,7 @@ def process_msg_and_check(
message_type = ProtocolMessageTypes(message.type)
except Exception as e:
log.warning(f"Invalid message: {message.type}, {e}")
return True
return (True, "")

new_message_counts: int = self.message_counts[message_type] + 1
new_cumulative_size: int = self.message_cumulative_sizes[message_type] + len(message.data)
Expand All @@ -81,25 +81,47 @@ def process_msg_and_check(
new_non_tx_count = self.non_tx_message_counts + 1
new_non_tx_size = self.non_tx_cumulative_size + len(message.data)
if new_non_tx_count > non_tx_freq * proportion_of_limit:
return False
return (
False,
f"non-tx count: {new_non_tx_count} "
f"> {non_tx_freq * proportion_of_limit} "
f"(scale factor: {proportion_of_limit})",
)
if new_non_tx_size > non_tx_max_total_size * proportion_of_limit:
return False
return (
False,
f"non-tx size: {new_non_tx_size} "
f"> {non_tx_max_total_size * proportion_of_limit}"
f"(scale factor: {proportion_of_limit})",
)
else:
log.warning(f"Message type {message_type} not found in rate limits")
log.warning(
f"Message type {message_type} not found in rate limits " f"(scale factor: {proportion_of_limit})",
)

if limits.max_total_size is None:
limits = dataclasses.replace(limits, max_total_size=limits.frequency * limits.max_size)
assert limits.max_total_size is not None

if new_message_counts > limits.frequency * proportion_of_limit:
return False
return (
False,
f"message count: {new_message_counts} "
f"> {limits.frequency * proportion_of_limit} "
f"(scale factor: {proportion_of_limit})",
)
if len(message.data) > limits.max_size:
return False
return (False, f"message size: {len(message.data)} > {limits.max_size}")
if new_cumulative_size > limits.max_total_size * proportion_of_limit:
return False
return (
False,
f"cumulative size: {new_cumulative_size} "
f"> {limits.max_total_size * proportion_of_limit} "
f"(scale factor: {proportion_of_limit})",
)

ret = True
return True
return (True, "")
finally:
if self.incoming or ret:
# now that we determined that it's OK to send the message, commit the
Expand Down
24 changes: 16 additions & 8 deletions chia/server/ws_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,17 +626,23 @@ async def _send_message(self, message: Message) -> None:
encoded: bytes = bytes(message)
size = len(encoded)
assert len(encoded) < (2 ** (LENGTH_BYTES * 8))
if not self.outbound_rate_limiter.process_msg_and_check(
accepted, limiter_msg = self.outbound_rate_limiter.process_msg_and_check(
message, self.local_capabilities, self.peer_capabilities
):
)
if not accepted:
if not is_localhost(self.peer_info.host):
message_type = ProtocolMessageTypes(message.type)
last_time = self.log_rate_limit_last_time[message_type]
now = time.monotonic()
self.log_rate_limit_last_time[message_type] = now
if now - last_time >= 60:
msg = f"Rate limiting ourselves. message type: {message_type.name}, peer: {self.peer_info.host}"
self.log.debug(msg)
if now - last_time >= 30:
self.log.info(
f"Rate limiting ourselves. Dropping outbound message: "
f"{message_type.name}, "
f"sz: {len(message.data) / 1000:0.2f} kB, "
f"peer: {self.peer_info.host}, "
f"{limiter_msg}"
)

# TODO: fix this special case. This function has rate limits which are too low.
if ProtocolMessageTypes(message.type) != ProtocolMessageTypes.respond_peers:
Expand Down Expand Up @@ -696,13 +702,15 @@ async def _read_one_message(self) -> Optional[Message]:
message_type = ProtocolMessageTypes(full_message_loaded.type).name
except Exception:
message_type = "Unknown"
if not self.inbound_rate_limiter.process_msg_and_check(
accepted, limiter_msg = self.inbound_rate_limiter.process_msg_and_check(
full_message_loaded, self.local_capabilities, self.peer_capabilities
):
)
if not accepted:
if self.local_type == NodeType.FULL_NODE and not is_localhost(self.peer_info.host):
self.log.error(
f"Peer has been rate limited and will be disconnected: {self.peer_info.host}, "
f"message: {message_type}"
f"message: {message_type}, "
f"{limiter_msg}"
)
# Only full node disconnects peers, to prevent abuse and crashing timelords, farmers, etc
# TODO: stop dropping tasks on the floor
Expand Down

0 comments on commit 4eea3d7

Please sign in to comment.