Skip to content

Commit

Permalink
for bug #241, support mr(merged-read) config and reload. 2.0.52.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Dec 4, 2014
1 parent 57f844b commit 5589b13
Show file tree
Hide file tree
Showing 18 changed files with 293 additions and 48 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ Supported operating systems and hardware:
* 2013-10-17, Created.<br/>

## History
* v2.0, 2014-12-04, for [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241), support mr(merged-read) config and reload. 2.0.52.
* v2.0, 2014-12-04, enable [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241) and [#248](https://github.com/winlinvip/simple-rtmp-server/issues/248), +25% performance, 2.5k publisher. 2.0.50
* v2.0, 2014-12-04, fix [#248](https://github.com/winlinvip/simple-rtmp-server/issues/248), improve about 15% performance for fast buffer. 2.0.49
* v2.0, 2014-12-03, fix [#244](https://github.com/winlinvip/simple-rtmp-server/issues/244), conn thread use cond to wait for recv thread error. 2.0.47.
Expand Down
20 changes: 20 additions & 0 deletions trunk/conf/full.conf
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,26 @@ http_stream {
vhost __defaultVhost__ {
}

# the MR(merged-read) setting for publisher.
vhost mr.srs.com {
# about MR, read https://github.com/winlinvip/simple-rtmp-server/issues/241
mr {
# whether enable the MR(merged-read)
# default: off
enabled on;
# the latency in ms for MR(merged-read),
# the performance+ when latency+, and memory+,
# memory(buffer) = latency * kbps / 8
# for example, latency=500ms, kbps=3000kbps, each publish connection will consume
# memory = 500 * 3000 / 8 = 187500B = 183KB
# when there are 2500 publisher, the total memory of SRS atleast:
# 183KB * 2500 = 446MB
# the value recomment is [300, 2000]
# default: 500
latency 500;
}
}

# vhost for edge, edge and origin is the same vhost
vhost same.edge.srs.com {
# the mode of vhost, local or remote.
Expand Down
68 changes: 67 additions & 1 deletion trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,18 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root)
return ret;
}
}
srs_trace("vhost %s reload hls success.", vhost.c_str());
srs_trace("vhost %s reload hlsdvrsuccess.", vhost.c_str());
}
// mr, only one per vhost
if (!srs_directive_equals(new_vhost->get("mr"), old_vhost->get("mr"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((ret = subscribe->on_reload_vhost_mr(vhost)) != ERROR_SUCCESS) {
srs_error("vhost %s notify subscribes mr failed. ret=%d", vhost.c_str(), ret);
return ret;
}
}
srs_trace("vhost %s reload mr success.", vhost.c_str());
}
// http, only one per vhost.
if (!srs_directive_equals(new_vhost->get("http"), old_vhost->get("http"))) {
Expand Down Expand Up @@ -1316,6 +1327,7 @@ int SrsConfig::check_config()
&& n != "time_jitter"
&& n != "atc" && n != "atc_auto"
&& n != "debug_srs_upnode"
&& n != "mr"
) {
ret = ERROR_SYSTEM_CONFIG_INVALID;
srs_error("unsupported vhost directive %s, ret=%d", n.c_str(), ret);
Expand All @@ -1333,6 +1345,16 @@ int SrsConfig::check_config()
return ret;
}
}
} else if (n == "mr") {
for (int j = 0; j < (int)conf->directives.size(); j++) {
string m = conf->at(j)->name.c_str();
if (m != "enabled" && m != "latency"
) {
ret = ERROR_SYSTEM_CONFIG_INVALID;
srs_error("unsupported vhost mr directive %s, ret=%d", m.c_str(), ret);
return ret;
}
}
} else if (n == "ingest") {
for (int j = 0; j < (int)conf->directives.size(); j++) {
string m = conf->at(j)->name.c_str();
Expand Down Expand Up @@ -2078,6 +2100,50 @@ int SrsConfig::get_chunk_size(string vhost)
return ::atoi(conf->arg0().c_str());
}

bool SrsConfig::get_mr_enabled(string vhost)
{

SrsConfDirective* conf = get_vhost(vhost);

if (!conf) {
return SRS_CONSTS_RTMP_MR;
}

conf = conf->get("mr");
if (!conf) {
return SRS_CONSTS_RTMP_MR;
}

conf = conf->get("enabled");
if (!conf || conf->arg0() != "on") {
return SRS_CONSTS_RTMP_MR;
}

return true;
}

int SrsConfig::get_mr_sleep_ms(string vhost)
{

SrsConfDirective* conf = get_vhost(vhost);

if (!conf) {
return SRS_CONSTS_RTMP_MR_SLEEP;
}

conf = conf->get("mr");
if (!conf) {
return SRS_CONSTS_RTMP_MR_SLEEP;
}

conf = conf->get("latency");
if (!conf || conf->arg0().empty()) {
return SRS_CONSTS_RTMP_MR_SLEEP;
}

return ::atoi(conf->arg0().c_str());
}

int SrsConfig::get_global_chunk_size()
{
SrsConfDirective* conf = root->get("chunk_size");
Expand Down
10 changes: 10 additions & 0 deletions trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,16 @@ class SrsConfig
* @remark, default 60000.
*/
virtual int get_chunk_size(std::string vhost);
/**
* whether mr is enabled for vhost.
* @param vhost, the vhost to get the mr.
*/
virtual bool get_mr_enabled(std::string vhost);
/**
* get the mr sleep time in ms for vhost.
* @param vhost, the vhost to get the mr sleep time.
*/
virtual int get_mr_sleep_ms(std::string vhost);
private:
/**
* get the global chunk size.
Expand Down
108 changes: 90 additions & 18 deletions trunk/src/app/srs_app_recv_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_protocol_buffer.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_core_performance.hpp>
#include <srs_app_config.hpp>

using namespace std;

ISrsMessageHandler::ISrsMessageHandler()
{
Expand Down Expand Up @@ -221,11 +224,13 @@ void SrsQueueRecvThread::on_thread_stop()
}

SrsPublishRecvThread::SrsPublishRecvThread(
SrsRtmpServer* rtmp_sdk, int fd, int timeout_ms,
SrsRtmpServer* rtmp_sdk,
SrsRequest* _req, int mr_sock_fd, int timeout_ms,
SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge
): trd(this, rtmp_sdk, timeout_ms)
{
rtmp = rtmp_sdk;

_conn = conn;
_source = source;
_is_fmle = is_fmle;
Expand All @@ -234,12 +239,22 @@ SrsPublishRecvThread::SrsPublishRecvThread(
recv_error_code = ERROR_SUCCESS;
_nb_msgs = 0;
error = st_cond_new();

req = _req;
mr_fd = mr_sock_fd;

mr_fd = fd;
// the mr settings,
// @see https://github.com/winlinvip/simple-rtmp-server/issues/241
mr = _srs_config->get_mr_enabled(req->vhost);
mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);

_srs_config->subscribe(this);
}

SrsPublishRecvThread::~SrsPublishRecvThread()
{
_srs_config->unsubscribe(this);

trd.stop();
st_cond_destroy(error);
}
Expand Down Expand Up @@ -282,20 +297,8 @@ void SrsPublishRecvThread::on_thread_start()
// for the main thread never send message.

#ifdef SRS_PERF_MERGED_READ
// socket recv buffer, system will double it.
int nb_rbuf = SRS_MR_SOCKET_BUFFER / 2;
socklen_t sock_buf_size = sizeof(int);
if (setsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, sock_buf_size) < 0) {
srs_warn("set sock SO_RCVBUF=%d failed.", nb_rbuf);
}
getsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, &sock_buf_size);

srs_trace("merged read sockbuf=%d, actual=%d, sleep %d when nread<=%d",
SRS_MR_SOCKET_BUFFER, nb_rbuf, SRS_MR_MAX_SLEEP_MS, SRS_MR_SMALL_BYTES);

// enable the merge read
// @see https://github.com/winlinvip/simple-rtmp-server/issues/241
rtmp->set_merge_read(true, this);
// for mr.
update_buffer(mr, mr_sleep);
#endif
}

Expand Down Expand Up @@ -349,7 +352,11 @@ void SrsPublishRecvThread::on_recv_error(int ret)
#ifdef SRS_PERF_MERGED_READ
void SrsPublishRecvThread::on_read(ssize_t nread)
{
if (nread < 0 || SRS_MR_MAX_SLEEP_MS <= 0) {
if (!mr) {
return;
}

if (nread < 0 || mr_sleep <= 0) {
return;
}

Expand All @@ -360,7 +367,72 @@ void SrsPublishRecvThread::on_read(ssize_t nread)
* @see https://github.com/winlinvip/simple-rtmp-server/issues/241
*/
if (nread < SRS_MR_SMALL_BYTES) {
st_usleep(SRS_MR_MAX_SLEEP_MS * 1000);
st_usleep(mr_sleep * 1000);
}
}
#endif

int SrsPublishRecvThread::on_reload_vhost_mr(string vhost)
{
int ret = ERROR_SUCCESS;

// the mr settings,
// @see https://github.com/winlinvip/simple-rtmp-server/issues/241
bool mr_enabled = _srs_config->get_mr_enabled(req->vhost);
int sleep_ms = _srs_config->get_mr_sleep_ms(req->vhost);
update_buffer(mr_enabled, sleep_ms);

return ret;
}

void SrsPublishRecvThread::update_buffer(bool mr_enabled, int sleep_ms)
{
// TODO: FIXME: refine it.

#ifdef SRS_PERF_MERGED_READ
// previous enabled mr, update the buffer.
if (mr && mr_sleep != sleep_ms) {
// the underlayer api will set to SRS_MR_SOCKET_BUFFER bytes.
// 4KB=4096, 8KB=8192, 16KB=16384, 32KB=32768, 64KB=65536,
// 128KB=131072, 256KB=262144, 512KB=524288
// the buffer should set to SRS_MR_MAX_SLEEP_MS*kbps/8,
// for example, your system delivery stream in 1000kbps,
// sleep 800ms for small bytes, the buffer should set to:
// 800*1000/8=100000B(about 128KB).
// 2000*3000/8=750000B(about 732KB).
int kbps = 3000;
int socket_buffer_size = mr_sleep * kbps / 8;

// socket recv buffer, system will double it.
int nb_rbuf = socket_buffer_size / 2;
socklen_t sock_buf_size = sizeof(int);
if (setsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, sock_buf_size) < 0) {
srs_warn("set sock SO_RCVBUF=%d failed.", nb_rbuf);
}
getsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, &sock_buf_size);

srs_trace("merged read sockbuf=%d, actual=%d, sleep %d when nread<=%d",
socket_buffer_size, nb_rbuf, mr_sleep, SRS_MR_SMALL_BYTES);

rtmp->set_recv_buffer(nb_rbuf);
}
#endif

