Skip to content
This repository has been archived by the owner on Jan 21, 2025. It is now read-only.

Commit

Permalink
refactor AsyncEventSourceMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
vortigont authored and mathieucarbou committed Jun 27, 2024
1 parent bb4eb89 commit 48968b5
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 23 deletions.
37 changes: 17 additions & 20 deletions src/AsyncEventSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,30 +183,25 @@ AsyncEventSourceClient::~AsyncEventSourceClient(){
close();
}

void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage){
if(dataMessage == NULL)
return;
if(!connected()){
delete dataMessage;
return;
}
void AsyncEventSourceClient::_queueMessage(const char * message, size_t len){
#ifdef ESP32
//length() is not thread-safe, thus acquiring the lock before this call..
std::lock_guard<std::mutex> lock(_lockmq);
#endif

if(_messageQueue.size() >= SSE_MAX_QUEUED_MESSAGES){
#ifdef ESP8266
ets_printf(String(F("ERROR: Too many messages queued\n")).c_str());
#else
log_e("Too many messages queued: deleting message");
#endif
delete dataMessage;
} else {
_messageQueue.emplace_back(dataMessage);
// runqueue trigger when new messages added
if(_client->canSend()) {
_runQueue();
}
return;
}

_messageQueue.emplace_back(message, len);
// runqueue trigger when new messages added
if(_client->canSend()) {
_runQueue();
}
}

Expand All @@ -216,8 +211,8 @@ void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){
std::lock_guard<std::mutex> lock(_lockmq);
#endif
while(len && _messageQueue.size()){
len = _messageQueue.front()->ack(len, time);
if(_messageQueue.front()->finished())
len = _messageQueue.front().ack(len, time);
if(_messageQueue.front().finished())
_messageQueue.pop_front();
}
_runQueue();
Expand Down Expand Up @@ -248,12 +243,14 @@ void AsyncEventSourceClient::close(){
}

void AsyncEventSourceClient::write(const char * message, size_t len){
_queueMessage(new AsyncEventSourceMessage(message, len));
if(!connected()) return;
_queueMessage(message, len);
}

void AsyncEventSourceClient::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){
if(!connected()) return;
String ev = generateEventMessage(message, event, id, reconnect);
_queueMessage(new AsyncEventSourceMessage(ev.c_str(), ev.length()));
_queueMessage(ev.c_str(), ev.length());
}

size_t AsyncEventSourceClient::packetsWaiting() const {
Expand All @@ -267,8 +264,8 @@ void AsyncEventSourceClient::_runQueue() {
// Calls to this private method now already protected by _lockmq acquisition
// so no extra call of _lockmq.lock() here..
for ( auto &i : _messageQueue){
if (!i->sent())
i->send(_client);
if (!i.sent())
i.send(_client);
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/AsyncEventSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,11 @@ class AsyncEventSourceClient {
AsyncClient *_client;
AsyncEventSource *_server;
uint32_t _lastId;
std::list< std::unique_ptr<AsyncEventSourceMessage> > _messageQueue;
// ArFi 2020-08-27 for protecting/serializing _messageQueue
std::list< AsyncEventSourceMessage > _messageQueue;
#ifdef ESP32
mutable std::mutex _lockmq;
#endif
void _queueMessage(AsyncEventSourceMessage *dataMessage);
void _queueMessage(const char * message, size_t len);
void _runQueue();

public:
Expand Down

0 comments on commit 48968b5

Please sign in to comment.