Skip to content

Commit

Permalink
Added option to automatically determine port to WebRtcServer
Browse files Browse the repository at this point in the history
If port is not specified at createWebRtcServer, automatically determine the port of the webrtcserver by Worker's port range
  • Loading branch information
satoren committed Sep 12, 2022
1 parent bba2aa0 commit e3d6553
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 21 deletions.
2 changes: 1 addition & 1 deletion node/src/WebRtcServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export interface WebRtcServerListenInfo
/**
* Listening port.
*/
port: number;
port?: number;
}

export type WebRtcServerOptions =
Expand Down
75 changes: 75 additions & 0 deletions node/tests/test-WebRtcServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,81 @@ test('worker.createWebRtcServer() succeeds', async () =>
expect(worker.webRtcServersForTesting.size).toBe(0);
}, 2000);

test('worker.createWebRtcServer() use portrange succeeds', async () =>
{
worker = await createWorker();

const onObserverNewWebRtcServer = jest.fn();

worker.observer.once('newwebrtcserver', onObserverNewWebRtcServer);

const webRtcServer = await worker.createWebRtcServer(
{
listenInfos :
[
{
protocol : 'udp',
ip : '127.0.0.1'
},
{
protocol : 'tcp',
ip : '127.0.0.1',
announcedIp : '1.2.3.4'
}
],
appData : { foo: 123 }
});

expect(onObserverNewWebRtcServer).toHaveBeenCalledTimes(1);
expect(onObserverNewWebRtcServer).toHaveBeenCalledWith(webRtcServer);
expect(webRtcServer.id).toBeType('string');
expect(webRtcServer.closed).toBe(false);
expect(webRtcServer.appData).toEqual({ foo: 123 });

await expect(worker.dump())
.resolves
.toEqual(
{
pid : worker.pid,
webRtcServerIds : [ webRtcServer.id ],
routerIds : [],
channelMessageHandlers :
{
channelRequestHandlers : [ webRtcServer.id ],
payloadChannelRequestHandlers : [],
payloadChannelNotificationHandlers : []
}
});

await expect(webRtcServer.dump())
.resolves
.toMatchObject(
{
id : webRtcServer.id,
udpSockets :
[
{ ip: '127.0.0.1', port: expect.any(Number) }
],
tcpServers :
[
{ ip: '127.0.0.1', port: expect.any(Number) }
],
webRtcTransportIds : [],
localIceUsernameFragments : [],
tupleHashes : []
});

// Private API.
expect(worker.webRtcServersForTesting.size).toBe(1);

worker.close();

expect(webRtcServer.closed).toBe(true);

// Private API.
expect(worker.webRtcServersForTesting.size).toBe(0);
}, 2000);

test('worker.createWebRtcServer() with wrong arguments rejects with TypeError', async () =>
{
worker = await createWorker();
Expand Down
8 changes: 4 additions & 4 deletions rust/src/router/webrtc_transport/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ fn create_with_webrtc_server_succeeds() {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
},
port: port1,
port: Some(port1),
});
let listen_infos = listen_infos.insert(WebRtcServerListenInfo {
protocol: Protocol::Tcp,
listen_ip: ListenIp {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
},
port: port2,
port: Some(port2),
});
WebRtcServerOptions::new(listen_infos)
})
Expand Down Expand Up @@ -263,15 +263,15 @@ fn webrtc_server_close_event() {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
},
port: port1,
port: Some(port1),
});
let listen_infos = listen_infos.insert(WebRtcServerListenInfo {
protocol: Protocol::Tcp,
listen_ip: ListenIp {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
},
port: port2,
port: Some(port2),
});
WebRtcServerOptions::new(listen_infos)
})
Expand Down
3 changes: 2 additions & 1 deletion rust/src/webrtc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ pub struct WebRtcServerListenInfo {
#[serde(flatten)]
pub listen_ip: ListenIp,
/// Listening port.
pub port: u16,
#[serde(skip_serializing_if = "Option::is_none")]
pub port: Option<u16>,
}

/// Struct that protects an invariant of having non-empty list of listen infos.
Expand Down
2 changes: 1 addition & 1 deletion rust/src/webrtc_server/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn worker_close_event() {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
},
port,
port: Some(port),
},
)))
.await
Expand Down
101 changes: 92 additions & 9 deletions rust/tests/integration/webrtc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ fn create_webrtc_server_succeeds() {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
},
port: port1,
port: Some(port1),
});
let listen_infos = listen_infos.insert(WebRtcServerListenInfo {
protocol: Protocol::Tcp,
listen_ip: ListenIp {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))),
},
port: port2,
port: Some(port2),
});
let mut webrtc_server_options = WebRtcServerOptions::new(listen_infos);

Expand Down Expand Up @@ -132,6 +132,89 @@ fn create_webrtc_server_succeeds() {
});
}


