Skip to content

Commit

Permalink
for #742, refine code for recv thread.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jan 23, 2017
1 parent a07986f commit 4583a63
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 110 deletions.
153 changes: 81 additions & 72 deletions trunk/src/app/srs_app_recv_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,27 @@ using namespace std;
// the max small bytes to group
#define SRS_MR_SMALL_BYTES 4096

ISrsMessageHandler::ISrsMessageHandler()
ISrsMessageConsumer::ISrsMessageConsumer()
{
}

ISrsMessageHandler::~ISrsMessageHandler()
ISrsMessageConsumer::~ISrsMessageConsumer()
{
}

SrsRecvThread::SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtmp_sdk, int tm)
ISrsMessagePumper::ISrsMessagePumper()
{
}

ISrsMessagePumper::~ISrsMessagePumper()
{
}

SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm)
{
rtmp = r;
pumper = p;
timeout = tm;
handler = msg_handler;
rtmp = rtmp_sdk;
trd = new SrsReusableThread2("recv", this);
}

Expand Down Expand Up @@ -87,29 +95,29 @@ int SrsRecvThread::cycle()
int ret = ERROR_SUCCESS;

while (!trd->interrupted()) {
if (!handler->can_handle()) {
// When the pumper is interrupted, wait then retry.
if (pumper->interrupted()) {
st_usleep(timeout * 1000);
continue;
}

SrsCommonMessage* msg = NULL;

// recv and handle message
ret = rtmp->recv_message(&msg);
if (ret == ERROR_SUCCESS) {
ret = handler->handle(msg);
// Process the received message.
if ((ret = rtmp->recv_message(&msg)) == ERROR_SUCCESS) {
ret = pumper->consume(msg);
}

if (ret != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {
srs_error("thread process message failed. ret=%d", ret);
srs_error("recv thread error. ret=%d", ret);
}

// we use no timeout to recv, should never got any error.
// Interrupt the receive thread for any error.
trd->interrupt();

// notice the handler got a recv error.
handler->on_recv_error(ret);
// Notify the pumper to quit for error.
pumper->interrupt(ret);

return ret;
}
Expand All @@ -128,15 +136,15 @@ void SrsRecvThread::on_thread_start()
// @see: https://github.com/ossrs/srs/issues/217
rtmp->set_recv_timeout(SRS_CONSTS_NO_TMMS);

handler->on_thread_start();
pumper->on_start();
}

void SrsRecvThread::on_thread_stop()
{
// reset the timeout to pulse mode.
rtmp->set_recv_timeout(timeout * 1000);

handler->on_thread_stop();
pumper->on_stop();
}

SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms)
Expand Down Expand Up @@ -196,16 +204,7 @@ int SrsQueueRecvThread::error_code()
return recv_error_code;
}

bool SrsQueueRecvThread::can_handle()
{
// we only recv one message and then process it,
// for the message may cause the thread to stop,
// when stop, the thread is freed, so the messages
// are dropped.
return empty();
}

int SrsQueueRecvThread::handle(SrsCommonMessage* msg)
int SrsQueueRecvThread::consume(SrsCommonMessage* msg)
{
// put into queue, the send thread will get and process it,
// @see SrsRtmpConn::process_play_control_msg
Expand All @@ -218,24 +217,34 @@ int SrsQueueRecvThread::handle(SrsCommonMessage* msg)
return ERROR_SUCCESS;
}

void SrsQueueRecvThread::on_recv_error(int ret)
bool SrsQueueRecvThread::interrupted()
{
// we only recv one message and then process it,
// for the message may cause the thread to stop,
// when stop, the thread is freed, so the messages
// are dropped.
return !empty();
}

void SrsQueueRecvThread::interrupt(int ret)
{
recv_error_code = ret;

#ifdef SRS_PERF_QUEUE_COND_WAIT
if (_consumer) {
_consumer->wakeup();
}
#endif
}

void SrsQueueRecvThread::on_thread_start()
void SrsQueueRecvThread::on_start()
{
// disable the protocol auto response,
// for the isolate recv thread should never send any messages.
rtmp->set_auto_response(false);
}

void SrsQueueRecvThread::on_thread_stop()
void SrsQueueRecvThread::on_stop()
{
// enable the protocol auto response,
// for the isolate recv thread terminated.
Expand Down Expand Up @@ -325,7 +334,48 @@ void SrsPublishRecvThread::stop()
trd.stop();
}

void SrsPublishRecvThread::on_thread_start()
int SrsPublishRecvThread::consume(SrsCommonMessage* msg)
{
int ret = ERROR_SUCCESS;

// when cid changed, change it.
if (ncid != cid) {
_srs_context->set_id(ncid);
cid = ncid;
}

_nb_msgs++;

// log to show the time of recv thread.
srs_verbose("recv thread now=%"PRId64"us, got msg time=%"PRId64"ms, size=%d",
srs_update_system_time_ms(), msg->header.timestamp, msg->size);

// the rtmp connection will handle this message
ret = _conn->handle_publish_message(_source, msg, _is_fmle, _is_edge);

// must always free it,
// the source will copy it if need to use.
srs_freep(msg);

return ret;
}

bool SrsPublishRecvThread::interrupted()
{
// Never interrupted, always can handle message.
return false;
}

void SrsPublishRecvThread::interrupt(int ret)
{
recv_error_code = ret;

// when recv thread error, signal the conn thread to process it.
// @see https://github.com/ossrs/srs/issues/244
st_cond_signal(error);
}

void SrsPublishRecvThread::on_start()
{
// we donot set the auto response to false,
// for the main thread never send message.
Expand All @@ -342,7 +392,7 @@ void SrsPublishRecvThread::on_thread_start()
#endif
}

void SrsPublishRecvThread::on_thread_stop()
void SrsPublishRecvThread::on_stop()
{
// we donot set the auto response to true,
// for we donot set to false yet.
Expand All @@ -360,47 +410,6 @@ void SrsPublishRecvThread::on_thread_stop()
#endif
}

bool SrsPublishRecvThread::can_handle()
{
// publish thread always can handle message.
return true;
}

int SrsPublishRecvThread::handle(SrsCommonMessage* msg)
{
int ret = ERROR_SUCCESS;

// when cid changed, change it.
if (ncid != cid) {
_srs_context->set_id(ncid);
cid = ncid;
}

_nb_msgs++;

// log to show the time of recv thread.
srs_verbose("recv thread now=%"PRId64"us, got msg time=%"PRId64"ms, size=%d",
srs_update_system_time_ms(), msg->header.timestamp, msg->size);

// the rtmp connection will handle this message
ret = _conn->handle_publish_message(_source, msg, _is_fmle, _is_edge);

// must always free it,
// the source will copy it if need to use.
srs_freep(msg);

return ret;
}

void SrsPublishRecvThread::on_recv_error(int ret)
{
recv_error_code = ret;

// when recv thread error, signal the conn thread to process it.
// @see https://github.com/ossrs/srs/issues/244
st_cond_signal(error);
}

#ifdef SRS_PERF_MERGED_READ
void SrsPublishRecvThread::on_read(ssize_t nread)
{
Expand Down
Loading

0 comments on commit 4583a63

Please sign in to comment.