-
Notifications
You must be signed in to change notification settings - Fork 32
/
handler.rs
104 lines (88 loc) · 2.97 KB
/
handler.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
use crate::{ws, Client, Clients, Result};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use warp::{http::StatusCode, reply::json, ws::Message, Reply};
/// Deserialized request payload consumed by `register_handler`.
#[derive(Deserialize, Debug)]
pub struct RegisterRequest {
/// Identifier of the user the new client entry is created for.
user_id: usize,
/// Topic the new client is subscribed to at registration time.
topic: String,
}
/// Deserialized request payload shared by `add_topic` and `remove_topic`.
///
/// Derives `Debug` like the other request/response types in this module so
/// it can be logged or inspected the same way.
#[derive(Deserialize, Debug)]
pub struct TopicActionRequest {
    /// Topic to add to, or remove from, the client's subscription list.
    topic: String,
    /// Key of the client entry in the shared client registry.
    client_id: String,
}
/// Response returned as JSON by `register_handler`.
#[derive(Serialize, Debug)]
pub struct RegisterResponse {
/// WebSocket URL, ending in the newly generated client id.
url: String,
}
/// Deserialized payload consumed by `publish_handler`.
#[derive(Deserialize, Debug)]
pub struct Event {
/// Only clients whose topic list contains this value receive the message.
topic: String,
/// When present, delivery is restricted to clients with this user id;
/// when absent, clients of every user are eligible.
user_id: Option<usize>,
/// Text sent to each matching client.
message: String,
}
/// Fans an event's message out to the registered clients it applies to.
///
/// A client receives the message when it is subscribed to `body.topic` and,
/// if `body.user_id` is set, belongs to that user. Clients that have no
/// sender are skipped, and a failed send is ignored (best-effort delivery).
///
/// Always replies with `200 OK`, regardless of how many clients matched.
pub async fn publish_handler(body: Event, clients: Clients) -> Result<impl Reply> {
    let registry = clients.read().await;
    for (_, client) in registry.iter() {
        // No user id on the event means every user is eligible.
        let user_matches = body.user_id.map_or(true, |wanted| client.user_id == wanted);
        if !user_matches || !client.topics.contains(&body.topic) {
            continue;
        }
        if let Some(sender) = &client.sender {
            // The send result is deliberately discarded.
            let _ = sender.send(Ok(Message::text(body.message.clone())));
        }
    }
    Ok(StatusCode::OK)
}
pub async fn register_handler(body: RegisterRequest, clients: Clients) -> Result<impl Reply> {
let user_id = body.user_id;
let topic = body.topic; // Capture the entry topic
let uuid = Uuid::new_v4().as_simple().to_string();
register_client(uuid.clone(), user_id, topic, clients).await; // Pass the entry topic
Ok(json(&RegisterResponse {
url: format!("ws://127.0.0.1:8000/ws/{}", uuid),
}))
}
/// Stores a new client under `id` in the shared registry.
///
/// The client starts subscribed to the single given `topic` and has no
/// sender yet. An existing entry with the same `id` is replaced.
async fn register_client(id: String, user_id: usize, topic: String, clients: Clients) {
    let mut registry = clients.write().await;
    let new_client = Client {
        user_id,
        topics: vec![topic],
        sender: None,
    };
    registry.insert(id, new_client);
}
/// Removes the client stored under `id` from the shared registry.
///
/// Replies with `200 OK` whether or not an entry existed for `id`.
pub async fn unregister_handler(id: String, clients: Clients) -> Result<impl Reply> {
    let mut registry = clients.write().await;
    registry.remove(&id);
    Ok(StatusCode::OK)
}
/// Upgrades the request to a WebSocket for an already-registered client.
///
/// Looks up `id` in the registry and, when found, hands a clone of the
/// stored client to `ws::client_connection` once the upgrade completes.
/// An unknown `id` is rejected with warp's `not_found` rejection.
pub async fn ws_handler(ws: warp::ws::Ws, id: String, clients: Clients) -> Result<impl Reply> {
    // The read guard is a temporary of this statement, so the lock is
    // released before the upgrade is set up.
    let registered = clients.read().await.get(&id).cloned();
    registered
        .map(|client| {
            ws.on_upgrade(move |socket| ws::client_connection(socket, id, clients, client))
        })
        .ok_or_else(warp::reject::not_found)
}
/// Liveness probe: unconditionally replies with `200 OK`.
pub async fn health_handler() -> Result<impl Reply> {
    let status = StatusCode::OK;
    Ok(status)
}
/// Subscribes an existing client to an additional topic.
///
/// * If `body.client_id` is registered, the topic is appended to the client's
///   topic list unless it is already present (so repeated requests do not
///   accumulate duplicates), and the reply is `200 OK` with
///   `"Added topic successfully"`.
/// * If `body.client_id` is unknown, nothing is changed and the reply is
///   `404 Not Found` with `"Client not found"` instead of a misleading
///   success message.
pub async fn add_topic(body: TopicActionRequest, clients: Clients) -> Result<impl Reply> {
    let mut registry = clients.write().await;
    let (message, status) = match registry.get_mut(&body.client_id) {
        Some(client) => {
            // Topics are only ever tested for membership, so keep them unique.
            if !client.topics.contains(&body.topic) {
                client.topics.push(body.topic);
            }
            ("Added topic successfully", StatusCode::OK)
        }
        None => ("Client not found", StatusCode::NOT_FOUND),
    };
    Ok(warp::reply::with_status(message, status))
}
/// Unsubscribes an existing client from a topic.
///
/// * If `body.client_id` is registered, every occurrence of the topic is
///   removed from the client's topic list and the reply is `200 OK` with
///   `"Removed topic successfully"`. Removing a topic the client was not
///   subscribed to is still reported as success.
/// * If `body.client_id` is unknown, nothing is changed and the reply is
///   `404 Not Found` with `"Client not found"` instead of a misleading
///   success message.
pub async fn remove_topic(body: TopicActionRequest, clients: Clients) -> Result<impl Reply> {
    let mut registry = clients.write().await;
    let (message, status) = match registry.get_mut(&body.client_id) {
        Some(client) => {
            client.topics.retain(|t| t != &body.topic);
            ("Removed topic successfully", StatusCode::OK)
        }
        None => ("Client not found", StatusCode::NOT_FOUND),
    };
    Ok(warp::reply::with_status(message, status))
}