Skip to content

Commit

Permalink
Remove unsafe pin projection code and replace it with safe pin_projec…
Browse files Browse the repository at this point in the history
…t crate
  • Loading branch information
romac committed Nov 18, 2020
1 parent c92d0a7 commit 7c8f444
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 20 deletions.
1 change: 1 addition & 0 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ futures = { version = "0.3", optional = true }
http = { version = "0.2", optional = true }
tokio = { version = "0.3", optional = true }
tracing = { version = "0.1", optional = true }
pin-project = "1.0.1"

[dependencies.hyper]
version = "0.14.0-dev"
Expand Down
16 changes: 7 additions & 9 deletions rpc/src/client/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use async_trait::async_trait;
use futures::task::{Context, Poll};
use futures::Stream;
use getrandom::getrandom;
use pin_project::pin_project;
use std::collections::HashMap;
use std::convert::TryInto;
use std::pin::Pin;
Expand Down Expand Up @@ -53,14 +54,19 @@ pub trait SubscriptionClient {
/// ```
///
/// [`Event`]: ./event/struct.Event.html
#[pin_project]
#[derive(Debug)]
pub struct Subscription {
/// The query for which events will be produced.
pub query: Query,

/// The ID of this subscription (automatically assigned).
pub id: SubscriptionId,

// Our internal result event receiver for this subscription.
#[pin]
event_rx: ChannelRx<Result<Event>>,

// Allows us to interact with the subscription driver (exclusively to
// terminate this subscription).
cmd_tx: ChannelTx<SubscriptionDriverCmd>,
Expand All @@ -70,7 +76,7 @@ impl Stream for Subscription {
type Item = Result<Event>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.pin_get_event_rx().poll_next(cx)
self.project().event_rx.poll_next(cx)
}
}

Expand Down Expand Up @@ -108,14 +114,6 @@ impl Subscription {
)
})?
}

/// Pinning is structural for the underlying channels.
/// As such we can project the underlying channel as a pinned value.
///
/// See https://doc.rust-lang.org/std/pin/index.html#pinning-is-structural-for-field
fn pin_get_event_rx(self: Pin<&mut Self>) -> Pin<&mut ChannelRx<Result<Event>>> {
unsafe { self.map_unchecked_mut(|s| &mut s.event_rx) }
}
}

/// A command that can be sent to the subscription driver.
Expand Down
17 changes: 6 additions & 11 deletions rpc/src/client/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
use std::pin::Pin;

use crate::{Error, Result};
use futures::task::{Context, Poll};
use futures::Stream;
use pin_project::pin_project;
use tokio::sync::mpsc;

use crate::{Error, Result};

/// Constructor for an unbounded channel.
pub fn unbounded<T>() -> (ChannelTx<T>, ChannelRx<T>) {
let (tx, rx) = mpsc::unbounded_channel();
Expand All @@ -36,29 +38,22 @@ impl<T> ChannelTx<T> {
}

/// Receiver interface for a channel.
#[pin_project]
#[derive(Debug)]
pub struct ChannelRx<T>(mpsc::UnboundedReceiver<T>);
pub struct ChannelRx<T>(#[pin] mpsc::UnboundedReceiver<T>);

impl<T> ChannelRx<T> {
/// Wait indefinitely until we receive a value from the channel (or the
/// channel is closed).
pub async fn recv(&mut self) -> Option<T> {
self.0.recv().await
}

/// Pinning is structural for the underlying channel.
/// As such we can project the underlying channel as a pinned value.
///
/// See https://doc.rust-lang.org/std/pin/index.html#pinning-is-structural-for-field
fn pin_get(self: Pin<&mut Self>) -> Pin<&mut mpsc::UnboundedReceiver<T>> {
unsafe { self.map_unchecked_mut(|s| &mut s.0) }
}
}

impl<T> Stream for ChannelRx<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.pin_get().poll_next(cx)
self.project().0.poll_next(cx)
}
}

0 comments on commit 7c8f444

Please sign in to comment.