Skip to content

Commit

Permalink
Auto merge of #105 - danlrobertson:use-uuid, r=antrik
Browse files Browse the repository at this point in the history
Use autoincrementing number for handler ids

Use `Uuid` instead of `i64` for the handler keys to ensure file descriptor number reuse doesn't cause errors, and make sure to close file descriptors.

**NB:** The scope of the PR has changed to change the handler id's from a platform specific `i64` to an autoincrementing `u64`. `macos` has not yet been ported, and probably suffers from the same leak as the `unix` module did prior to this PR
  • Loading branch information
bors-servo authored Nov 10, 2016
2 parents e6ea839 + 2e1081c commit 357abb9
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 54 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ libc = "0.2.12"
rand = "0.3"
serde = "0.8"
uuid = {version = "0.3", features = ["v4"]}
fnv = "1.0.3"

[dev-dependencies]
crossbeam = "0.2"
10 changes: 5 additions & 5 deletions src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,12 @@ impl IpcReceiverSet {
})
}

pub fn add<T>(&mut self, receiver: IpcReceiver<T>) -> Result<i64,Error>
pub fn add<T>(&mut self, receiver: IpcReceiver<T>) -> Result<u64,Error>
where T: Deserialize + Serialize {
Ok(try!(self.os_receiver_set.add(receiver.os_receiver)))
}

