Skip to content

Commit

Permalink
Use TypeId to identify subscription::Map
Browse files Browse the repository at this point in the history
  • Loading branch information
hecrj committed Feb 4, 2024
1 parent e14e8e2 commit f39a5fd
Showing 1 changed file with 27 additions and 10 deletions.
37 changes: 27 additions & 10 deletions futures/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{BoxStream, MaybeSend};

use futures::channel::mpsc;
use futures::never::Never;
use std::any::TypeId;
use std::hash::Hash;

/// A stream of runtime events.
Expand Down Expand Up @@ -88,7 +89,10 @@ impl<Message> Subscription<Message> {
}

/// Transforms the [`Subscription`] output with the given function.
pub fn map<A>(mut self, f: fn(Message) -> A) -> Subscription<A>
pub fn map<A>(
mut self,
f: impl Fn(Message) -> A + MaybeSend + Clone + 'static,
) -> Subscription<A>
where
Message: 'static,
A: 'static,
Expand All @@ -97,8 +101,9 @@ impl<Message> Subscription<Message> {
recipes: self
.recipes
.drain(..)
.map(|recipe| {
Box::new(Map::new(recipe, f)) as Box<dyn Recipe<Output = A>>
.map(move |recipe| {
Box::new(Map::new(recipe, f.clone()))
as Box<dyn Recipe<Output = A>>
})
.collect(),
}
Expand Down Expand Up @@ -143,27 +148,39 @@ pub trait Recipe {
fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output>;
}

struct Map<A, B> {
struct Map<A, B, F>
where
F: Fn(A) -> B + 'static,
{
id: TypeId,
recipe: Box<dyn Recipe<Output = A>>,
mapper: fn(A) -> B,
mapper: F,
}

impl<A, B> Map<A, B> {
fn new(recipe: Box<dyn Recipe<Output = A>>, mapper: fn(A) -> B) -> Self {
Map { recipe, mapper }
impl<A, B, F> Map<A, B, F>
where
F: Fn(A) -> B + 'static,
{
fn new(recipe: Box<dyn Recipe<Output = A>>, mapper: F) -> Self {
Map {
id: TypeId::of::<F>(),
recipe,
mapper,
}
}
}

impl<A, B> Recipe for Map<A, B>
impl<A, B, F> Recipe for Map<A, B, F>
where
A: 'static,
B: 'static,
F: Fn(A) -> B + 'static + MaybeSend,
{
type Output = B;

fn hash(&self, state: &mut Hasher) {
self.id.hash(state);
self.recipe.hash(state);
self.mapper.hash(state);
}

fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
Expand Down

0 comments on commit f39a5fd

Please sign in to comment.