mahjong_service/socket/
session.rs1use super::{
2 MahjongWebsocketServer, SocketMessageConnect, SocketMessageDisconnect, SocketMessageListRooms,
3 SocketMessageStr,
4};
5use actix::prelude::*;
6use actix_web_actors::ws;
7use mahjong_core::{GameId, PlayerId};
8use service_contracts::SocketMessage;
9use std::time::{Duration, Instant};
10use tracing::debug;
11
/// Period of the heartbeat timer: on every tick the session either pings the
/// client or, if the client has been silent for longer than `CLIENT_TIMEOUT`,
/// shuts itself down.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// Longest time that may pass without a ping or pong from the client before
/// the heartbeat timer stops the session.
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
14
/// Name of the room a websocket session joins.
pub type RoomId = String;
/// Identifier of a websocket session, as returned by the server in its reply
/// to the connect message.
pub type SessionId = usize;
17
18#[derive(Debug)]
19pub struct MahjongWebsocketSession {
20 pub addr: Addr<MahjongWebsocketServer>,
21 pub hb: Instant,
22 pub id: SessionId,
23 pub room: RoomId,
24}
25
26impl MahjongWebsocketSession {
27 pub fn get_room_id(game_id: &GameId, player_id: Option<&PlayerId>) -> RoomId {
28 if player_id.is_none() {
29 return game_id.to_string();
30 }
31
32 format!("{}__{}", game_id, player_id.unwrap())
33 }
34 fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
35 ctx.run_interval(HEARTBEAT_INTERVAL, |act, new_ctx| {
36 if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
37 act.addr.do_send(SocketMessageDisconnect { id: act.id });
38 new_ctx.stop();
39 return;
40 }
41
42 new_ctx.ping(b"");
43 });
44 }
45}
46
47impl Actor for MahjongWebsocketSession {
48 type Context = ws::WebsocketContext<Self>;
49
50 fn started(&mut self, ctx: &mut Self::Context) {
51 self.hb(ctx);
52
53 let addr = ctx.address();
54 let room = self.room.clone();
55
56 self.addr
57 .send(SocketMessageConnect {
58 room,
59 addr: addr.recipient(),
60 })
61 .into_actor(self)
62 .then(|res, act, new_ctx| {
63 match res {
64 Ok(res) => {
65 act.id = res;
66 debug!("{} joined room {}", act.id, act.room);
67 }
68 _ => new_ctx.stop(),
69 }
70 fut::ready(())
71 })
72 .wait(ctx);
73 }
74
75 fn stopping(&mut self, _: &mut Self::Context) -> Running {
76 debug!("{} disconnected from {}", self.id, self.room);
77 self.addr.do_send(SocketMessageDisconnect { id: self.id });
78 Running::Stop
79 }
80}
81
82impl Handler<SocketMessageStr> for MahjongWebsocketSession {
83 type Result = ();
84
85 fn handle(&mut self, msg: SocketMessageStr, ctx: &mut Self::Context) {
86 ctx.text(msg.0);
87 }
88}
89
90impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MahjongWebsocketSession {
91 fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
92 let msg = match msg {
93 Err(_) => {
94 ctx.stop();
95 return;
96 }
97 Ok(msg) => msg,
98 };
99
100 match msg {
101 ws::Message::Ping(msg) => {
102 self.hb = Instant::now();
103 ctx.pong(&msg);
104 }
105 ws::Message::Pong(_) => {
106 self.hb = Instant::now();
107 }
108 ws::Message::Text(text) => {
109 let message = serde_json::from_str::<SocketMessage>(&text);
110 if message.is_err() {
111 return;
112 }
113
114 if let Ok(SocketMessage::ListRooms) = message {
115 self.addr
116 .send(SocketMessageListRooms)
117 .into_actor(self)
118 .then(|res, _, new_ctx| {
119 if let Ok(rooms) = res {
120 for room in rooms {
121 let room =
122 serde_json::to_string(&SocketMessage::Name(room)).unwrap();
123
124 new_ctx.text(room);
125 }
126 }
127 fut::ready(())
128 })
129 .wait(ctx)
130 }
131 }
132 ws::Message::Close(reason) => {
133 ctx.close(reason);
134 ctx.stop();
135 }
136 ws::Message::Continuation(_) => {
137 ctx.stop();
138 }
139 _ => {}
140 }
141 }
142}