pub fn add_opaque(&mut self, receiver: OpaqueIpcReceiver) -> Result<i64,Error> {
pub fn add_opaque(&mut self, receiver: OpaqueIpcReceiver) -> Result<u64,Error> {
Ok(try!(self.os_receiver_set.add(receiver.os_receiver)))
}

Expand Down Expand Up @@ -307,12 +307,12 @@ impl IpcSharedMemory {
}

pub enum IpcSelectionResult {
MessageReceived(i64, OpaqueIpcMessage),
ChannelClosed(i64),
MessageReceived(u64, OpaqueIpcMessage),
ChannelClosed(u64),
}

impl IpcSelectionResult {
pub fn unwrap(self) -> (i64, OpaqueIpcMessage) {
pub fn unwrap(self) -> (u64, OpaqueIpcMessage) {
match self {
IpcSelectionResult::MessageReceived(id, message) => (id, message),
IpcSelectionResult::ChannelClosed(id) => {
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ extern crate rand;
extern crate serde;
#[cfg(any(feature = "force-inprocess", target_os = "windows", target_os = "android"))]
extern crate uuid;
#[cfg(all(not(feature = "force-inprocess"), any(target_os = "linux",
target_os = "freebsd")))]
extern crate fnv;

pub mod ipc;
pub mod platform;
Expand Down
53 changes: 27 additions & 26 deletions src/platform/inprocess/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,30 +147,30 @@ impl OsIpcSender {
}

pub struct OsIpcReceiverSet {
last_index: usize,
receiver_ids: Vec<usize>,
incrementor: Incrementor,
receiver_ids: Vec<u64>,
receivers: Vec<OsIpcReceiver>,
}

impl OsIpcReceiverSet {
pub fn new() -> Result<OsIpcReceiverSet,MpscError> {
Ok(OsIpcReceiverSet {
last_index: 0,
incrementor: Incrementor::new(),
receiver_ids: vec![],
receivers: vec![],
})
}

pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<i64,MpscError> {
self.last_index += 1;
self.receiver_ids.push(self.last_index);
pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<u64,MpscError> {
let last_index = self.incrementor.increment();
self.receiver_ids.push(last_index);
self.receivers.push(receiver.consume());
Ok(self.last_index as i64)
Ok(last_index)
}

pub fn select(&mut self) -> Result<Vec<OsIpcSelectionResult>,MpscError> {
let mut receivers: Vec<Option<mpsc::Receiver<MpscChannelMessage>>> = Vec::with_capacity(self.receivers.len());
let mut r_id: i64 = -1;
let mut r_id: Option<u64> = None;
let mut r_index: usize = 0;

{
Expand All @@ -195,7 +195,7 @@ impl OsIpcReceiverSet {
for (index,h) in handles.iter().enumerate() {
if h.id() == id {
r_index = index;
r_id = self.receiver_ids[index] as i64;
r_id = Some(self.receiver_ids[index]);
break;
}
}
Expand All @@ -206,31 +206,32 @@ impl OsIpcReceiverSet {
mem::replace(&mut *r.receiver.borrow_mut(), mem::replace(&mut receivers[index], None));
}

if r_id == -1 {
return Err(MpscError::UnknownError);
}

let receivers = &mut self.receivers;
match receivers[r_index].recv() {
Ok((data, channels, shmems)) =>
Ok(vec![OsIpcSelectionResult::DataReceived(r_id, data, channels, shmems)]),
Err(MpscError::ChannelClosedError) => {
receivers.remove(r_index);
self.receiver_ids.remove(r_index);
Ok(vec![OsIpcSelectionResult::ChannelClosed(r_id)])
},
Err(err) => Err(err),
match r_id {
None => Err(MpscError::UnknownError),
Some(r_id) => {
let receivers = &mut self.receivers;
match receivers[r_index].recv() {
Ok((data, channels, shmems)) =>
Ok(vec![OsIpcSelectionResult::DataReceived(r_id, data, channels, shmems)]),
Err(MpscError::ChannelClosedError) => {
receivers.remove(r_index);
self.receiver_ids.remove(r_index);
Ok(vec![OsIpcSelectionResult::ChannelClosed(r_id)])
},
Err(err) => Err(err),
}
}
}
}
}

pub enum OsIpcSelectionResult {
DataReceived(i64, Vec<u8>, Vec<OsOpaqueIpcChannel>, Vec<OsIpcSharedMemory>),
ChannelClosed(i64),
DataReceived(u64, Vec<u8>, Vec<OsOpaqueIpcChannel>, Vec<OsIpcSharedMemory>),
ChannelClosed(u64),
}

impl OsIpcSelectionResult {
pub fn unwrap(self) -> (i64, Vec<u8>, Vec<OsOpaqueIpcChannel>, Vec<OsIpcSharedMemory>) {
pub fn unwrap(self) -> (u64, Vec<u8>, Vec<OsOpaqueIpcChannel>, Vec<OsIpcSharedMemory>) {
match self {
OsIpcSelectionResult::DataReceived(id, data, channels, shared_memory_regions) => {
(id, data, channels, shared_memory_regions)
Expand Down
14 changes: 7 additions & 7 deletions src/platform/macos/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,13 +516,13 @@ impl OsIpcReceiverSet {
}
}

pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<i64,MachError> {
pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<u64,MachError> {
let receiver_port = receiver.consume_port();
let os_result = unsafe {
mach_sys::mach_port_move_member(mach_task_self(), receiver_port, self.port.get())
};
if os_result == KERN_SUCCESS {
Ok(receiver_port as i64)
Ok(receiver_port as u64)
} else {
Err(MachError(os_result))
}
Expand All @@ -534,12 +534,12 @@ impl OsIpcReceiverSet {
}

pub enum OsIpcSelectionResult {
DataReceived(i64, Vec<u8>, Vec<OsOpaqueIpcChannel>, Vec<OsIpcSharedMemory>),
ChannelClosed(i64),
DataReceived(u64, Vec<u8>, Vec<OsOpaqueIpcChannel>, Vec<OsIpcSharedMemory>),
ChannelClosed(u64),
}

impl OsIpcSelectionResult {
pub fn unwrap(self) -> (i64, Vec<u8>, Vec<OsOpaqueIpcChannel>, Vec<OsIpcSharedMemory>) {
pub fn unwrap(self) -> (u64, Vec<u8>, Vec<OsOpaqueIpcChannel>, Vec<OsIpcSharedMemory>) {
match self {
OsIpcSelectionResult::DataReceived(id, data, channels, shared_memory_regions) => {
(id, data, channels, shared_memory_regions)
Expand Down Expand Up @@ -609,7 +609,7 @@ fn select(port: mach_port_t, blocking_mode: BlockingMode)

let local_port = (*message).header.msgh_local_port;
if (*message).header.msgh_id == MACH_NOTIFY_NO_SENDERS {
return Ok(OsIpcSelectionResult::ChannelClosed(local_port as i64))
return Ok(OsIpcSelectionResult::ChannelClosed(local_port as u64))
}

let (mut ports, mut shared_memory_regions) = (Vec::new(), Vec::new());
Expand Down Expand Up @@ -643,7 +643,7 @@ fn select(port: mach_port_t, blocking_mode: BlockingMode)
libc::free(allocated_buffer)
}

Ok(OsIpcSelectionResult::DataReceived(local_port as i64,
Ok(OsIpcSelectionResult::DataReceived(local_port as u64,
payload,
ports,
shared_memory_regions))
Expand Down
19 changes: 19 additions & 0 deletions src/platform/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,25 @@
// except according to those terms.

mod os {
#[cfg(any(feature = "force-inprocess", not(target_os = "macos")))]
struct Incrementor {
last_value: u64,
}

#[cfg(any(feature = "force-inprocess", not(target_os = "macos")))]
impl Incrementor {
fn new() -> Incrementor {
Incrementor {
last_value: 0
}
}

fn increment(&mut self) -> u64 {
self.last_value += 1;
self.last_value
}
}

#[cfg(all(not(feature = "force-inprocess"), any(target_os = "linux",
target_os = "freebsd")))]
include!("unix/mod.rs");
Expand Down
40 changes: 26 additions & 14 deletions src/platform/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use fnv::FnvHasher;
use bincode::serde::DeserializeError;
use libc::{self, MAP_FAILED, MAP_SHARED, POLLIN, PROT_READ, PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET};
use libc::{SO_LINGER, S_IFMT, S_IFSOCK, c_char, c_int, c_void, getsockopt};
use libc::{iovec, mkstemp, mode_t, msghdr, nfds_t, off_t, poll, pollfd, recvmsg, sendmsg};
use libc::{setsockopt, size_t, sockaddr, sockaddr_un, socketpair, socklen_t, sa_family_t};
use std::cell::Cell;
use std::cmp;
use std::collections::HashSet;
use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::fmt::{self, Debug, Formatter};
use std::hash::BuildHasherDefault;
use std::io::Error;
use std::marker::PhantomData;
use std::mem;
Expand Down Expand Up @@ -412,7 +414,9 @@ impl OsIpcChannel {
}

pub struct OsIpcReceiverSet {
incrementor: Incrementor,
pollfds: Vec<pollfd>,
fdids: HashMap<c_int, u64, BuildHasherDefault<FnvHasher>>
}

impl Drop for OsIpcReceiverSet {
Expand All @@ -428,19 +432,24 @@ impl Drop for OsIpcReceiverSet {

impl OsIpcReceiverSet {
pub fn new() -> Result<OsIpcReceiverSet,UnixError> {
let fnv = BuildHasherDefault::<FnvHasher>::default();
Ok(OsIpcReceiverSet {
incrementor: Incrementor::new(),
pollfds: Vec::new(),
fdids: HashMap::with_hasher(fnv)
})
}

pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<i64,UnixError> {
pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<u64,UnixError> {
let last_index = self.incrementor.increment();
let fd = receiver.consume_fd();
self.pollfds.push(pollfd {
fd: fd,
events: POLLIN,
revents: 0,
});
Ok(fd as i64)
self.fdids.insert(fd, last_index);
Ok(last_index)
}

pub fn select(&mut self) -> Result<Vec<OsIpcSelectionResult>,UnixError> {
Expand All @@ -452,43 +461,46 @@ impl OsIpcReceiverSet {
return Err(UnixError::last())
}

let mut hangups = HashSet::new();
for pollfd in self.pollfds.iter_mut() {
if (pollfd.revents & POLLIN) != 0 {
match recv(pollfd.fd, BlockingMode::Blocking) {
Ok((data, channels, shared_memory_regions)) => {
selection_results.push(OsIpcSelectionResult::DataReceived(
pollfd.fd as i64,
*self.fdids.get(&pollfd.fd).unwrap(),
data,
channels,
shared_memory_regions));
}
Err(err) if err.channel_is_closed() => {
hangups.insert(pollfd.fd);
selection_results.push(OsIpcSelectionResult::ChannelClosed(
pollfd.fd as i64))
let id = self.fdids.remove(&pollfd.fd).unwrap();
unsafe {
libc::close(pollfd.fd);
}
selection_results.push(OsIpcSelectionResult::ChannelClosed(id))
}
Err(err) => return Err(err),
}
pollfd.revents = pollfd.revents & !POLLIN
}
}

if !hangups.is_empty() {
self.pollfds.retain(|pollfd| !hangups.contains(&pollfd.fd));
}
// File descriptors not in fdids are closed channels, and the descriptor
// has been closed. This must be done after we have finished iterating over
// the pollfds vec.
let fdids = &self.fdids;
self.pollfds.retain(|pollfd| fdids.contains_key(&pollfd.fd));

Ok(selection_results)
}
}

pub enum OsIpcSelectionResult {
DataReceived(i64, Vec<u8>, Vec<OsOpaqueIpcChannel>, Vec<OsIpcSharedMemory>),
ChannelClosed(i64),
DataReceived(u64, Vec<u8>, Vec<OsOpaqueIpcChannel>, Vec<OsIpcSharedMemory>),
ChannelClosed(u64),
}

impl OsIpcSelectionResult {
pub fn unwrap(self) -> (i64, Vec<u8>, Vec<OsOpaqueIpcChannel>, Vec<OsIpcSharedMemory>) {
pub fn unwrap(self) -> (u64, Vec<u8>, Vec<OsOpaqueIpcChannel>, Vec<OsIpcSharedMemory>) {
match self {
OsIpcSelectionResult::DataReceived(id, data, channels, shared_memory_regions) => {
(id, data, channels, shared_memory_regions)
Expand Down
4 changes: 2 additions & 2 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ struct RouterProxyComm {

struct Router {
msg_receiver: Receiver<RouterMsg>,
msg_wakeup_id: i64,
msg_wakeup_id: u64,
ipc_receiver_set: IpcReceiverSet,
handlers: HashMap<i64,RouterHandler>,
handlers: HashMap<u64,RouterHandler>,
}

impl Router {
Expand Down

0 comments on commit 357abb9

Please sign in to comment.