// update to new state
mr = mr_enabled;
mr_sleep = sleep_ms;

#ifdef SRS_PERF_MERGED_READ
// apply new state.
if (mr) {
// enable the merge read
// @see https://github.com/winlinvip/simple-rtmp-server/issues/241
rtmp->set_merge_read(true, this);
} else {
// disable the merge read
// @see https://github.com/winlinvip/simple-rtmp-server/issues/241
rtmp->set_merge_read(false, NULL);
}
#endif
}

14 changes: 13 additions & 1 deletion trunk/src/app/srs_app_recv_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_thread.hpp>
#include <srs_protocol_buffer.hpp>
#include <srs_core_performance.hpp>
#include <srs_app_reload.hpp>

class SrsRtmpServer;
class SrsMessage;
class SrsRtmpConn;
class SrsSource;
class SrsRequest;

/**
* for the recv thread to handle the message.
Expand Down Expand Up @@ -138,15 +140,19 @@ class SrsPublishRecvThread : virtual public ISrsMessageHandler
#ifdef SRS_PERF_MERGED_READ
, virtual public IMergeReadHandler
#endif
, virtual public ISrsReloadHandler
{
private:
SrsRecvThread trd;
SrsRtmpServer* rtmp;
SrsRequest* req;
// the msgs already got.
int64_t _nb_msgs;
// for mr(merged read),
// @see https://github.com/winlinvip/simple-rtmp-server/issues/241
bool mr;
int mr_fd;
int mr_sleep;
// the recv thread error code.
int recv_error_code;
SrsRtmpConn* _conn;
Expand All @@ -158,7 +164,8 @@ class SrsPublishRecvThread : virtual public ISrsMessageHandler
// @see https://github.com/winlinvip/simple-rtmp-server/issues/244
st_cond_t error;
public:
SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, int fd, int timeout_ms,
SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk,
SrsRequest* _req, int mr_sock_fd, int timeout_ms,
SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge);
virtual ~SrsPublishRecvThread();
public:
Expand All @@ -183,6 +190,11 @@ class SrsPublishRecvThread : virtual public ISrsMessageHandler
#ifdef SRS_PERF_MERGED_READ
virtual void on_read(ssize_t nread);
#endif
// interface ISrsReloadHandler
public:
virtual int on_reload_vhost_mr(std::string vhost);
private:
virtual void update_buffer(bool mr_enabled, int sleep_ms);
};

#endif
Expand Down
5 changes: 5 additions & 0 deletions trunk/src/app/srs_app_reload.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ int ISrsReloadHandler::on_reload_vhost_dvr(string /*vhost*/)
return ERROR_SUCCESS;
}

int ISrsReloadHandler::on_reload_vhost_mr(string /*vhost*/)
{
return ERROR_SUCCESS;
}

int ISrsReloadHandler::on_reload_vhost_transcode(string /*vhost*/)
{
return ERROR_SUCCESS;
Expand Down
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_reload.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class ISrsReloadHandler
virtual int on_reload_vhost_forward(std::string vhost);
virtual int on_reload_vhost_hls(std::string vhost);
virtual int on_reload_vhost_dvr(std::string vhost);
virtual int on_reload_vhost_mr(std::string vhost);
virtual int on_reload_vhost_transcode(std::string vhost);
virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id);
virtual int on_reload_ingest_added(std::string vhost, std::string ingest_id);
Expand Down
Loading

1 comment on commit 5589b13

@winlinvip
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.