#[test]
fn create_webrtc_server_use_portrange_succeeds() {
future::block_on(async move {
let (worker1, _worker2) = init().await;

let new_webrtc_server_count = Arc::new(AtomicUsize::new(0));

worker1
.on_new_webrtc_server({
let new_webrtc_server_count = Arc::clone(&new_webrtc_server_count);

move |_webrtc_server| {
new_webrtc_server_count.fetch_add(1, Ordering::SeqCst);
}
})
.detach();

#[derive(Debug, PartialEq)]
struct CustomAppData {
foo: u32,
}

let webrtc_server = worker1
.create_webrtc_server({
let listen_infos = WebRtcServerListenInfos::new(WebRtcServerListenInfo {
protocol: Protocol::Udp,
listen_ip: ListenIp {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
},
port: None,
});
let listen_infos = listen_infos.insert(WebRtcServerListenInfo {
protocol: Protocol::Tcp,
listen_ip: ListenIp {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))),
},
port: None,
});
let mut webrtc_server_options = WebRtcServerOptions::new(listen_infos);

webrtc_server_options.app_data = AppData::new(CustomAppData { foo: 123 });

webrtc_server_options
})
.await
.expect("Failed to create router");

assert_eq!(new_webrtc_server_count.load(Ordering::SeqCst), 1);
assert!(!webrtc_server.closed());
assert_eq!(
webrtc_server.app_data().downcast_ref::<CustomAppData>(),
Some(&CustomAppData { foo: 123 }),
);

let worker_dump = worker1.dump().await.expect("Failed to dump worker");

assert_eq!(worker_dump.router_ids, vec![]);
assert_eq!(worker_dump.webrtc_server_ids, vec![webrtc_server.id()]);

let dump = webrtc_server
.dump()
.await
.expect("Failed to dump WebRTC server");

assert_eq!(dump.id, webrtc_server.id());


assert_eq!(
dump.udp_sockets[0].ip, IpAddr::V4(Ipv4Addr::LOCALHOST)]
);
assert_eq!(
dump.tcp_servers[0].ip, IpAddr::V4(Ipv4Addr::LOCALHOST)]
);

assert_eq!(dump.webrtc_transport_ids, HashedSet::default());
assert_eq!(dump.local_ice_username_fragments, vec![]);
assert_eq!(dump.tuple_hashes, vec![]);
});
}

#[test]
fn unavailable_infos_fails() {
future::block_on(async move {
Expand Down Expand Up @@ -166,15 +249,15 @@ fn unavailable_infos_fails() {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
},
port: port1,
port: Some(port1),
});
let listen_infos = listen_infos.insert(WebRtcServerListenInfo {
protocol: Protocol::Udp,
listen_ip: ListenIp {
ip: IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)),
announced_ip: None,
},
port: port2,
port: Some(port2),
});

WebRtcServerOptions::new(listen_infos)
Expand All @@ -197,15 +280,15 @@ fn unavailable_infos_fails() {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
},
port: port1,
port: Some(port1),
});
let listen_infos = listen_infos.insert(WebRtcServerListenInfo {
protocol: Protocol::Udp,
listen_ip: ListenIp {
ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
announced_ip: Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))),
},
port: port1,
port: Some(port1),
});

WebRtcServerOptions::new(listen_infos)
Expand All @@ -228,7 +311,7 @@ fn unavailable_infos_fails() {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
},
port: port1,
port: Some(port1),
},
)))
.await
Expand All @@ -242,7 +325,7 @@ fn unavailable_infos_fails() {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
},
port: port1,
port: Some(port1),
},
)))
.await;
Expand Down Expand Up @@ -270,7 +353,7 @@ fn close_event() {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
},
port,
port: Some(port),
},
)))
.await
Expand Down
18 changes: 13 additions & 5 deletions worker/src/RTC/WebRtcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ namespace RTC
auto jsonPortIt = jsonListenInfo.find("port");

if (jsonPortIt == jsonListenInfo.end())
MS_THROW_TYPE_ERROR("missing listenInfo.port");
listenInfo.port = 0;
else if (!(jsonPortIt->is_number() && Utils::Json::IsPositiveInteger(*jsonPortIt)))
MS_THROW_TYPE_ERROR("wrong listenInfo.port (not a positive number)");

listenInfo.port = jsonPortIt->get<uint16_t>();
else
listenInfo.port = jsonPortIt->get<uint16_t>();
}

try
Expand All @@ -111,14 +111,22 @@ namespace RTC
if (listenInfo.protocol == RTC::TransportTuple::Protocol::UDP)
{
// This may throw.
auto* udpSocket = new RTC::UdpSocket(this, listenInfo.ip, listenInfo.port);
RTC::UdpSocket* udpSocket;
if (listenInfo.port != 0)
udpSocket = new RTC::UdpSocket(this, listenInfo.ip, listenInfo.port);
else
udpSocket = new RTC::UdpSocket(this, listenInfo.ip);

this->udpSocketOrTcpServers.emplace_back(udpSocket, nullptr, listenInfo.announcedIp);
}
else if (listenInfo.protocol == RTC::TransportTuple::Protocol::TCP)
{
// This may throw.
auto* tcpServer = new RTC::TcpServer(this, this, listenInfo.ip, listenInfo.port);
RTC::TcpServer* tcpServer;
if (listenInfo.port != 0)
tcpServer = new RTC::TcpServer(this, this, listenInfo.ip, listenInfo.port);
else
tcpServer = new RTC::TcpServer(this, this, listenInfo.ip);

this->udpSocketOrTcpServers.emplace_back(nullptr, tcpServer, listenInfo.announcedIp);
}
Expand Down

0 comments on commit e3d6553

Please sign in to comment.