Skip to content

Commit

Permalink
Guard all accesses to publisher->streams with publisher->streams_mutex (
Browse files Browse the repository at this point in the history
  • Loading branch information
atoppi authored Oct 7, 2024
1 parent fb9c2cc commit d014cff
Showing 1 changed file with 51 additions and 9 deletions.
60 changes: 51 additions & 9 deletions src/plugins/janus_videoroom.c
Original file line number Diff line number Diff line change
Expand Up @@ -2713,6 +2713,7 @@ static void janus_videoroom_publisher_dereference_nodebug(janus_videoroom_publis

static void janus_videoroom_publisher_destroy(janus_videoroom_publisher *p) {
if(p && g_atomic_int_compare_and_exchange(&p->destroyed, 0, 1)) {
janus_mutex_lock(&p->streams_mutex);
/* Forwarders with RTCP support may have an extra reference, stop their source */
janus_mutex_lock(&p->rtp_forwarders_mutex);
if(g_hash_table_size(p->rtp_forwarders) > 0) {
Expand Down Expand Up @@ -2746,6 +2747,7 @@ static void janus_videoroom_publisher_destroy(janus_videoroom_publisher *p) {
}
}
janus_mutex_unlock(&p->rtp_forwarders_mutex);
janus_mutex_unlock(&p->streams_mutex);
janus_refcount_decrease(&p->ref);
}
}
Expand Down Expand Up @@ -4534,6 +4536,7 @@ json_t *janus_videoroom_query_session(janus_plugin_session *handle) {
if(participant->e2ee)
json_object_set_new(info, "e2ee", json_true());
json_t *media = json_array();
janus_mutex_lock(&participant->streams_mutex);
GList *temp = participant->streams;
while(temp) {
janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data;
Expand Down Expand Up @@ -4576,6 +4579,7 @@ json_t *janus_videoroom_query_session(janus_plugin_session *handle) {
json_array_append_new(media, m);
temp = temp->next;
}
janus_mutex_unlock(&participant->streams_mutex);
json_object_set_new(info, "streams", media);
}
if(participant != NULL)
Expand Down Expand Up @@ -5821,6 +5825,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
goto prepare_response;
}
janus_refcount_increase(&publisher->ref); /* This is just to handle the request for now */
janus_mutex_lock(&publisher->streams_mutex);
janus_mutex_lock(&publisher->rtp_forwarders_mutex);
if(publisher->udp_sock <= 0) {
publisher->udp_sock = socket(!ipv6_disabled ? AF_INET6 : AF_INET, SOCK_DGRAM, IPPROTO_UDP);
Expand Down Expand Up @@ -5850,9 +5855,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
json_t *s = json_array_get(streams, i);
json_t *stream_mid = json_object_get(s, "mid");
const char *mid = json_string_value(stream_mid);
janus_mutex_lock(&publisher->streams_mutex);
ps = g_hash_table_lookup(publisher->streams_bymid, mid);
janus_mutex_unlock(&publisher->streams_mutex);
if(ps == NULL) {
/* FIXME Should we return an error instead? */
JANUS_LOG(LOG_WARN, "No such stream with mid '%s', skipping forwarder...\n", mid);
Expand Down Expand Up @@ -6079,7 +6082,6 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
guint32 audio_handle = 0;
guint32 video_handle[3] = {0, 0, 0};
guint32 data_handle = 0;
janus_mutex_lock(&publisher->streams_mutex);
if(audio_port > 0) {
/* FIXME Find the audio stream */
GList *temp = publisher->streams;
Expand Down Expand Up @@ -6224,7 +6226,6 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
}
}
}
janus_mutex_unlock(&publisher->streams_mutex);
if(audio_handle > 0) {
json_object_set_new(rtp_stream, "audio_stream_id", json_integer(audio_handle));
json_object_set_new(rtp_stream, "audio", json_integer(audio_port));
Expand Down Expand Up @@ -6254,6 +6255,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
json_object_set_new(rtp_stream, "warning", json_string("deprecated_api"));
}
janus_mutex_unlock(&publisher->rtp_forwarders_mutex);
janus_mutex_unlock(&publisher->streams_mutex);
janus_mutex_unlock(&videoroom->mutex);
/* These two unrefs are related to the message handling */
janus_refcount_decrease(&publisher->ref);
Expand Down Expand Up @@ -6351,6 +6353,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
goto prepare_response;
}
janus_refcount_increase(&publisher->ref); /* Just to handle the message now */
janus_mutex_lock(&publisher->streams_mutex);
janus_mutex_lock(&publisher->rtp_forwarders_mutex);
/* Find the forwarder by iterating on all the streams */
gboolean found = FALSE;
Expand Down Expand Up @@ -6380,6 +6383,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
temp = temp->next;
}
janus_mutex_unlock(&publisher->rtp_forwarders_mutex);
janus_mutex_unlock(&publisher->streams_mutex);
janus_refcount_decrease(&publisher->ref);
janus_mutex_unlock(&videoroom->mutex);
janus_refcount_decrease(&videoroom->ref);
Expand Down Expand Up @@ -6846,6 +6850,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
/* To see if the participant is talking, we need to find the audio stream(s) */
if(g_atomic_int_get(&p->session->started)) {
gboolean found = FALSE, talking = FALSE;
janus_mutex_lock(&p->streams_mutex);
GList *temp = p->streams;
while(temp) {
janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data;
Expand All @@ -6856,6 +6861,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
}
temp = temp->next;
}
janus_mutex_unlock(&p->streams_mutex);
if(found)
json_object_set_new(pl, "talking", talking ? json_true() : json_false());
}
Expand Down Expand Up @@ -6908,9 +6914,11 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
g_hash_table_iter_init(&iter, videoroom->participants);
while (!g_atomic_int_get(&videoroom->destroyed) && g_hash_table_iter_next(&iter, NULL, &value)) {
janus_videoroom_publisher *p = value;
janus_mutex_lock(&p->streams_mutex);
janus_mutex_lock(&p->rtp_forwarders_mutex);
if(g_hash_table_size(p->rtp_forwarders) == 0) {
janus_mutex_unlock(&p->rtp_forwarders_mutex);
janus_mutex_unlock(&p->streams_mutex);
continue;
}
json_t *pl = json_object();
Expand Down Expand Up @@ -6948,6 +6956,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
temp = temp->next;
}
janus_mutex_unlock(&p->rtp_forwarders_mutex);
janus_mutex_unlock(&p->streams_mutex);
json_object_set_new(pl, "forwarders", flist);
json_array_append_new(list, pl);
}
Expand Down Expand Up @@ -7000,9 +7009,12 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
/* Something changed */
if(!participant->recording_active) {
/* Not recording (anymore?) */
janus_mutex_lock(&participant->streams_mutex);
janus_videoroom_recorder_close(participant);
janus_mutex_unlock(&participant->streams_mutex);
} else if(participant->recording_active && g_atomic_int_get(&participant->session->started)) {
/* We've started recording, send a PLI and go on */
janus_mutex_lock(&participant->streams_mutex);
GList *temp = participant->streams;
while(temp) {
janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data;
Expand All @@ -7013,6 +7025,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
}
temp = temp->next;
}
janus_mutex_unlock(&participant->streams_mutex);
}
}
janus_mutex_unlock(&participant->rec_mutex);
Expand Down Expand Up @@ -7169,9 +7182,11 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
g_snprintf(error_cause, 512, "Only local publishers can be remotized");
goto prepare_response;
}
janus_mutex_lock(&publisher->streams_mutex);
janus_mutex_lock(&publisher->rtp_forwarders_mutex);
if(g_hash_table_lookup(publisher->remote_recipients, remote_id) != NULL) {
janus_mutex_unlock(&publisher->rtp_forwarders_mutex);
janus_mutex_unlock(&publisher->streams_mutex);
janus_refcount_decrease(&publisher->ref);
janus_refcount_decrease(&videoroom->ref);
JANUS_LOG(LOG_ERR, "Remotization already exists (%s)\n", remote_id);
Expand All @@ -7185,6 +7200,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
if(publisher->udp_sock <= 0 ||
(!ipv6_disabled && setsockopt(publisher->udp_sock, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only)) != 0)) {
janus_mutex_unlock(&publisher->rtp_forwarders_mutex);
janus_mutex_unlock(&publisher->streams_mutex);
janus_refcount_decrease(&publisher->ref);
janus_refcount_decrease(&videoroom->ref);
JANUS_LOG(LOG_ERR, "Could not open UDP socket for RTP stream for publisher (%s), %d (%s)\n",
Expand All @@ -7195,7 +7211,6 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
}
}
/* Add a new RTP forwarder for each of the publisher streams */
janus_mutex_lock(&publisher->streams_mutex);
janus_videoroom_publisher_stream *ps = NULL;
janus_rtp_forwarder *f = NULL;
gboolean rtcp_added = FALSE, add_rtcp = FALSE;
Expand Down Expand Up @@ -7253,7 +7268,6 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
}
temp = temp->next;
}
janus_mutex_unlock(&publisher->streams_mutex);
/* Keep track of this remotization */
janus_videoroom_remote_recipient *recipient = g_malloc(sizeof(janus_videoroom_remote_recipient));
recipient->remote_id = g_strdup(remote_id);
Expand All @@ -7264,6 +7278,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
g_hash_table_insert(publisher->remote_recipients, g_strdup(remote_id), recipient);
/* Done */
janus_mutex_unlock(&publisher->rtp_forwarders_mutex);
janus_mutex_unlock(&publisher->streams_mutex);
response = json_object();
json_object_set_new(response, "videoroom", json_string("success"));
json_object_set_new(response, "room", string_ids ? json_string(publisher->room_id_str) : json_integer(publisher->room_id));
Expand Down Expand Up @@ -7333,10 +7348,12 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
}
janus_refcount_increase(&publisher->ref);
janus_mutex_unlock(&videoroom->mutex);
janus_mutex_lock(&publisher->streams_mutex);
janus_mutex_lock(&publisher->rtp_forwarders_mutex);
/* Check if we know of this remotization */
if(g_hash_table_remove(publisher->remote_recipients, remote_id) == FALSE) {
janus_mutex_unlock(&publisher->rtp_forwarders_mutex);
janus_mutex_unlock(&publisher->streams_mutex);
janus_refcount_decrease(&publisher->ref);
janus_refcount_decrease(&videoroom->ref);
JANUS_LOG(LOG_ERR, "No such remotization (%s)\n", remote_id);
Expand Down Expand Up @@ -7368,6 +7385,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
temp = temp->next;
}
janus_mutex_unlock(&publisher->rtp_forwarders_mutex);
janus_mutex_unlock(&publisher->streams_mutex);
/* Done */
response = json_object();
json_object_set_new(response, "videoroom", json_string("success"));
Expand Down Expand Up @@ -7801,9 +7819,11 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
janus_mutex_init(&ps->subscribers_mutex);
janus_mutex_init(&ps->rtp_forwarders_mutex);
ps->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_rtp_forwarder_destroy);
janus_mutex_lock(&publisher->streams_mutex);
publisher->streams = g_list_append(publisher->streams, ps);
g_hash_table_insert(publisher->streams_byid, GINT_TO_POINTER(ps->mindex), ps);
g_hash_table_insert(publisher->streams_bymid, g_strdup(ps->mid), ps);
janus_mutex_unlock(&publisher->streams_mutex);
mindex++;
}
/* Done, spawn a thread for this remote publisher */
Expand Down Expand Up @@ -8082,11 +8102,11 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
g_hash_table_insert(publisher->streams_bymid, g_strdup(ps->mid), ps);
changes = TRUE;
}
janus_mutex_unlock(&publisher->streams_mutex);
if(changes) {
/* Notify all other participants this publisher's media has changed */
janus_videoroom_notify_about_publisher(publisher, TRUE);
}
janus_mutex_unlock(&publisher->streams_mutex);
/* Done */
janus_refcount_decrease(&publisher->ref);
janus_refcount_decrease(&videoroom->ref);
Expand Down Expand Up @@ -8352,9 +8372,10 @@ void janus_videoroom_setup_media(janus_plugin_session *handle) {
if(session->participant_type == janus_videoroom_p_type_publisher) {
janus_videoroom_publisher *participant = janus_videoroom_session_get_publisher(session);
/* Notify all other participants that there's a new boy in town */
janus_mutex_lock(&participant->rec_mutex);
janus_mutex_lock(&participant->streams_mutex);
janus_videoroom_notify_about_publisher(participant, FALSE);
/* Check if we need to start recording */
janus_mutex_lock(&participant->rec_mutex);
if((participant->room && participant->room->record) || participant->recording_active) {
GList *temp = participant->streams;
while(temp) {
Expand All @@ -8364,6 +8385,7 @@ void janus_videoroom_setup_media(janus_plugin_session *handle) {
}
participant->recording_active = TRUE;
}
janus_mutex_unlock(&participant->streams_mutex);
janus_mutex_unlock(&participant->rec_mutex);
janus_refcount_decrease(&participant->ref);
} else if(session->participant_type == janus_videoroom_p_type_subscriber) {
Expand Down Expand Up @@ -9035,7 +9057,9 @@ static void janus_videoroom_hangup_media_internal(gpointer session_data) {
janus_mutex_lock(&participant->rec_mutex);
g_free(participant->recording_base);
participant->recording_base = NULL;
janus_mutex_lock(&participant->streams_mutex);
janus_videoroom_recorder_close(participant);
janus_mutex_unlock(&participant->streams_mutex)
janus_mutex_unlock(&participant->rec_mutex);
participant->acodec = JANUS_AUDIOCODEC_NONE;
participant->vcodec = JANUS_VIDEOCODEC_NONE;
Expand Down Expand Up @@ -9632,6 +9656,7 @@ static void *janus_videoroom_handler(void *data) {
/* Add proper info on all the streams */
gboolean audio_added = FALSE, video_added = FALSE, talking_found = FALSE, talking = FALSE;
json_t *media = json_array();
janus_mutex_lock(&p->streams_mutex);
GList *temp = p->streams;
while(temp) {
janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data;
Expand Down Expand Up @@ -9688,6 +9713,7 @@ static void *janus_videoroom_handler(void *data) {
json_array_append_new(media, info);
temp = temp->next;
}
janus_mutex_unlock(&p->streams_mutex);
json_object_set_new(pl, "streams", media);
if(talking_found)
json_object_set_new(pl, "talking", talking ? json_true() : json_false());
Expand Down Expand Up @@ -10731,9 +10757,12 @@ static void *janus_videoroom_handler(void *data) {
/* Something changed */
if(!participant->recording_active) {
/* Not recording (anymore?) */
janus_mutex_lock(&participant->streams_mutex)
janus_videoroom_recorder_close(participant);
janus_mutex_unlock(&participant->streams_mutex)
} else if(participant->recording_active && g_atomic_int_get(&participant->session->started)) {
/* We've started recording, send a PLI/FIR and go on */
janus_mutex_lock(&participant->streams_mutex);
GList *temp = participant->streams;
while(temp) {
janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data;
Expand All @@ -10744,6 +10773,7 @@ static void *janus_videoroom_handler(void *data) {
}
temp = temp->next;
}
janus_mutex_unlock(&participant->streams_mutex);
}
}
janus_mutex_unlock(&participant->rec_mutex);
Expand Down Expand Up @@ -10787,10 +10817,10 @@ static void *janus_videoroom_handler(void *data) {
}
}
}
janus_mutex_unlock(&participant->streams_mutex);
/* If at least a description changed, notify everyone else about the publisher details */
if(desc_updated)
janus_videoroom_notify_about_publisher(participant, TRUE);
janus_mutex_unlock(&participant->streams_mutex);
}
/* Done */
event = json_object();
Expand Down Expand Up @@ -11830,6 +11860,7 @@ static void *janus_videoroom_handler(void *data) {
feeds = json_array();
json_object_set_new(root, "streams", feeds);
janus_refcount_increase(&publisher->ref);
janus_mutex_lock(&publisher->streams_mutex);
GList *temp = publisher->streams, *touched_already = NULL;
while(temp) {
janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data;
Expand Down Expand Up @@ -11857,6 +11888,7 @@ static void *janus_videoroom_handler(void *data) {
}
temp = temp->next;
}
janus_mutex_unlock(&publisher->streams_mutex);
g_list_free(touched_already);
janus_refcount_decrease(&publisher->ref);
/* Take note of the fact this is a legacy request */
Expand Down Expand Up @@ -12883,13 +12915,15 @@ static void *janus_videoroom_handler(void *data) {
/* Is this room recorded, or are we recording this publisher already? */
janus_mutex_lock(&participant->rec_mutex);
if(videoroom->record || participant->recording_active) {
janus_mutex_lock(&participant->streams_mutex);
GList *temp = participant->streams;
while(temp) {
janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data;
janus_videoroom_recorder_create(ps);
temp = temp->next;
}
participant->recording_active = TRUE;
janus_mutex_unlock(&participant->streams_mutex);
}
janus_mutex_unlock(&participant->rec_mutex);
/* Send the answer back to the publisher */
Expand All @@ -12910,7 +12944,9 @@ static void *janus_videoroom_handler(void *data) {
/* If this is an update/renegotiation, notify participants about this */
if(sdp_update && g_atomic_int_get(&session->started)) {
/* Notify all other participants this publisher's media has changed */
janus_mutex_lock(&participant->streams_mutex);
janus_videoroom_notify_about_publisher(participant, TRUE);
janus_mutex_unlock(&participant->streams_mutex);
}
/* Done */
if(res != JANUS_OK) {
Expand Down Expand Up @@ -13223,6 +13259,7 @@ static void janus_videoroom_rtp_forwarder_rtcp_receive(janus_rtp_forwarder *rf,
janus_videoroom_publisher *p = ps->publisher;
if(p == NULL || g_atomic_int_get(&p->destroyed))
return;
janus_mutex_lock(&p->streams_mutex);
janus_mutex_lock(&p->rtp_forwarders_mutex);
if(g_hash_table_size(p->rtp_forwarders) == 0) {
janus_mutex_unlock(&p->rtp_forwarders_mutex);
Expand Down Expand Up @@ -13256,6 +13293,7 @@ static void janus_videoroom_rtp_forwarder_rtcp_receive(janus_rtp_forwarder *rf,
temp = temp->next;
}
janus_mutex_unlock(&p->rtp_forwarders_mutex);
janus_mutex_unlock(&p->streams_mutex);
if(found)
janus_videoroom_reqpli(ps, "RTCP from remotized forwarder");
}
Expand Down Expand Up @@ -13468,7 +13506,9 @@ static void *janus_videoroom_remote_publisher_thread(void *user_data) {
string_ids ? (gpointer)g_strdup(publisher->user_id_str) : (gpointer)janus_uint64_dup(publisher->user_id),
publisher);
/* Let's also notify all other participants that the publisher is here */
janus_mutex_lock(&publisher->streams_mutex);
janus_videoroom_notify_about_publisher(publisher, FALSE);
janus_mutex_unlock(&publisher->streams_mutex);

/* Loop */
int num = 0, i = 0;
Expand Down Expand Up @@ -13652,7 +13692,9 @@ static void *janus_videoroom_remote_publisher_thread(void *user_data) {
janus_mutex_lock(&publisher->rec_mutex);
g_free(publisher->recording_base);
publisher->recording_base = NULL;
janus_mutex_lock(&publisher->streams_mutex)
janus_videoroom_recorder_close(publisher);
janus_mutex_unlock(&publisher->streams_mutex)
janus_mutex_unlock(&publisher->rec_mutex);
publisher->acodec = JANUS_AUDIOCODEC_NONE;
publisher->vcodec = JANUS_VIDEOCODEC_NONE;
Expand Down

0 comments on commit d014cff

Please sign in to comment.