Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the stability of SocketModeClient implementations #1114

Merged
merged 2 commits into from
Sep 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions slack_sdk/socket_mode/aiohttp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ async def monitor_current_session(self) -> None:
t = time.time()
if self.last_ping_pong_time is None:
self.last_ping_pong_time = float(t)
await self.current_session.ping(f"ping-pong:{t}")
await self.current_session.ping(f"sdk-ping-pong:{t}")

if self.auto_reconnect_enabled:
should_reconnect = False
Expand Down Expand Up @@ -226,7 +226,10 @@ async def receive_messages(self) -> None:
if message.data is not None:
str_message_data = message.data.decode("utf-8")
elements = str_message_data.split(":")
if len(elements) == 2:
if (
len(elements) == 2
and elements[0] == "sdk-ping-pong"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a minor improvement for #1112

):
try:
self.last_ping_pong_time = float(elements[1])
except Exception as e:
Expand Down Expand Up @@ -296,7 +299,30 @@ async def disconnect(self):
async def send_message(self, message: str):
if self.logger.level <= logging.DEBUG:
self.logger.debug(f"Sending a message: {message}")
await self.current_session.send_str(message)
try:
await self.current_session.send_str(message)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This handles a race condition in which the background job disconnects the current session because it is no longer working. In this scenario, the background job completes the reconnection shortly afterwards. By acquiring the connect_operation_lock before retrying, this method can wait for the newly established connection.

except ConnectionError as e:
# We rarely get this exception while replacing the underlying WebSocket connections.
# We can do one more try here as the self.current_session should be ready now.
if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"Failed to send a message (error: {e}, message: {message})"
" as the underlying connection was replaced. Retrying the same request only one time..."
)
# Although acquiring self.connect_operation_lock also for the first method call is the safest way,
# we avoid synchronizing a lot for better performance. That's why we are doing a retry here.
try:
await self.connect_operation_lock.acquire()
if await self.is_connected():
await self.current_session.send_str(message)
else:
self.logger.warning(
"The current session is no longer active. Failed to send a message"
)
raise e
finally:
if self.connect_operation_lock.locked() is True:
self.connect_operation_lock.release()

async def close(self):
self.closed = True
Expand Down
24 changes: 22 additions & 2 deletions slack_sdk/socket_mode/builtin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from slack_sdk.web import WebClient
from .connection import Connection, ConnectionState
from ..interval_runner import IntervalRunner
from ...errors import SlackClientConfigurationError
from ...errors import SlackClientConfigurationError, SlackClientNotConnectedError
from ...proxy_env_variable_loader import load_http_proxy_from_env


Expand Down Expand Up @@ -206,7 +206,27 @@ def send_message(self, message: str) -> None:
self.logger.debug(
f"Sending a message (session id: {self.session_id()}, message: {message})"
)
self.current_session.send(message)
try:
self.current_session.send(message)
except SlackClientNotConnectedError as e:
# We rarely get this exception while replacing the underlying WebSocket connections.
# We can do one more try here as the self.current_session should be ready now.
if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"Failed to send a message (session id: {self.session_id()}, error: {e}, message: {message})"
" as the underlying connection was replaced. Retrying the same request only one time..."
)
# Although acquiring self.connect_operation_lock also for the first method call is the safest way,
# we avoid synchronizing a lot for better performance. That's why we are doing a retry here.
with self.connect_operation_lock:
if self.is_connected():
self.current_session.send(message)
else:
self.logger.warning(
f"The current session (session id: {self.session_id()}) is no longer active. "
"Failed to send a message"
)
raise e

def close(self):
self.closed = True
Expand Down
37 changes: 32 additions & 5 deletions slack_sdk/socket_mode/builtin/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,13 @@ def connect(self) -> None:

def disconnect(self) -> None:
if self.sock is not None:
self.sock.close()
self.sock = None
with self.sock_send_lock:
with self.sock_receive_lock:
# Synchronize before closing this instance's socket
self.sock.close()
self.sock = None
# After this, all operations using self.sock will be skipped

self.logger.info(
f"The connection has been closed (session id: {self.session_id})"
)
Expand All @@ -198,7 +203,13 @@ def ping(self, payload: Union[str, bytes] = "") -> None:
)
data = _build_data_frame_for_sending(payload, FrameHeader.OPCODE_PING)
with self.sock_send_lock:
self.sock.send(data)
if self.sock is not None:
self.sock.send(data)
else:
if self.ping_pong_trace_enabled:
self.logger.debug(
"Skipped sending a ping message as the underlying socket is no longer available."
)

def pong(self, payload: Union[str, bytes] = "") -> None:
if self.trace_enabled and self.ping_pong_trace_enabled:
Expand All @@ -210,7 +221,13 @@ def pong(self, payload: Union[str, bytes] = "") -> None:
)
data = _build_data_frame_for_sending(payload, FrameHeader.OPCODE_PONG)
with self.sock_send_lock:
self.sock.send(data)
if self.sock is not None:
self.sock.send(data)
else:
if self.ping_pong_trace_enabled:
self.logger.debug(
"Skipped sending a pong message as the underlying socket is no longer available."
)

