Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to fix invalid sessions received when reconnecting to the Gateway #592

Merged
merged 6 commits into from
Nov 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/nyxx.dart
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export 'src/http/managers/interaction_manager.dart' show InteractionManager;
export 'src/http/managers/entitlement_manager.dart' show EntitlementManager;

export 'src/gateway/gateway.dart' show Gateway;
export 'src/gateway/message.dart' show Disconnecting, Dispose, ErrorReceived, EventReceived, GatewayMessage, Send, ShardData, ShardMessage;
export 'src/gateway/message.dart' show Disconnecting, Dispose, ErrorReceived, EventReceived, GatewayMessage, Send, Sent, ShardData, ShardMessage;
export 'src/gateway/shard.dart' show Shard;

export 'src/models/discord_color.dart' show DiscordColor;
Expand Down
9 changes: 2 additions & 7 deletions lib/src/gateway/gateway.dart
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,7 @@ class Gateway extends GatewayManager with EventParser {
Gateway(this.client, this.gatewayBot, this.shards, this.totalShards, this.shardIds) : super.create() {
for (final shard in shards) {
shard.listen(
(message) {
if (message is ErrorReceived) {
shard.logger.warning('Received error: ${message.error}', message.error, message.stackTrace);
}

_messagesController.add(message);
},
_messagesController.add,
onError: _messagesController.addError,
onDone: () async {
if (_closing) {
Expand All @@ -109,6 +103,7 @@ class Gateway extends GatewayManager with EventParser {

throw ShardDisconnectedError(shard);
},
cancelOnError: false,
);
}

Expand Down
9 changes: 9 additions & 0 deletions lib/src/gateway/message.dart
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ class Disconnecting extends ShardMessage {
Disconnecting({required this.reason});
}

/// A shard message sent when the shard adds a payload to the connection.
class Sent extends ShardMessage {
/// The payload that was sent.
final Send payload;

/// Create a new [Sent].
Sent({required this.payload});
}

/// The base class for all control messages sent from the client to the shard.
abstract class GatewayMessage with ToStringHelper {}

Expand Down
10 changes: 7 additions & 3 deletions lib/src/gateway/shard.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ class Shard extends Stream<ShardMessage> implements StreamSink<GatewayMessage> {
/// Create a new [Shard].
Shard(this.id, this.isolate, this.receiveStream, this.sendPort, this.client) {
final subscription = listen((message) {
if (message is ErrorReceived) {
if (message is Sent) {
logger
..fine('Sent payload: ${message.payload.opcode.name}')
..finer('Opcode: ${message.payload.opcode.value}, Data: ${message.payload.data}');
} else if (message is ErrorReceived) {
logger.warning('Error: ${message.error}', message.error, message.stackTrace);
} else if (message is Disconnecting) {
logger.info('Disconnecting: ${message.reason}');
Expand All @@ -61,7 +65,7 @@ class Shard extends Stream<ShardMessage> implements StreamSink<GatewayMessage> {
if (isResumable) {
logger.info('Reconnecting: invalid session');
} else {
logger.severe('Unresumable invalid session, disconnecting');
logger.warning('Reconnecting: unresumable invalid session');
}
case HelloEvent(:final heartbeatInterval):
logger.finest('Heartbeat Interval: $heartbeatInterval');
Expand Down Expand Up @@ -141,7 +145,7 @@ class Shard extends Stream<ShardMessage> implements StreamSink<GatewayMessage> {
void add(GatewayMessage event) {
if (event is Send) {
logger
..fine('Send: ${event.opcode.name}')
..fine('Sending: ${event.opcode.name}')
..finer('Opcode: ${event.opcode.value}, Data: ${event.data}');
} else if (event is Dispose) {
logger.info('Disposing');
Expand Down
29 changes: 19 additions & 10 deletions lib/src/gateway/shard_runner.dart
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class ShardRunner {
final controller = StreamController<ShardMessage>();

// The subscription to the control messages stream.
// This subscription is paused whenever the shard is not successfully connected,.
// This subscription is paused whenever the shard is not successfully connected.
final controlSubscription = messages.listen((message) {
if (message is Send) {
connection!.add(message);
Expand Down Expand Up @@ -87,6 +87,7 @@ class ShardRunner {

// Open the websocket connection.
connection = await ShardConnection.connect(gatewayUri.toString(), this);
connection!.onSent.listen(controller.add);

// Obtain the heartbeat interval from the HELLO event and start heartbeating.
final hello = await connection!.first;
Expand All @@ -105,7 +106,7 @@ class ShardRunner {
sendIdentify();
}

canResume = false;
canResume = true;

// We are connected, start handling control messages.
controlSubscription.resume();
Expand All @@ -127,7 +128,7 @@ class ShardRunner {
}
} else if (event is ReconnectEvent) {
canResume = true;
connection!.close();
connection!.close(4000);
} else if (event is InvalidSessionEvent) {
if (event.isResumable) {
canResume = true;
Expand All @@ -136,7 +137,8 @@ class ShardRunner {
gatewayUri = originalGatewayUri;
}

connection!.close();
// Don't use 4000 as it will always try to resume
connection!.close(4999);
} else if (event is HeartbeatAckEvent) {
lastHeartbeatAcked = true;
heartbeatStopwatch = null;
Expand All @@ -159,8 +161,7 @@ class ShardRunner {
// Check if we can resume based on close code.
// A manual close where we set closeCode earlier would have a close code of 1000, so this
// doesn't change closeCode if we set it manually.
// 1001 is the close code used for a ping failure, so include it in the resumable codes.
const resumableCodes = [null, 1001, 4000, 4001, 4002, 4003, 4007, 4008, 4009];
const resumableCodes = [null, 4000, 4001, 4002, 4003, 4007, 4008, 4009];
final closeCode = connection!.websocket.closeCode;
canResume = canResume || resumableCodes.contains(closeCode);

Expand All @@ -171,9 +172,11 @@ class ShardRunner {
}
} catch (error, stackTrace) {
controller.add(ErrorReceived(error: error, stackTrace: stackTrace));
// Prevents the while-true loop from looping too often when no internet is available.
await Future.delayed(Duration(milliseconds: 100));
} finally {
// Reset connection properties.
connection?.close();
connection?.close(4000);
connection = null;
heartbeatTimer?.cancel();
heartbeatTimer = null;
Expand Down Expand Up @@ -246,11 +249,13 @@ class ShardConnection extends Stream<GatewayEvent> implements StreamSink<Send> {
final Stream<GatewayEvent> events;
final ShardRunner runner;

final StreamController<Sent> _sentController = StreamController();
Stream<Sent> get onSent => _sentController.stream;

ShardConnection(this.websocket, this.events, this.runner);

static Future<ShardConnection> connect(String gatewayUri, ShardRunner runner) async {
final connection = await WebSocket.connect(gatewayUri);
connection.pingInterval = const Duration(seconds: 20);

final uncompressedStream = switch (runner.data.apiOptions.compression) {
GatewayCompression.transport => decompressTransport(connection.cast<List<int>>()),
Expand Down Expand Up @@ -293,6 +298,7 @@ class ShardConnection extends Stream<GatewayEvent> implements StreamSink<Send> {
};

websocket.add(encoded);
_sentController.add(Sent(payload: event));
}

@override
Expand All @@ -302,10 +308,13 @@ class ShardConnection extends Stream<GatewayEvent> implements StreamSink<Send> {
Future<void> addStream(Stream<Send> stream) => stream.forEach(add);

@override
Future<void> close([int? code]) => websocket.close(code ?? 1000);
Future<void> close([int? code]) async {
await websocket.close(code ?? 1000);
await _sentController.close();
}

@override
Future<void> get done => websocket.done;
Future<void> get done => websocket.done.then((_) => _sentController.done);
}

Stream<dynamic> decompressTransport(Stream<List<int>> raw) {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/plugin/logging.dart
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class Logging extends NyxxPlugin {
}
}

final outSink = rec.level > stderrLevel ? stderr : stdout;
final outSink = rec.level >= stderrLevel ? stderr : stdout;
outSink.write(messageString);
});
}
Expand Down