Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented a response to the server after the client received the message. #310

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions socketio/src/asynchronous/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl ClientBuilder {
#[cfg(feature = "async-callbacks")]
pub fn on<T: Into<Event>, F>(mut self, event: T, callback: F) -> Self
where
F: for<'a> std::ops::FnMut(Payload, Client) -> BoxFuture<'static, ()>
F: for<'a> std::ops::FnMut(Payload, Client, Option<i32>) -> BoxFuture<'static, ()>
+ 'static
+ Send
+ Sync,
Expand Down Expand Up @@ -199,7 +199,7 @@ impl ClientBuilder {
/// ```
pub fn on_any<F>(mut self, callback: F) -> Self
where
F: for<'a> FnMut(Event, Payload, Client) -> BoxFuture<'static, ()> + 'static + Send + Sync,
F: for<'a> FnMut(Event, Payload, Client, Option<i32>) -> BoxFuture<'static, ()> + 'static + Send + Sync,
{
self.on_any = Some(Callback::<DynAsyncAnyCallback>::new(callback));
self
Expand Down
12 changes: 6 additions & 6 deletions socketio/src/asynchronous/client/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use super::client::Client;

/// Internal type, provides a way to store futures and return them in a boxed manner.
pub(crate) type DynAsyncCallback =
Box<dyn for<'a> FnMut(Payload, Client) -> BoxFuture<'static, ()> + 'static + Send + Sync>;
Box<dyn for<'a> FnMut(Payload, Client, Option<i32>) -> BoxFuture<'static, ()> + 'static + Send + Sync>;

pub(crate) type DynAsyncAnyCallback = Box<
dyn for<'a> FnMut(Event, Payload, Client) -> BoxFuture<'static, ()> + 'static + Send + Sync,
dyn for<'a> FnMut(Event, Payload, Client, Option<i32>) -> BoxFuture<'static, ()> + 'static + Send + Sync,
>;

pub(crate) struct Callback<T> {
Expand All @@ -28,7 +28,7 @@ impl<T> Debug for Callback<T> {

impl Deref for Callback<DynAsyncCallback> {
type Target =
dyn for<'a> FnMut(Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send;
dyn for<'a> FnMut(Payload, Client, Option<i32>) -> BoxFuture<'static, ()> + 'static + Sync + Send;

fn deref(&self) -> &Self::Target {
self.inner.as_ref()
Expand All @@ -44,7 +44,7 @@ impl DerefMut for Callback<DynAsyncCallback> {
impl Callback<DynAsyncCallback> {
pub(crate) fn new<T>(callback: T) -> Self
where
T: for<'a> FnMut(Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send,
T: for<'a> FnMut(Payload, Client, Option<i32>) -> BoxFuture<'static, ()> + 'static + Sync + Send,
{
Callback {
inner: Box::new(callback),
Expand All @@ -54,7 +54,7 @@ impl Callback<DynAsyncCallback> {

impl Deref for Callback<DynAsyncAnyCallback> {
type Target =
dyn for<'a> FnMut(Event, Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send;
dyn for<'a> FnMut(Event, Payload, Client, Option<i32>) -> BoxFuture<'static, ()> + 'static + Sync + Send;

fn deref(&self) -> &Self::Target {
self.inner.as_ref()
Expand All @@ -70,7 +70,7 @@ impl DerefMut for Callback<DynAsyncAnyCallback> {
impl Callback<DynAsyncAnyCallback> {
pub(crate) fn new<T>(callback: T) -> Self
where
T: for<'a> FnMut(Event, Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send,
T: for<'a> FnMut(Event, Payload, Client, Option<i32>) -> BoxFuture<'static, ()> + 'static + Sync + Send,
{
Callback {
inner: Box::new(callback),
Expand Down
79 changes: 66 additions & 13 deletions socketio/src/asynchronous/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl Client {
callback: F,
) -> Result<()>
where
F: for<'a> std::ops::FnMut(Payload, Client) -> BoxFuture<'static, ()>
F: for<'a> std::ops::FnMut(Payload, Client, Option<i32>) -> BoxFuture<'static, ()>
+ 'static
+ Send
+ Sync,
Expand All @@ -221,7 +221,7 @@ impl Client {
let id = thread_rng().gen_range(0..999);
let socket_packet =
self.socket
.build_packet_for_payload(data.into(), event.into(), &self.nsp, Some(id))?;
.build_packet_for_payload(data.into(), event.into(), &self.nsp, Some(id), true)?;

let ack = Ack {
id,
Expand All @@ -236,7 +236,57 @@ impl Client {
self.socket.send(socket_packet).await
}

async fn callback<P: Into<Payload>>(&self, event: &Event, payload: P) -> Result<()> {
/// Sends a answer message
///
/// # Example
/// ```
///
/// use futures_util::FutureExt;
///
/// #[tokio::main]
/// async fn main() {
/// use std::time::Duration;
/// use serde_json::json;
/// use tokio::time::sleep;
/// use rust_socketio::asynchronous::ClientBuilder;
/// use rust_socketio::Payload;
///
/// let mut socket = ClientBuilder::new("http://localhost:4200/")
/// .on("foo", |payload: Payload, _| async move { println!("Received: {:#?}", payload) }.boxed())
/// .connect()
/// .on_any(|event, payload, socket, id| async move {
/// socket.emit_answer(id, json!({"result" : true}));
/// }.boxed())
/// .await
/// .expect("connection failed");
///
///
/// sleep(Duration::from_secs(2));
/// }
/// ```
#[inline]
pub async fn emit_answer<D>(
&self,
id: Option<i32>,
data: D,
) -> Result<()>
where
D: Into<Payload>,
{
let id = match id {
None => {
return Err(Error::MissedPacketId());
}
Some(el) => el
};
let socket_packet =
self.socket
.build_packet_for_payload(data.into(), Event::Message, &self.nsp, Some(id), true)?;

self.socket.send(socket_packet).await
}

async fn callback<P: Into<Payload>>(&self, event: &Event, payload: P, id: Option<i32>) -> Result<()> {
let mut on = self.on.write().await;
let mut on_any = self.on_any.write().await;

Expand All @@ -245,14 +295,14 @@ impl Client {
let payload = payload.into();

if let Some(callback) = on_lock.get_mut(event) {
callback(payload.clone(), self.clone()).await;
callback(payload.clone(), self.clone(), id).await;
}

// Call on_any for all common and custom events.
match event {
Event::Message | Event::Custom(_) => {
if let Some(callback) = on_any_lock {
callback(event.clone(), payload, self.clone()).await;
callback(event.clone(), payload, self.clone(), id).await;
}
}
_ => (),
Expand All @@ -277,6 +327,7 @@ impl Client {
ack.callback.deref_mut()(
Payload::String(payload.to_owned()),
self.clone(),
socket_packet.id,
)
.await;
}
Expand All @@ -285,6 +336,7 @@ impl Client {
ack.callback.deref_mut()(
Payload::Binary(payload.to_owned()),
self.clone(),
socket_packet.id,
)
.await;
}
Expand Down Expand Up @@ -312,7 +364,7 @@ impl Client {

if let Some(attachments) = &packet.attachments {
if let Some(binary_payload) = attachments.get(0) {
self.callback(&event, Payload::Binary(binary_payload.to_owned()))
self.callback(&event, Payload::Binary(binary_payload.to_owned()), packet.id)
.await?;
}
}
Expand Down Expand Up @@ -355,7 +407,7 @@ impl Client {
};

// call the correct callback
self.callback(&event, data.to_string()).await?;
self.callback(&event, data.to_string(), packet.id).await?;
}

Ok(())
Expand All @@ -370,20 +422,20 @@ impl Client {
match packet.packet_type {
PacketId::Ack | PacketId::BinaryAck => {
if let Err(err) = self.handle_ack(packet).await {
self.callback(&Event::Error, err.to_string()).await?;
self.callback(&Event::Error, err.to_string(), packet.id).await?;
return Err(err);
}
}
PacketId::BinaryEvent => {
if let Err(err) = self.handle_binary_event(packet).await {
self.callback(&Event::Error, err.to_string()).await?;
self.callback(&Event::Error, err.to_string(), packet.id).await?;
}
}
PacketId::Connect => {
self.callback(&Event::Connect, "").await?;
self.callback(&Event::Connect, "", packet.id).await?;
}
PacketId::Disconnect => {
self.callback(&Event::Close, "").await?;
self.callback(&Event::Close, "", packet.id).await?;
}
PacketId::ConnectError => {
self.callback(
Expand All @@ -393,12 +445,13 @@ impl Client {
.data
.as_ref()
.unwrap_or(&String::from("\"No error message provided\"")),
packet.id
)
.await?;
}
PacketId::Event => {
if let Err(err) = self.handle_event(packet).await {
self.callback(&Event::Error, err.to_string()).await?;
self.callback(&Event::Error, err.to_string(), packet.id).await?;
}
}
}
Expand All @@ -425,7 +478,7 @@ impl Stream for Client {
}
Some(Err(err)) => {
// call the error callback
ready!(Box::pin(self.callback(&Event::Error, err.to_string())).poll_unpin(cx))?;
ready!(Box::pin(self.callback(&Event::Error, err.to_string(), None)).poll_unpin(cx))?;
return Poll::Ready(Some(Err(err)));
}
Some(Ok(packet)) => {
Expand Down
15 changes: 12 additions & 3 deletions socketio/src/asynchronous/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl Socket {
/// Emits to certain event with given data. The data needs to be JSON,
/// otherwise this returns an `InvalidJson` error.
pub async fn emit(&self, nsp: &str, event: Event, data: Payload) -> Result<()> {
let socket_packet = self.build_packet_for_payload(data, event, nsp, None)?;
let socket_packet = self.build_packet_for_payload(data, event, nsp, None, false)?;

self.send(socket_packet).await
}
Expand All @@ -98,6 +98,7 @@ impl Socket {
event: Event,
nsp: &'a str,
id: Option<i32>,
is_answer: bool,
) -> Result<Packet> {
match payload {
Payload::Binary(bin_data) => Ok(Packet::new(
Expand All @@ -115,10 +116,18 @@ impl Socket {
Payload::String(str_data) => {
serde_json::from_str::<serde_json::Value>(&str_data)?;

let payload = format!("[\"{}\",{}]", String::from(event), str_data);
let package_type;
let payload;
if is_answer {
payload = format!("[{}]", str_data);
package_type = PacketId::Ack;
} else {
payload = format!("[\"{}\",{}]", String::from(event), str_data);
package_type = PacketId::Event;
}

Ok(Packet::new(
PacketId::Event,
package_type,
nsp.to_owned(),
Some(payload),
id,
Expand Down
4 changes: 2 additions & 2 deletions socketio/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl ClientBuilder {
#[allow(unused_mut)]
pub fn on<T: Into<Event>, F>(mut self, event: T, callback: F) -> Self
where
F: FnMut(Payload, RawClient) + 'static + Send,
F: FnMut(Payload, RawClient, Option<i32>) + 'static + Send,
{
let callback = Callback::<SocketCallback>::new(callback);
// SAFETY: Lock is held for such amount of time no code paths lead to a panic while lock is held
Expand Down Expand Up @@ -196,7 +196,7 @@ impl ClientBuilder {
#[allow(unused_mut)]
pub fn on_any<F>(mut self, callback: F) -> Self
where
F: FnMut(Event, Payload, RawClient) + 'static + Send,
F: FnMut(Event, Payload, RawClient, Option<i32>) + 'static + Send,
{
let callback = Some(Callback::<SocketAnyCallback>::new(callback));
// SAFETY: Lock is held for such amount of time no code paths lead to a panic while lock is held
Expand Down
12 changes: 6 additions & 6 deletions socketio/src/client/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::{
use super::RawClient;
use crate::{Event, Payload};

pub(crate) type SocketCallback = Box<dyn FnMut(Payload, RawClient) + 'static + Send>;
pub(crate) type SocketAnyCallback = Box<dyn FnMut(Event, Payload, RawClient) + 'static + Send>;
pub(crate) type SocketCallback = Box<dyn FnMut(Payload, RawClient, Option<i32>) + 'static + Send>;
pub(crate) type SocketAnyCallback = Box<dyn FnMut(Event, Payload, RawClient, Option<i32>) + 'static + Send>;

pub(crate) struct Callback<T> {
inner: T,
Expand All @@ -22,7 +22,7 @@ impl Debug for Callback<SocketCallback> {
}

impl Deref for Callback<SocketCallback> {
type Target = dyn FnMut(Payload, RawClient) + 'static + Send;
type Target = dyn FnMut(Payload, RawClient, Option<i32>) + 'static + Send;

fn deref(&self) -> &Self::Target {
self.inner.as_ref()
Expand All @@ -38,7 +38,7 @@ impl DerefMut for Callback<SocketCallback> {
impl Callback<SocketCallback> {
pub(crate) fn new<T>(callback: T) -> Self
where
T: FnMut(Payload, RawClient) + 'static + Send,
T: FnMut(Payload, RawClient, Option<i32>) + 'static + Send,
{
Callback {
inner: Box::new(callback),
Expand All @@ -55,7 +55,7 @@ impl Debug for Callback<SocketAnyCallback> {
}

impl Deref for Callback<SocketAnyCallback> {
type Target = dyn FnMut(Event, Payload, RawClient) + 'static + Send;
type Target = dyn FnMut(Event, Payload, RawClient, Option<i32>) + 'static + Send;

fn deref(&self) -> &Self::Target {
self.inner.as_ref()
Expand All @@ -71,7 +71,7 @@ impl DerefMut for Callback<SocketAnyCallback> {
impl Callback<SocketAnyCallback> {
pub(crate) fn new<T>(callback: T) -> Self
where
T: FnMut(Event, Payload, RawClient) + 'static + Send,
T: FnMut(Event, Payload, RawClient, Option<i32>) + 'static + Send,
{
Callback {
inner: Box::new(callback),
Expand Down
2 changes: 1 addition & 1 deletion socketio/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl Client {
callback: F,
) -> Result<()>
where
F: FnMut(Payload, RawClient) + 'static + Send,
F: FnMut(Payload, RawClient, Option<i32>) + 'static + Send,
E: Into<Event>,
D: Into<Payload>,
{
Expand Down
Loading