def send(self, payload: str) -> None:
if self.trace_enabled:
Expand All @@ -222,7 +239,17 @@ def send(self, payload: str) -> None:
)
data = _build_data_frame_for_sending(payload, FrameHeader.OPCODE_TEXT)
with self.sock_send_lock:
self.sock.send(data)
try:
self.sock.send(data)
except Exception as e:
# In most cases, we want to retry this operation with a newly established connection.
# Getting this exception means that this connection has been replaced with a new one
# and it's no longer usable.
# The SocketModeClient implementation can do one retry when it gets this exception.
raise SlackClientNotConnectedError(
f"Failed to send a message as the connection is no longer active "
f"(session_id: {self.session_id}, error: {e})"
)

def check_state(self) -> None:
try:
Expand Down
24 changes: 22 additions & 2 deletions slack_sdk/socket_mode/websocket_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from typing import Union, Optional, List, Callable, Tuple

import websocket
from websocket import WebSocketApp
from websocket import WebSocketApp, WebSocketException

from slack_sdk.socket_mode.client import BaseSocketModeClient
from slack_sdk.socket_mode.interval_runner import IntervalRunner
Expand Down Expand Up @@ -212,7 +212,27 @@ def disconnect(self) -> None:
def send_message(self, message: str) -> None:
if self.logger.level <= logging.DEBUG:
self.logger.debug(f"Sending a message: {message}")
self.current_session.send(message)
try:
self.current_session.send(message)
except WebSocketException as e:
# We rarely get this exception while replacing the underlying WebSocket connections.
# We can do one more try here as the self.current_session should be ready now.
if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"Failed to send a message (error: {e}, message: {message})"
" as the underlying connection was replaced. Retrying the same request only one time..."
)
# Although acquiring self.connect_operation_lock also for the first method call is the safest way,
# we avoid synchronizing a lot for better performance. That's why we are doing a retry here.
with self.connect_operation_lock:
if self.is_connected():
self.current_session.send(message)
else:
self.logger.warning(
f"The current session (session id: {self.session_id()}) is no longer active. "
"Failed to send a message"
)
raise e

def close(self):
self.closed = True
Expand Down
25 changes: 24 additions & 1 deletion slack_sdk/socket_mode/websockets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import websockets
from websockets.client import WebSocketClientProtocol
from websockets.exceptions import WebSocketException

