Skip to content

Commit

Permalink
change thread after zlib decompression
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkrissym committed Oct 3, 2024
1 parent 5b150f0 commit 32d899a
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 98 deletions.
12 changes: 5 additions & 7 deletions Discord.C++/Gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,11 @@ void DiscordCPP::Gateway::on_read(boost::system::error_code error_code, std::siz
message_stream << beast::make_printable(buffer.data());
std::string message = message_stream.str();

threadpool->execute([this, message]() {
try {
on_websocket_incoming_message(message);
} catch (const std::exception& e) {
_log.error("Error while handling incoming message: " + std::string(e.what()));
}
});
try {
on_websocket_incoming_message(message);
} catch (const std::exception& e) {
_log.error("Error while handling incoming message: " + std::string(e.what()));
}
}

buffer.clear();
Expand Down
123 changes: 62 additions & 61 deletions Discord.C++/MainGateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,73 +40,74 @@ json DiscordCPP::MainGateway::get_heartbeat_payload() {
{"d", ((_sequence_number == 0) ? "null" : std::to_string(_sequence_number))}};
}

void DiscordCPP::MainGateway::on_websocket_incoming_message(
const std::string& message) {
void DiscordCPP::MainGateway::on_websocket_incoming_message(const std::string& message) {
std::string decompressed_message = decompress_message(message);
_log.debug("Received message: " + decompressed_message);

json payload = json::parse(decompressed_message);
int op = payload["op"].get<int>();

if ((payload.count("s") > 0) && payload["s"].is_number_integer()) {
_sequence_number = payload["s"].get<int>();
}

switch (op) {
case 0:
if (payload["t"].get<std::string>() == "READY") {
_reconnect_timeout = 0;
_last_heartbeat_ack = time(nullptr);

_invalid_session = false;

_session_id = payload["d"]["session_id"].get<std::string>();
threadpool->execute([this, decompressed_message]() {
_log.debug("Received message: " + decompressed_message);

_resume_url = payload["d"]["resume_gateway_url"].get<std::string>() + "?v=10&encoding=json&compress=zlib-stream";
json payload = json::parse(decompressed_message);
int op = payload["op"].get<int>();

std::string str = set_trace(payload);
if ((payload.count("s") > 0) && payload["s"].is_number_integer()) {
_sequence_number = payload["s"].get<int>();
}

_log.info("connected to: " + str + " ]");
_log.info("session id: " + _session_id);
} else if (payload["t"].get<std::string>() == "RESUMED") {
_reconnect_timeout = 0;
switch (op) {
case 0:
if (payload["t"].get<std::string>() == "READY") {
_reconnect_timeout = 0;
_last_heartbeat_ack = time(nullptr);

_invalid_session = false;

_session_id = payload["d"]["session_id"].get<std::string>();

_resume_url = payload["d"]["resume_gateway_url"].get<std::string>() + "?v=10&encoding=json&compress=zlib-stream";

std::string str = set_trace(payload);

_log.info("connected to: " + str + " ]");
_log.info("session id: " + _session_id);
} else if (payload["t"].get<std::string>() == "RESUMED") {
_reconnect_timeout = 0;
_last_heartbeat_ack = time(nullptr);

std::string str = set_trace(payload);

_log.info("successfully resumed session " + _session_id +
" with trace " + str + " ]");
}
break;
case 1:
send_heartbeat_ack();
break;
case 7:
_log.info("received opcode 7: reconnecting to the gateway");
try {
_client->close(boost::beast::websocket::close_reason(boost::beast::websocket::close_code::going_away, "Server requested reconnect"));
} catch (std::exception& e) {
_log.error("Cannot close websocket: " + std::string(e.what()));
}
break;
case 9:
_invalid_session = true;
break;
case 10:
_heartbeat_interval = payload["d"]["heartbeat_interval"].get<int>();
_log.debug("set heartbeat_interval: " +
std::to_string(_heartbeat_interval));
identify();
break;
case 11:
_log.debug("received heartbeat ACK");
_last_heartbeat_ack = time(nullptr);
break;
default:
break;
}

std::string str = set_trace(payload);

_log.info("successfully resumed session " + _session_id +
" with trace " + str + " ]");
}
break;
case 1:
send_heartbeat_ack();
break;
case 7:
_log.info("received opcode 7: reconnecting to the gateway");
try {
_client->close(boost::beast::websocket::close_reason(boost::beast::websocket::close_code::going_away, "Server requested reconnect"));
} catch (std::exception& e) {
_log.error("Cannot close websocket: " + std::string(e.what()));
}
break;
case 9:
_invalid_session = true;
break;
case 10:
_heartbeat_interval = payload["d"]["heartbeat_interval"].get<int>();
_log.debug("set heartbeat_interval: " +
std::to_string(_heartbeat_interval));
identify();
break;
case 11:
_log.debug("received heartbeat ACK");
_last_heartbeat_ack = time(nullptr);
break;
default:
break;
}

_message_handler(payload);
_message_handler(payload);
});
}

DiscordCPP::SharedFuture<void> DiscordCPP::MainGateway::send_heartbeat_ack() {
Expand Down
62 changes: 32 additions & 30 deletions Discord.C++/VoiceGateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,40 +42,42 @@ void DiscordCPP::VoiceGateway::identify() {
}

void DiscordCPP::VoiceGateway::on_websocket_incoming_message(const std::string& message) {
_log.debug("Received message: " + message);
threadpool->execute([this, message]() {
_log.debug("Received message: " + message);

json payload = json::parse(message);
int op = payload["op"].get<int>();
json payload = json::parse(message);
int op = payload["op"].get<int>();

if ((payload.count("seq") > 0) && payload["seq"].is_number_integer()) {
_sequence_number = payload["seq"].get<int>();
}
if ((payload.count("seq") > 0) && payload["seq"].is_number_integer()) {
_sequence_number = payload["seq"].get<int>();
}

switch (op) {
case 2:
_reconnect_timeout = 0;
_last_heartbeat_ack = time(nullptr);
_resume = true;
_log.info("connected to: " + _url);
break;
case 6:
_log.debug("received heartbeat ack");
_last_heartbeat_ack = time(nullptr);
break;
case 8:
_heartbeat_interval = payload["d"]["heartbeat_interval"].get<int>();
identify();
break;
case 9:
_reconnect_timeout = 0;
_last_heartbeat_ack = time(nullptr);
_resume = true;
_log.info("successfully resumed session for guild with id: " +
_guild_id);
break;
}
switch (op) {
case 2:
_reconnect_timeout = 0;
_last_heartbeat_ack = time(nullptr);
_resume = true;
_log.info("connected to: " + _url);
break;
case 6:
_log.debug("received heartbeat ack");
_last_heartbeat_ack = time(nullptr);
break;
case 8:
_heartbeat_interval = payload["d"]["heartbeat_interval"].get<int>();
identify();
break;
case 9:
_reconnect_timeout = 0;
_last_heartbeat_ack = time(nullptr);
_resume = true;
_log.info("successfully resumed session for guild with id: " +
_guild_id);
break;
}

_message_handler(payload);
_message_handler(payload);
});
}

DiscordCPP::VoiceGateway::VoiceGateway(const std::string& token,
Expand Down

0 comments on commit 32d899a

Please sign in to comment.