From d1f72075d4f011a311cfa5307d82ed059b28c20e Mon Sep 17 00:00:00 2001 From: GoodMan Date: Sat, 15 Apr 2023 22:06:51 +0400 Subject: [PATCH 1/2] done --- socketio/src/asynchronous/client/builder.rs | 4 +- socketio/src/asynchronous/client/callback.rs | 12 +-- socketio/src/asynchronous/client/client.rs | 79 ++++++++++++++++---- socketio/src/asynchronous/socket.rs | 15 +++- socketio/src/client/builder.rs | 4 +- socketio/src/client/callback.rs | 12 +-- socketio/src/client/client.rs | 2 +- socketio/src/client/raw_client.rs | 53 +++++++++---- socketio/src/error.rs | 2 + socketio/src/socket.rs | 26 +++++-- 10 files changed, 157 insertions(+), 52 deletions(-) diff --git a/socketio/src/asynchronous/client/builder.rs b/socketio/src/asynchronous/client/builder.rs index c25db585..99692b43 100644 --- a/socketio/src/asynchronous/client/builder.rs +++ b/socketio/src/asynchronous/client/builder.rs @@ -165,7 +165,7 @@ impl ClientBuilder { #[cfg(feature = "async-callbacks")] pub fn on, F>(mut self, event: T, callback: F) -> Self where - F: for<'a> std::ops::FnMut(Payload, Client) -> BoxFuture<'static, ()> + F: for<'a> std::ops::FnMut(Payload, Client, Option) -> BoxFuture<'static, ()> + 'static + Send + Sync, @@ -199,7 +199,7 @@ impl ClientBuilder { /// ``` pub fn on_any(mut self, callback: F) -> Self where - F: for<'a> FnMut(Event, Payload, Client) -> BoxFuture<'static, ()> + 'static + Send + Sync, + F: for<'a> FnMut(Event, Payload, Client, Option) -> BoxFuture<'static, ()> + 'static + Send + Sync, { self.on_any = Some(Callback::::new(callback)); self diff --git a/socketio/src/asynchronous/client/callback.rs b/socketio/src/asynchronous/client/callback.rs index 73506fbd..e468ee19 100644 --- a/socketio/src/asynchronous/client/callback.rs +++ b/socketio/src/asynchronous/client/callback.rs @@ -10,10 +10,10 @@ use super::client::Client; /// Internal type, provides a way to store futures and return them in a boxed manner. pub(crate) type DynAsyncCallback = - Box FnMut(Payload, Client) -> BoxFuture<'static, ()> + 'static + Send + Sync>; + Box FnMut(Payload, Client, Option) -> BoxFuture<'static, ()> + 'static + Send + Sync>; pub(crate) type DynAsyncAnyCallback = Box< - dyn for<'a> FnMut(Event, Payload, Client) -> BoxFuture<'static, ()> + 'static + Send + Sync, + dyn for<'a> FnMut(Event, Payload, Client, Option) -> BoxFuture<'static, ()> + 'static + Send + Sync, >; pub(crate) struct Callback { @@ -28,7 +28,7 @@ impl Debug for Callback { impl Deref for Callback { type Target = - dyn for<'a> FnMut(Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send; + dyn for<'a> FnMut(Payload, Client, Option) -> BoxFuture<'static, ()> + 'static + Sync + Send; fn deref(&self) -> &Self::Target { self.inner.as_ref() @@ -44,7 +44,7 @@ impl DerefMut for Callback { impl Callback { pub(crate) fn new(callback: T) -> Self where - T: for<'a> FnMut(Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send, + T: for<'a> FnMut(Payload, Client, Option) -> BoxFuture<'static, ()> + 'static + Sync + Send, { Callback { inner: Box::new(callback), @@ -54,7 +54,7 @@ impl Callback { impl Deref for Callback { type Target = - dyn for<'a> FnMut(Event, Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send; + dyn for<'a> FnMut(Event, Payload, Client, Option) -> BoxFuture<'static, ()> + 'static + Sync + Send; fn deref(&self) -> &Self::Target { self.inner.as_ref() @@ -70,7 +70,7 @@ impl DerefMut for Callback { impl Callback { pub(crate) fn new(callback: T) -> Self where - T: for<'a> FnMut(Event, Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send, + T: for<'a> FnMut(Event, Payload, Client, Option) -> BoxFuture<'static, ()> + 'static + Sync + Send, { Callback { inner: Box::new(callback), diff --git a/socketio/src/asynchronous/client/client.rs b/socketio/src/asynchronous/client/client.rs index a6c0f771..d2fd0af6 100644 --- a/socketio/src/asynchronous/client/client.rs +++ b/socketio/src/asynchronous/client/client.rs @@ -211,7 +211,7 @@ impl Client { callback: F, ) -> Result<()> where - F: for<'a> std::ops::FnMut(Payload, Client) -> BoxFuture<'static, ()> + F: for<'a> std::ops::FnMut(Payload, Client, Option) -> BoxFuture<'static, ()> + 'static + Send + Sync, @@ -221,7 +221,7 @@ impl Client { let id = thread_rng().gen_range(0..999); let socket_packet = self.socket - .build_packet_for_payload(data.into(), event.into(), &self.nsp, Some(id))?; + .build_packet_for_payload(data.into(), event.into(), &self.nsp, Some(id), true)?; let ack = Ack { id, @@ -236,7 +236,57 @@ impl Client { self.socket.send(socket_packet).await } - async fn callback>(&self, event: &Event, payload: P) -> Result<()> { + /// Sends a answer message + /// + /// # Example + /// ``` + /// + /// use futures_util::FutureExt; + /// + /// #[tokio::main] + /// async fn main() { + /// use std::time::Duration; + /// use serde_json::json; + /// use tokio::time::sleep; + /// use rust_socketio::asynchronous::ClientBuilder; + /// use rust_socketio::Payload; + /// + /// let mut socket = ClientBuilder::new("http://localhost:4200/") + /// .on("foo", |payload: Payload, _| async move { println!("Received: {:#?}", payload) }.boxed()) + /// .connect() + /// .on_any(|event, payload, socket, id| async move { + /// socket.emit_answer(id, json!({"result" : true})); + /// }.boxed()) + /// .await + /// .expect("connection failed"); + /// + /// + /// sleep(Duration::from_secs(2)); + /// } + /// ``` + #[inline] + pub async fn emit_answer( + &self, + id: Option, + data: D, + ) -> Result<()> + where + D: Into, + { + let id = match id { + None => { + return Err(Error::MissedPacketId()); + } + Some(el) => el + }; + let socket_packet = + self.socket + .build_packet_for_payload(data.into(), Event::Message, &self.nsp, Some(id), true)?; + + self.socket.send(socket_packet).await + } + + async fn callback>(&self, event: &Event, payload: P, id: Option) -> Result<()> { let mut on = self.on.write().await; let mut on_any = self.on_any.write().await; @@ -245,14 +295,14 @@ impl Client { let payload = payload.into(); if let Some(callback) = on_lock.get_mut(event) { - callback(payload.clone(), self.clone()).await; + callback(payload.clone(), self.clone(), id).await; } // Call on_any for all common and custom events. match event { Event::Message | Event::Custom(_) => { if let Some(callback) = on_any_lock { - callback(event.clone(), payload, self.clone()).await; + callback(event.clone(), payload, self.clone(), id).await; } } _ => (), @@ -277,6 +327,7 @@ impl Client { ack.callback.deref_mut()( Payload::String(payload.to_owned()), self.clone(), + socket_packet.id, ) .await; } @@ -285,6 +336,7 @@ impl Client { ack.callback.deref_mut()( Payload::Binary(payload.to_owned()), self.clone(), + socket_packet.id, ) .await; } @@ -312,7 +364,7 @@ impl Client { if let Some(attachments) = &packet.attachments { if let Some(binary_payload) = attachments.get(0) { - self.callback(&event, Payload::Binary(binary_payload.to_owned())) + self.callback(&event, Payload::Binary(binary_payload.to_owned()), packet.id) .await?; } } @@ -355,7 +407,7 @@ impl Client { }; // call the correct callback - self.callback(&event, data.to_string()).await?; + self.callback(&event, data.to_string(), packet.id).await?; } Ok(()) @@ -370,20 +422,20 @@ impl Client { match packet.packet_type { PacketId::Ack | PacketId::BinaryAck => { if let Err(err) = self.handle_ack(packet).await { - self.callback(&Event::Error, err.to_string()).await?; + self.callback(&Event::Error, err.to_string(), packet.id).await?; return Err(err); } } PacketId::BinaryEvent => { if let Err(err) = self.handle_binary_event(packet).await { - self.callback(&Event::Error, err.to_string()).await?; + self.callback(&Event::Error, err.to_string(), packet.id).await?; } } PacketId::Connect => { - self.callback(&Event::Connect, "").await?; + self.callback(&Event::Connect, "", packet.id).await?; } PacketId::Disconnect => { - self.callback(&Event::Close, "").await?; + self.callback(&Event::Close, "", packet.id).await?; } PacketId::ConnectError => { self.callback( @@ -393,12 +445,13 @@ impl Client { .data .as_ref() .unwrap_or(&String::from("\"No error message provided\"")), + packet.id ) .await?; } PacketId::Event => { if let Err(err) = self.handle_event(packet).await { - self.callback(&Event::Error, err.to_string()).await?; + self.callback(&Event::Error, err.to_string(), packet.id).await?; } } } @@ -425,7 +478,7 @@ impl Stream for Client { } Some(Err(err)) => { // call the error callback - ready!(Box::pin(self.callback(&Event::Error, err.to_string())).poll_unpin(cx))?; + ready!(Box::pin(self.callback(&Event::Error, err.to_string(), None)).poll_unpin(cx))?; return Poll::Ready(Some(Err(err))); } Some(Ok(packet)) => { diff --git a/socketio/src/asynchronous/socket.rs b/socketio/src/asynchronous/socket.rs index 7ab4c7f5..1a684199 100644 --- a/socketio/src/asynchronous/socket.rs +++ b/socketio/src/asynchronous/socket.rs @@ -84,7 +84,7 @@ impl Socket { /// Emits to certain event with given data. The data needs to be JSON, /// otherwise this returns an `InvalidJson` error. pub async fn emit(&self, nsp: &str, event: Event, data: Payload) -> Result<()> { - let socket_packet = self.build_packet_for_payload(data, event, nsp, None)?; + let socket_packet = self.build_packet_for_payload(data, event, nsp, None, false)?; self.send(socket_packet).await } @@ -98,6 +98,7 @@ impl Socket { event: Event, nsp: &'a str, id: Option, + is_answer: bool, ) -> Result { match payload { Payload::Binary(bin_data) => Ok(Packet::new( @@ -115,10 +116,18 @@ impl Socket { Payload::String(str_data) => { serde_json::from_str::(&str_data)?; - let payload = format!("[\"{}\",{}]", String::from(event), str_data); + let package_type; + let payload; + if is_answer { + payload = format!("[{}]", str_data); + package_type = PacketId::Ack; + } else { + payload = format!("[\"{}\",{}]", String::from(event), str_data); + package_type = PacketId::Event; + } Ok(Packet::new( - PacketId::Event, + package_type, nsp.to_owned(), Some(payload), id, diff --git a/socketio/src/client/builder.rs b/socketio/src/client/builder.rs index aa62d812..77b00c49 100644 --- a/socketio/src/client/builder.rs +++ b/socketio/src/client/builder.rs @@ -168,7 +168,7 @@ impl ClientBuilder { #[allow(unused_mut)] pub fn on, F>(mut self, event: T, callback: F) -> Self where - F: FnMut(Payload, RawClient) + 'static + Send, + F: FnMut(Payload, RawClient, Option) + 'static + Send, { let callback = Callback::::new(callback); // SAFETY: Lock is held for such amount of time no code paths lead to a panic while lock is held @@ -196,7 +196,7 @@ impl ClientBuilder { #[allow(unused_mut)] pub fn on_any(mut self, callback: F) -> Self where - F: FnMut(Event, Payload, RawClient) + 'static + Send, + F: FnMut(Event, Payload, RawClient, Option) + 'static + Send, { let callback = Some(Callback::::new(callback)); // SAFETY: Lock is held for such amount of time no code paths lead to a panic while lock is held diff --git a/socketio/src/client/callback.rs b/socketio/src/client/callback.rs index 1015ec03..d3073628 100644 --- a/socketio/src/client/callback.rs +++ b/socketio/src/client/callback.rs @@ -6,8 +6,8 @@ use std::{ use super::RawClient; use crate::{Event, Payload}; -pub(crate) type SocketCallback = Box; -pub(crate) type SocketAnyCallback = Box; +pub(crate) type SocketCallback = Box) + 'static + Send>; +pub(crate) type SocketAnyCallback = Box) + 'static + Send>; pub(crate) struct Callback { inner: T, @@ -22,7 +22,7 @@ impl Debug for Callback { } impl Deref for Callback { - type Target = dyn FnMut(Payload, RawClient) + 'static + Send; + type Target = dyn FnMut(Payload, RawClient, Option) + 'static + Send; fn deref(&self) -> &Self::Target { self.inner.as_ref() @@ -38,7 +38,7 @@ impl DerefMut for Callback { impl Callback { pub(crate) fn new(callback: T) -> Self where - T: FnMut(Payload, RawClient) + 'static + Send, + T: FnMut(Payload, RawClient, Option) + 'static + Send, { Callback { inner: Box::new(callback), @@ -55,7 +55,7 @@ impl Debug for Callback { } impl Deref for Callback { - type Target = dyn FnMut(Event, Payload, RawClient) + 'static + Send; + type Target = dyn FnMut(Event, Payload, RawClient, Option) + 'static + Send; fn deref(&self) -> &Self::Target { self.inner.as_ref() @@ -71,7 +71,7 @@ impl DerefMut for Callback { impl Callback { pub(crate) fn new(callback: T) -> Self where - T: FnMut(Event, Payload, RawClient) + 'static + Send, + T: FnMut(Event, Payload, RawClient, Option) + 'static + Send, { Callback { inner: Box::new(callback), diff --git a/socketio/src/client/client.rs b/socketio/src/client/client.rs index 7d9b699e..140be8e5 100644 --- a/socketio/src/client/client.rs +++ b/socketio/src/client/client.rs @@ -117,7 +117,7 @@ impl Client { callback: F, ) -> Result<()> where - F: FnMut(Payload, RawClient) + 'static + Send, + F: FnMut(Payload, RawClient, Option) + 'static + Send, E: Into, D: Into, { diff --git a/socketio/src/client/raw_client.rs b/socketio/src/client/raw_client.rs index 443213bb..c3c54871 100644 --- a/socketio/src/client/raw_client.rs +++ b/socketio/src/client/raw_client.rs @@ -10,6 +10,7 @@ use std::ops::DerefMut; use std::sync::{Arc, Mutex}; use std::time::Duration; use std::time::Instant; +use crate::Error; use crate::socket::Socket as InnerSocket; @@ -147,7 +148,7 @@ impl RawClient { let _ = self.socket.send(disconnect_packet); self.socket.disconnect()?; - let _ = self.callback(&Event::Close, ""); // trigger on_close + let _ = self.callback(&Event::Close, "", None); // trigger on_close Ok(()) } @@ -195,14 +196,14 @@ impl RawClient { callback: F, ) -> Result<()> where - F: FnMut(Payload, RawClient) + 'static + Send, + F: FnMut(Payload, RawClient, Option) + 'static + Send, E: Into, D: Into, { let id = thread_rng().gen_range(0..999); let socket_packet = self.socket - .build_packet_for_payload(data.into(), event.into(), &self.nsp, Some(id))?; + .build_packet_for_payload(data.into(), event.into(), &self.nsp, Some(id), false)?; let ack = Ack { id, @@ -218,11 +219,33 @@ impl RawClient { Ok(()) } + pub fn emit_answer( + &self, + id: Option, + data: D, + ) -> Result<()> + where + D: Into, + { + let id = match id { + None => { + return Err(Error::MissedPacketId()); + } + Some(el) => el + }; + let socket_packet = + self.socket + .build_packet_for_payload(data.into(), Event::Message, &self.nsp, Some(id), true)?; + + self.socket.send(socket_packet)?; + Ok(()) + } + pub(crate) fn poll(&self) -> Result> { loop { match self.socket.poll() { Err(err) => { - self.callback(&Event::Error, err.to_string())?; + self.callback(&Event::Error, err.to_string(), None)?; return Err(err); } Ok(Some(packet)) => { @@ -243,7 +266,7 @@ impl RawClient { Iter { socket: self } } - fn callback>(&self, event: &Event, payload: P) -> Result<()> { + fn callback>(&self, event: &Event, payload: P, id: Option) -> Result<()> { let mut on = self.on.lock()?; let mut on_any = self.on_any.lock()?; let lock = on.deref_mut(); @@ -252,12 +275,12 @@ impl RawClient { let payload = payload.into(); if let Some(callback) = lock.get_mut(event) { - callback(payload.clone(), self.clone()); + callback(payload.clone(), self.clone(), id); } match event { Event::Message | Event::Custom(_) => { if let Some(callback) = on_any_lock { - callback(event.clone(), payload, self.clone()) + callback(event.clone(), payload, self.clone(), id) } } _ => {} @@ -281,6 +304,7 @@ impl RawClient { ack.callback.deref_mut()( Payload::String(payload.to_owned()), self.clone(), + None ); } if let Some(ref attachments) = socket_packet.attachments { @@ -288,6 +312,7 @@ impl RawClient { ack.callback.deref_mut()( Payload::Binary(payload.to_owned()), self.clone(), + None ); } } @@ -314,7 +339,7 @@ impl RawClient { if let Some(attachments) = &packet.attachments { if let Some(binary_payload) = attachments.get(0) { - self.callback(&event, Payload::Binary(binary_payload.to_owned()))?; + self.callback(&event, Payload::Binary(binary_payload.to_owned()), packet.id)?; } } Ok(()) @@ -348,6 +373,7 @@ impl RawClient { .get(1) .unwrap_or_else(|| contents.get(0).unwrap()) .to_string(), + packet.id )?; } } @@ -363,20 +389,20 @@ impl RawClient { match packet.packet_type { PacketId::Ack | PacketId::BinaryAck => { if let Err(err) = self.handle_ack(packet) { - self.callback(&Event::Error, err.to_string())?; + self.callback(&Event::Error, err.to_string(), packet.id)?; return Err(err); } } PacketId::BinaryEvent => { if let Err(err) = self.handle_binary_event(packet) { - self.callback(&Event::Error, err.to_string())?; + self.callback(&Event::Error, err.to_string(), packet.id)?; } } PacketId::Connect => { - self.callback(&Event::Connect, "")?; + self.callback(&Event::Connect, "", packet.id)?; } PacketId::Disconnect => { - self.callback(&Event::Close, "")?; + self.callback(&Event::Close, "", packet.id)?; } PacketId::ConnectError => { self.callback( @@ -386,11 +412,12 @@ impl RawClient { .clone() .data .unwrap_or_else(|| String::from("\"No error message provided\"")), + packet.id )?; } PacketId::Event => { if let Err(err) = self.handle_event(packet) { - self.callback(&Event::Error, err.to_string())?; + self.callback(&Event::Error, err.to_string(), packet.id)?; } } } diff --git a/socketio/src/error.rs b/socketio/src/error.rs index cc25d897..ad4431df 100644 --- a/socketio/src/error.rs +++ b/socketio/src/error.rs @@ -14,6 +14,8 @@ use url::ParseError as UrlParseError; pub enum Error { // Conform to https://rust-lang.github.io/api-guidelines/naming.html#names-use-a-consistent-word-order-c-word-order // Negative verb-object + #[error("Missed packet id")] + MissedPacketId(), #[error("Invalid packet id: {0}")] InvalidPacketId(char), #[error("Error while parsing an incomplete packet")] diff --git a/socketio/src/socket.rs b/socketio/src/socket.rs index a0cf8550..66a551c1 100644 --- a/socketio/src/socket.rs +++ b/socketio/src/socket.rs @@ -73,7 +73,7 @@ impl Socket { /// Emits to certain event with given data. The data needs to be JSON, /// otherwise this returns an `InvalidJson` error. pub fn emit(&self, nsp: &str, event: Event, data: Payload) -> Result<()> { - let socket_packet = self.build_packet_for_payload(data, event, nsp, None)?; + let socket_packet = self.build_packet_for_payload(data, event, nsp, None, false)?; self.send(socket_packet) } @@ -87,6 +87,7 @@ impl Socket { event: Event, nsp: &'a str, id: Option, + is_answer: bool, ) -> Result { match payload { Payload::Binary(bin_data) => Ok(Packet::new( @@ -102,14 +103,27 @@ impl Socket { Some(vec![bin_data]), )), Payload::String(str_data) => { - let payload = if serde_json::from_str::(&str_data).is_ok() { - format!("[\"{}\",{}]", String::from(event), str_data) + let package_type; + let payload; + + if is_answer { + payload = if serde_json::from_str::(&str_data).is_ok() { + format!("[{}]", str_data) + } else { + format!("[\"{}\"]", str_data) + }; + package_type = PacketId::Ack; } else { - format!("[\"{}\",\"{}\"]", String::from(event), str_data) - }; + payload = if serde_json::from_str::(&str_data).is_ok() { + format!("[\"{}\",{}]", String::from(event), str_data) + } else { + format!("[\"{}\",\"{}\"]", String::from(event), str_data) + }; + package_type = PacketId::Event; + } Ok(Packet::new( - PacketId::Event, + package_type, nsp.to_owned(), Some(payload), id, From e0d4543eeb0b666a26bb4b62314f51eb7b0f37b9 Mon Sep 17 00:00:00 2001 From: GoodMan Date: Sat, 15 Apr 2023 22:06:51 +0400 Subject: [PATCH 2/2] implemented emit answer to socket server --- socketio/src/asynchronous/client/builder.rs | 4 +- socketio/src/asynchronous/client/callback.rs | 12 +-- socketio/src/asynchronous/client/client.rs | 79 ++++++++++++++++---- socketio/src/asynchronous/socket.rs | 15 +++- socketio/src/client/builder.rs | 4 +- socketio/src/client/callback.rs | 12 +-- socketio/src/client/client.rs | 2 +- socketio/src/client/raw_client.rs | 53 +++++++++---- socketio/src/error.rs | 2 + socketio/src/socket.rs | 26 +++++-- 10 files changed, 157 insertions(+), 52 deletions(-) diff --git a/socketio/src/asynchronous/client/builder.rs b/socketio/src/asynchronous/client/builder.rs index c25db585..99692b43 100644 --- a/socketio/src/asynchronous/client/builder.rs +++ b/socketio/src/asynchronous/client/builder.rs @@ -165,7 +165,7 @@ impl ClientBuilder { #[cfg(feature = "async-callbacks")] pub fn on, F>(mut self, event: T, callback: F) -> Self where - F: for<'a> std::ops::FnMut(Payload, Client) -> BoxFuture<'static, ()> + F: for<'a> std::ops::FnMut(Payload, Client, Option) -> BoxFuture<'static, ()> + 'static + Send + Sync, @@ -199,7 +199,7 @@ impl ClientBuilder { /// ``` pub fn on_any(mut self, callback: F) -> Self where - F: for<'a> FnMut(Event, Payload, Client) -> BoxFuture<'static, ()> + 'static + Send + Sync, + F: for<'a> FnMut(Event, Payload, Client, Option) -> BoxFuture<'static, ()> + 'static + Send + Sync, { self.on_any = Some(Callback::::new(callback)); self diff --git a/socketio/src/asynchronous/client/callback.rs b/socketio/src/asynchronous/client/callback.rs index 73506fbd..e468ee19 100644 --- a/socketio/src/asynchronous/client/callback.rs +++ b/socketio/src/asynchronous/client/callback.rs @@ -10,10 +10,10 @@ use super::client::Client; /// Internal type, provides a way to store futures and return them in a boxed manner. pub(crate) type DynAsyncCallback = - Box FnMut(Payload, Client) -> BoxFuture<'static, ()> + 'static + Send + Sync>; + Box FnMut(Payload, Client, Option) -> BoxFuture<'static, ()> + 'static + Send + Sync>; pub(crate) type DynAsyncAnyCallback = Box< - dyn for<'a> FnMut(Event, Payload, Client) -> BoxFuture<'static, ()> + 'static + Send + Sync, + dyn for<'a> FnMut(Event, Payload, Client, Option) -> BoxFuture<'static, ()> + 'static + Send + Sync, >; pub(crate) struct Callback { @@ -28,7 +28,7 @@ impl Debug for Callback { impl Deref for Callback { type Target = - dyn for<'a> FnMut(Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send; + dyn for<'a> FnMut(Payload, Client, Option) -> BoxFuture<'static, ()> + 'static + Sync + Send; fn deref(&self) -> &Self::Target { self.inner.as_ref() @@ -44,7 +44,7 @@ impl DerefMut for Callback { impl Callback { pub(crate) fn new(callback: T) -> Self where - T: for<'a> FnMut(Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send, + T: for<'a> FnMut(Payload, Client, Option) -> BoxFuture<'static, ()> + 'static + Sync + Send, { Callback { inner: Box::new(callback), @@ -54,7 +54,7 @@ impl Callback { impl Deref for Callback { type Target = - dyn for<'a> FnMut(Event, Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send; + dyn for<'a> FnMut(Event, Payload, Client, Option) -> BoxFuture<'static, ()> + 'static + Sync + Send; fn deref(&self) -> &Self::Target { self.inner.as_ref() @@ -70,7 +70,7 @@ impl DerefMut for Callback { impl Callback { pub(crate) fn new(callback: T) -> Self where - T: for<'a> FnMut(Event, Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send, + T: for<'a> FnMut(Event, Payload, Client, Option) -> BoxFuture<'static, ()> + 'static + Sync + Send, { Callback { inner: Box::new(callback), diff --git a/socketio/src/asynchronous/client/client.rs b/socketio/src/asynchronous/client/client.rs index a6c0f771..d2fd0af6 100644 --- a/socketio/src/asynchronous/client/client.rs +++ b/socketio/src/asynchronous/client/client.rs @@ -211,7 +211,7 @@ impl Client { callback: F, ) -> Result<()> where - F: for<'a> std::ops::FnMut(Payload, Client) -> BoxFuture<'static, ()> + F: for<'a> std::ops::FnMut(Payload, Client, Option) -> BoxFuture<'static, ()> + 'static + Send + Sync, @@ -221,7 +221,7 @@ impl Client { let id = thread_rng().gen_range(0..999); let socket_packet = self.socket - .build_packet_for_payload(data.into(), event.into(), &self.nsp, Some(id))?; + .build_packet_for_payload(data.into(), event.into(), &self.nsp, Some(id), true)?; let ack = Ack { id, @@ -236,7 +236,57 @@ impl Client { self.socket.send(socket_packet).await } - async fn callback>(&self, event: &Event, payload: P) -> Result<()> { + /// Sends a answer message + /// + /// # Example + /// ``` + /// + /// use futures_util::FutureExt; + /// + /// #[tokio::main] + /// async fn main() { + /// use std::time::Duration; + /// use serde_json::json; + /// use tokio::time::sleep; + /// use rust_socketio::asynchronous::ClientBuilder; + /// use rust_socketio::Payload; + /// + /// let mut socket = ClientBuilder::new("http://localhost:4200/") + /// .on("foo", |payload: Payload, _| async move { println!("Received: {:#?}", payload) }.boxed()) + /// .connect() + /// .on_any(|event, payload, socket, id| async move { + /// socket.emit_answer(id, json!({"result" : true})); + /// }.boxed()) + /// .await + /// .expect("connection failed"); + /// + /// + /// sleep(Duration::from_secs(2)); + /// } + /// ``` + #[inline] + pub async fn emit_answer( + &self, + id: Option, + data: D, + ) -> Result<()> + where + D: Into, + { + let id = match id { + None => { + return Err(Error::MissedPacketId()); + } + Some(el) => el + }; + let socket_packet = + self.socket + .build_packet_for_payload(data.into(), Event::Message, &self.nsp, Some(id), true)?; + + self.socket.send(socket_packet).await + } + + async fn callback>(&self, event: &Event, payload: P, id: Option) -> Result<()> { let mut on = self.on.write().await; let mut on_any = self.on_any.write().await; @@ -245,14 +295,14 @@ impl Client { let payload = payload.into(); if let Some(callback) = on_lock.get_mut(event) { - callback(payload.clone(), self.clone()).await; + callback(payload.clone(), self.clone(), id).await; } // Call on_any for all common and custom events. match event { Event::Message | Event::Custom(_) => { if let Some(callback) = on_any_lock { - callback(event.clone(), payload, self.clone()).await; + callback(event.clone(), payload, self.clone(), id).await; } } _ => (), @@ -277,6 +327,7 @@ impl Client { ack.callback.deref_mut()( Payload::String(payload.to_owned()), self.clone(), + socket_packet.id, ) .await; } @@ -285,6 +336,7 @@ impl Client { ack.callback.deref_mut()( Payload::Binary(payload.to_owned()), self.clone(), + socket_packet.id, ) .await; } @@ -312,7 +364,7 @@ impl Client { if let Some(attachments) = &packet.attachments { if let Some(binary_payload) = attachments.get(0) { - self.callback(&event, Payload::Binary(binary_payload.to_owned())) + self.callback(&event, Payload::Binary(binary_payload.to_owned()), packet.id) .await?; } } @@ -355,7 +407,7 @@ impl Client { }; // call the correct callback - self.callback(&event, data.to_string()).await?; + self.callback(&event, data.to_string(), packet.id).await?; } Ok(()) @@ -370,20 +422,20 @@ impl Client { match packet.packet_type { PacketId::Ack | PacketId::BinaryAck => { if let Err(err) = self.handle_ack(packet).await { - self.callback(&Event::Error, err.to_string()).await?; + self.callback(&Event::Error, err.to_string(), packet.id).await?; return Err(err); } } PacketId::BinaryEvent => { if let Err(err) = self.handle_binary_event(packet).await { - self.callback(&Event::Error, err.to_string()).await?; + self.callback(&Event::Error, err.to_string(), packet.id).await?; } } PacketId::Connect => { - self.callback(&Event::Connect, "").await?; + self.callback(&Event::Connect, "", packet.id).await?; } PacketId::Disconnect => { - self.callback(&Event::Close, "").await?; + self.callback(&Event::Close, "", packet.id).await?; } PacketId::ConnectError => { self.callback( @@ -393,12 +445,13 @@ impl Client { .data .as_ref() .unwrap_or(&String::from("\"No error message provided\"")), + packet.id ) .await?; } PacketId::Event => { if let Err(err) = self.handle_event(packet).await { - self.callback(&Event::Error, err.to_string()).await?; + self.callback(&Event::Error, err.to_string(), packet.id).await?; } } } @@ -425,7 +478,7 @@ impl Stream for Client { } Some(Err(err)) => { // call the error callback - ready!(Box::pin(self.callback(&Event::Error, err.to_string())).poll_unpin(cx))?; + ready!(Box::pin(self.callback(&Event::Error, err.to_string(), None)).poll_unpin(cx))?; return Poll::Ready(Some(Err(err))); } Some(Ok(packet)) => { diff --git a/socketio/src/asynchronous/socket.rs b/socketio/src/asynchronous/socket.rs index 7ab4c7f5..1a684199 100644 --- a/socketio/src/asynchronous/socket.rs +++ b/socketio/src/asynchronous/socket.rs @@ -84,7 +84,7 @@ impl Socket { /// Emits to certain event with given data. The data needs to be JSON, /// otherwise this returns an `InvalidJson` error. pub async fn emit(&self, nsp: &str, event: Event, data: Payload) -> Result<()> { - let socket_packet = self.build_packet_for_payload(data, event, nsp, None)?; + let socket_packet = self.build_packet_for_payload(data, event, nsp, None, false)?; self.send(socket_packet).await } @@ -98,6 +98,7 @@ impl Socket { event: Event, nsp: &'a str, id: Option, + is_answer: bool, ) -> Result { match payload { Payload::Binary(bin_data) => Ok(Packet::new( @@ -115,10 +116,18 @@ impl Socket { Payload::String(str_data) => { serde_json::from_str::(&str_data)?; - let payload = format!("[\"{}\",{}]", String::from(event), str_data); + let package_type; + let payload; + if is_answer { + payload = format!("[{}]", str_data); + package_type = PacketId::Ack; + } else { + payload = format!("[\"{}\",{}]", String::from(event), str_data); + package_type = PacketId::Event; + } Ok(Packet::new( - PacketId::Event, + package_type, nsp.to_owned(), Some(payload), id, diff --git a/socketio/src/client/builder.rs b/socketio/src/client/builder.rs index aa62d812..77b00c49 100644 --- a/socketio/src/client/builder.rs +++ b/socketio/src/client/builder.rs @@ -168,7 +168,7 @@ impl ClientBuilder { #[allow(unused_mut)] pub fn on, F>(mut self, event: T, callback: F) -> Self where - F: FnMut(Payload, RawClient) + 'static + Send, + F: FnMut(Payload, RawClient, Option) + 'static + Send, { let callback = Callback::::new(callback); // SAFETY: Lock is held for such amount of time no code paths lead to a panic while lock is held @@ -196,7 +196,7 @@ impl ClientBuilder { #[allow(unused_mut)] pub fn on_any(mut self, callback: F) -> Self where - F: FnMut(Event, Payload, RawClient) + 'static + Send, + F: FnMut(Event, Payload, RawClient, Option) + 'static + Send, { let callback = Some(Callback::::new(callback)); // SAFETY: Lock is held for such amount of time no code paths lead to a panic while lock is held diff --git a/socketio/src/client/callback.rs b/socketio/src/client/callback.rs index 1015ec03..d3073628 100644 --- a/socketio/src/client/callback.rs +++ b/socketio/src/client/callback.rs @@ -6,8 +6,8 @@ use std::{ use super::RawClient; use crate::{Event, Payload}; -pub(crate) type SocketCallback = Box; -pub(crate) type SocketAnyCallback = Box; +pub(crate) type SocketCallback = Box) + 'static + Send>; +pub(crate) type SocketAnyCallback = Box) + 'static + Send>; pub(crate) struct Callback { inner: T, @@ -22,7 +22,7 @@ impl Debug for Callback { } impl Deref for Callback { - type Target = dyn FnMut(Payload, RawClient) + 'static + Send; + type Target = dyn FnMut(Payload, RawClient, Option) + 'static + Send; fn deref(&self) -> &Self::Target { self.inner.as_ref() @@ -38,7 +38,7 @@ impl DerefMut for Callback { impl Callback { pub(crate) fn new(callback: T) -> Self where - T: FnMut(Payload, RawClient) + 'static + Send, + T: FnMut(Payload, RawClient, Option) + 'static + Send, { Callback { inner: Box::new(callback), @@ -55,7 +55,7 @@ impl Debug for Callback { } impl Deref for Callback { - type Target = dyn FnMut(Event, Payload, RawClient) + 'static + Send; + type Target = dyn FnMut(Event, Payload, RawClient, Option) + 'static + Send; fn deref(&self) -> &Self::Target { self.inner.as_ref() @@ -71,7 +71,7 @@ impl DerefMut for Callback { impl Callback { pub(crate) fn new(callback: T) -> Self where - T: FnMut(Event, Payload, RawClient) + 'static + Send, + T: FnMut(Event, Payload, RawClient, Option) + 'static + Send, { Callback { inner: Box::new(callback), diff --git a/socketio/src/client/client.rs b/socketio/src/client/client.rs index 7d9b699e..140be8e5 100644 --- a/socketio/src/client/client.rs +++ b/socketio/src/client/client.rs @@ -117,7 +117,7 @@ impl Client { callback: F, ) -> Result<()> where - F: FnMut(Payload, RawClient) + 'static + Send, + F: FnMut(Payload, RawClient, Option) + 'static + Send, E: Into, D: Into, { diff --git a/socketio/src/client/raw_client.rs b/socketio/src/client/raw_client.rs index 443213bb..c3c54871 100644 --- a/socketio/src/client/raw_client.rs +++ b/socketio/src/client/raw_client.rs @@ -10,6 +10,7 @@ use std::ops::DerefMut; use std::sync::{Arc, Mutex}; use std::time::Duration; use std::time::Instant; +use crate::Error; use crate::socket::Socket as InnerSocket; @@ -147,7 +148,7 @@ impl RawClient { let _ = self.socket.send(disconnect_packet); self.socket.disconnect()?; - let _ = self.callback(&Event::Close, ""); // trigger on_close + let _ = self.callback(&Event::Close, "", None); // trigger on_close Ok(()) } @@ -195,14 +196,14 @@ impl RawClient { callback: F, ) -> Result<()> where - F: FnMut(Payload, RawClient) + 'static + Send, + F: FnMut(Payload, RawClient, Option) + 'static + Send, E: Into, D: Into, { let id = thread_rng().gen_range(0..999); let socket_packet = self.socket - .build_packet_for_payload(data.into(), event.into(), &self.nsp, Some(id))?; + .build_packet_for_payload(data.into(), event.into(), &self.nsp, Some(id), false)?; let ack = Ack { id, @@ -218,11 +219,33 @@ impl RawClient { Ok(()) } + pub fn emit_answer( + &self, + id: Option, + data: D, + ) -> Result<()> + where + D: Into, + { + let id = match id { + None => { + return Err(Error::MissedPacketId()); + } + Some(el) => el + }; + let socket_packet = + self.socket + .build_packet_for_payload(data.into(), Event::Message, &self.nsp, Some(id), true)?; + + self.socket.send(socket_packet)?; + Ok(()) + } + pub(crate) fn poll(&self) -> Result> { loop { match self.socket.poll() { Err(err) => { - self.callback(&Event::Error, err.to_string())?; + self.callback(&Event::Error, err.to_string(), None)?; return Err(err); } Ok(Some(packet)) => { @@ -243,7 +266,7 @@ impl RawClient { Iter { socket: self } } - fn callback>(&self, event: &Event, payload: P) -> Result<()> { + fn callback>(&self, event: &Event, payload: P, id: Option) -> Result<()> { let mut on = self.on.lock()?; let mut on_any = self.on_any.lock()?; let lock = on.deref_mut(); @@ -252,12 +275,12 @@ impl RawClient { let payload = payload.into(); if let Some(callback) = lock.get_mut(event) { - callback(payload.clone(), self.clone()); + callback(payload.clone(), self.clone(), id); } match event { Event::Message | Event::Custom(_) => { if let Some(callback) = on_any_lock { - callback(event.clone(), payload, self.clone()) + callback(event.clone(), payload, self.clone(), id) } } _ => {} @@ -281,6 +304,7 @@ impl RawClient { ack.callback.deref_mut()( Payload::String(payload.to_owned()), self.clone(), + None ); } if let Some(ref attachments) = socket_packet.attachments { @@ -288,6 +312,7 @@ impl RawClient { ack.callback.deref_mut()( Payload::Binary(payload.to_owned()), self.clone(), + None ); } } @@ -314,7 +339,7 @@ impl RawClient { if let Some(attachments) = &packet.attachments { if let Some(binary_payload) = attachments.get(0) { - self.callback(&event, Payload::Binary(binary_payload.to_owned()))?; + self.callback(&event, Payload::Binary(binary_payload.to_owned()), packet.id)?; } } Ok(()) @@ -348,6 +373,7 @@ impl RawClient { .get(1) .unwrap_or_else(|| contents.get(0).unwrap()) .to_string(), + packet.id )?; } } @@ -363,20 +389,20 @@ impl RawClient { match packet.packet_type { PacketId::Ack | PacketId::BinaryAck => { if let Err(err) = self.handle_ack(packet) { - self.callback(&Event::Error, err.to_string())?; + self.callback(&Event::Error, err.to_string(), packet.id)?; return Err(err); } } PacketId::BinaryEvent => { if let Err(err) = self.handle_binary_event(packet) { - self.callback(&Event::Error, err.to_string())?; + self.callback(&Event::Error, err.to_string(), packet.id)?; } } PacketId::Connect => { - self.callback(&Event::Connect, "")?; + self.callback(&Event::Connect, "", packet.id)?; } PacketId::Disconnect => { - self.callback(&Event::Close, "")?; + self.callback(&Event::Close, "", packet.id)?; } PacketId::ConnectError => { self.callback( @@ -386,11 +412,12 @@ impl RawClient { .clone() .data .unwrap_or_else(|| String::from("\"No error message provided\"")), + packet.id )?; } PacketId::Event => { if let Err(err) = self.handle_event(packet) { - self.callback(&Event::Error, err.to_string())?; + self.callback(&Event::Error, err.to_string(), packet.id)?; } } } diff --git a/socketio/src/error.rs b/socketio/src/error.rs index cc25d897..ad4431df 100644 --- a/socketio/src/error.rs +++ b/socketio/src/error.rs @@ -14,6 +14,8 @@ use url::ParseError as UrlParseError; pub enum Error { // Conform to https://rust-lang.github.io/api-guidelines/naming.html#names-use-a-consistent-word-order-c-word-order // Negative verb-object + #[error("Missed packet id")] + MissedPacketId(), #[error("Invalid packet id: {0}")] InvalidPacketId(char), #[error("Error while parsing an incomplete packet")] diff --git a/socketio/src/socket.rs b/socketio/src/socket.rs index a0cf8550..66a551c1 100644 --- a/socketio/src/socket.rs +++ b/socketio/src/socket.rs @@ -73,7 +73,7 @@ impl Socket { /// Emits to certain event with given data. The data needs to be JSON, /// otherwise this returns an `InvalidJson` error. pub fn emit(&self, nsp: &str, event: Event, data: Payload) -> Result<()> { - let socket_packet = self.build_packet_for_payload(data, event, nsp, None)?; + let socket_packet = self.build_packet_for_payload(data, event, nsp, None, false)?; self.send(socket_packet) } @@ -87,6 +87,7 @@ impl Socket { event: Event, nsp: &'a str, id: Option, + is_answer: bool, ) -> Result { match payload { Payload::Binary(bin_data) => Ok(Packet::new( @@ -102,14 +103,27 @@ impl Socket { Some(vec![bin_data]), )), Payload::String(str_data) => { - let payload = if serde_json::from_str::(&str_data).is_ok() { - format!("[\"{}\",{}]", String::from(event), str_data) + let package_type; + let payload; + + if is_answer { + payload = if serde_json::from_str::(&str_data).is_ok() { + format!("[{}]", str_data) + } else { + format!("[\"{}\"]", str_data) + }; + package_type = PacketId::Ack; } else { - format!("[\"{}\",\"{}\"]", String::from(event), str_data) - }; + payload = if serde_json::from_str::(&str_data).is_ok() { + format!("[\"{}\",{}]", String::from(event), str_data) + } else { + format!("[\"{}\",\"{}\"]", String::from(event), str_data) + }; + package_type = PacketId::Event; + } Ok(Packet::new( - PacketId::Event, + package_type, nsp.to_owned(), Some(payload), id,