from slack_sdk.socket_mode.async_client import AsyncBaseSocketModeClient
from slack_sdk.socket_mode.async_listeners import (
Expand Down Expand Up @@ -177,7 +178,29 @@ async def disconnect(self):
async def send_message(self, message: str):
    """Send a text message over the current WebSocket session.

    The first attempt is made without taking ``connect_operation_lock``.
    If that attempt raises ``WebSocketException``, the send is retried
    exactly once while holding ``connect_operation_lock``.

    Args:
        message: The text payload to send.

    Raises:
        WebSocketException: The original error, re-raised when the session
            is not connected at retry time. An error raised by the retried
            send itself also propagates to the caller.
    """
    if self.logger.level <= logging.DEBUG:
        self.logger.debug(f"Sending a message: {message}")
    try:
        await self.current_session.send(message)
    except WebSocketException as e:
        # We rarely get this exception while replacing the underlying WebSocket connections.
        # We can do one more try here as the self.current_session should be ready now.
        if self.logger.level <= logging.DEBUG:
            self.logger.debug(
                f"Failed to send a message (error: {e}, message: {message})"
                " as the underlying connection was replaced. Retrying the same request only one time..."
            )
        # Although acquiring self.connect_operation_lock also for the first method call is the safest way,
        # we avoid synchronizing a lot for better performance. That's why we are doing a retry here.
        #
        # The lock must actually be acquired before the retry: `async with` waits until the
        # lock is free and releases it only if this coroutine acquired it, so a lock held by
        # another task is never released from here.
        # NOTE(review): assumes connect_operation_lock is an asyncio.Lock, as the
        # `.locked()` / `.release()` usage in the previous revision suggests -- confirm.
        async with self.connect_operation_lock:
            if await self.is_connected():
                await self.current_session.send(message)
            else:
                self.logger.warning(
                    "The current session is no longer active. Failed to send a message"
                )
                raise e

async def close(self):
self.closed = True
Expand Down
40 changes: 39 additions & 1 deletion tests/slack_sdk/socket_mode/test_interactions_builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from random import randint
from threading import Thread

from slack_sdk.errors import SlackClientConfigurationError
from slack_sdk.errors import SlackClientConfigurationError, SlackClientNotConnectedError
from slack_sdk.socket_mode.request import SocketModeRequest

from slack_sdk.socket_mode.client import BaseSocketModeClient
Expand Down Expand Up @@ -126,7 +126,45 @@ def socket_mode_request_handler(
self.logger.info(f"Passed with buffer size: {buffer_size}")

finally:
client.close()
self.server.stop()
self.server.close()

self.logger.info(f"Passed with buffer size: {buffer_size_list}")

def test_send_message_while_disconnection(self):
if is_ci_unstable_test_skip_enabled():
return
t = Thread(target=start_socket_mode_server(self, 3011))
t.daemon = True
t.start()
time.sleep(2) # wait for the server

try:
self.reset_sever_state()
client = SocketModeClient(
app_token="xapp-A111-222-xyz",
web_client=self.web_client,
auto_reconnect_enabled=False,
trace_enabled=True,
)
client.wss_uri = "ws://0.0.0.0:3011/link"
client.connect()
time.sleep(1) # wait for the connection
client.send_message("foo")

client.disconnect()
time.sleep(1) # wait for the connection
try:
client.send_message("foo")
self.fail("SlackClientNotConnectedError is expected here")
except SlackClientNotConnectedError as _:
pass
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we should verify if there are expected logs in this test but I manually checked logs/pytest.log.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is hard for me to understand the intention of this test. The name of the test, "test sending messages", could be interpreted as validating a variety of behaviours that make up the overall message-sending functionality. In the context of this PR, I assume that this test checks for the single-retry-when-underlying-WS-connection-is-re-established behaviour, but perhaps a week or two from now I will no longer remember this (my memory is not what it used to be 👴).

Two questions:

  • could we rename the test to be more specific as to what we are validating?
  • can we add assertions to the test that make explicit what we expect to happen? I assume that in this test we want to trigger the SlackClientNotConnectedError, but because we pass on it, if the error is not raised, then the test passes. So in effect this test may - or may not - validate behaviour, in that if the exception is raised, we pass, but if it is not, we implicitly pass.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we rename the test to be more specific as to what we are validating?

You are definitely right here. Perhaps, a better test name would be test_send_message_while_disconnection

if the error is not raised, then the test passes.

Good point. We can add an explicit failure for the case where the exception is not propagated. I will update the tests and fix the typo in the comments later on.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!


client.connect()
time.sleep(1) # wait for the connection
client.send_message("foo")
finally:
client.close()
self.server.stop()
self.server.close()
39 changes: 39 additions & 0 deletions tests/slack_sdk/socket_mode/test_interactions_websocket_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from random import randint
from threading import Thread

from websocket import WebSocketException

from slack_sdk.socket_mode.client import BaseSocketModeClient

from slack_sdk.socket_mode.request import SocketModeRequest
Expand Down Expand Up @@ -101,3 +103,40 @@ def socket_mode_request_handler(
client.close()
self.server.stop()
self.server.close()

def test_send_message_while_disconnection(self):
if is_ci_unstable_test_skip_enabled():
return
t = Thread(target=start_socket_mode_server(self, 3012))
t.daemon = True
t.start()
time.sleep(2) # wait for the server

try:
self.reset_sever_state()
client = SocketModeClient(
app_token="xapp-A111-222-xyz",
web_client=self.web_client,
auto_reconnect_enabled=False,
trace_enabled=True,
)
client.wss_uri = "ws://0.0.0.0:3012/link"
client.connect()
time.sleep(1) # wait for the connection
client.send_message("foo")

client.disconnect()
time.sleep(1) # wait for the connection
try:
client.send_message("foo")
self.fail("WebSocketException is expected here")
except WebSocketException as _:
pass

client.connect()
time.sleep(1) # wait for the connection
client.send_message("foo")
finally:
client.close()
self.server.stop()
self.server.close()
38 changes: 38 additions & 0 deletions tests/slack_sdk_async/socket_mode/test_interactions_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,41 @@ async def socket_mode_listener(
await client.close()
self.server.stop()
self.server.close()

@async_test
async def test_send_message_while_disconnection(self):
if is_ci_unstable_test_skip_enabled():
return
t = Thread(target=start_socket_mode_server(self, 3001))
t.daemon = True
t.start()

client = SocketModeClient(
app_token="xapp-A111-222-xyz",
web_client=self.web_client,
auto_reconnect_enabled=False,
trace_enabled=True,
)

try:
time.sleep(1) # wait for the server
client.wss_uri = "ws://0.0.0.0:3001/link"
await client.connect()
await asyncio.sleep(1) # wait for the message receiver
await client.send_message("foo")

await client.disconnect()
await asyncio.sleep(1) # wait for the message receiver
try:
await client.send_message("foo")
self.fail("ConnectionError is expected here")
except ConnectionError as _:
pass

await client.connect()
await asyncio.sleep(1) # wait for the message receiver
await client.send_message("foo")
finally:
await client.close()
self.server.stop()
self.server.close()
Loading