mahjong_service/socket/
session.rs

1use super::{
2    MahjongWebsocketServer, SocketMessageConnect, SocketMessageDisconnect, SocketMessageListRooms,
3    SocketMessageStr,
4};
5use actix::prelude::*;
6use actix_web_actors::ws;
7use mahjong_core::{GameId, PlayerId};
8use service_contracts::SocketMessage;
9use std::time::{Duration, Instant};
10use tracing::debug;
11
/// How often the session's heartbeat task runs; each run either pings the
/// client or, if the client has timed out, stops the session.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// Longest time allowed since the last ping/pong from the client before the
/// heartbeat task treats the connection as dead.
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
14
/// Name of a room a session can join (see
/// `MahjongWebsocketSession::get_room_id` for how it is derived).
pub type RoomId = String;
/// Identifier of a websocket session; the session stores the value the
/// server returns in reply to `SocketMessageConnect`.
pub type SessionId = usize;
17
18#[derive(Debug)]
19pub struct MahjongWebsocketSession {
20    pub addr: Addr<MahjongWebsocketServer>,
21    pub hb: Instant,
22    pub id: SessionId,
23    pub room: RoomId,
24}
25
26impl MahjongWebsocketSession {
27    pub fn get_room_id(game_id: &GameId, player_id: Option<&PlayerId>) -> RoomId {
28        if player_id.is_none() {
29            return game_id.to_string();
30        }
31
32        format!("{}__{}", game_id, player_id.unwrap())
33    }
34    fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
35        ctx.run_interval(HEARTBEAT_INTERVAL, |act, new_ctx| {
36            if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
37                act.addr.do_send(SocketMessageDisconnect { id: act.id });
38                new_ctx.stop();
39                return;
40            }
41
42            new_ctx.ping(b"");
43        });
44    }
45}
46
47impl Actor for MahjongWebsocketSession {
48    type Context = ws::WebsocketContext<Self>;
49
50    fn started(&mut self, ctx: &mut Self::Context) {
51        self.hb(ctx);
52
53        let addr = ctx.address();
54        let room = self.room.clone();
55
56        self.addr
57            .send(SocketMessageConnect {
58                room,
59                addr: addr.recipient(),
60            })
61            .into_actor(self)
62            .then(|res, act, new_ctx| {
63                match res {
64                    Ok(res) => {
65                        act.id = res;
66                        debug!("{} joined room {}", act.id, act.room);
67                    }
68                    _ => new_ctx.stop(),
69                }
70                fut::ready(())
71            })
72            .wait(ctx);
73    }
74
75    fn stopping(&mut self, _: &mut Self::Context) -> Running {
76        debug!("{} disconnected from {}", self.id, self.room);
77        self.addr.do_send(SocketMessageDisconnect { id: self.id });
78        Running::Stop
79    }
80}
81
82impl Handler<SocketMessageStr> for MahjongWebsocketSession {
83    type Result = ();
84
85    fn handle(&mut self, msg: SocketMessageStr, ctx: &mut Self::Context) {
86        ctx.text(msg.0);
87    }
88}
89
90impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MahjongWebsocketSession {
91    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
92        let msg = match msg {
93            Err(_) => {
94                ctx.stop();
95                return;
96            }
97            Ok(msg) => msg,
98        };
99
100        match msg {
101            ws::Message::Ping(msg) => {
102                self.hb = Instant::now();
103                ctx.pong(&msg);
104            }
105            ws::Message::Pong(_) => {
106                self.hb = Instant::now();
107            }
108            ws::Message::Text(text) => {
109                let message = serde_json::from_str::<SocketMessage>(&text);
110                if message.is_err() {
111                    return;
112                }
113
114                if let Ok(SocketMessage::ListRooms) = message {
115                    self.addr
116                        .send(SocketMessageListRooms)
117                        .into_actor(self)
118                        .then(|res, _, new_ctx| {
119                            if let Ok(rooms) = res {
120                                for room in rooms {
121                                    let room =
122                                        serde_json::to_string(&SocketMessage::Name(room)).unwrap();
123
124                                    new_ctx.text(room);
125                                }
126                            }
127                            fut::ready(())
128                        })
129                        .wait(ctx)
130                }
131            }
132            ws::Message::Close(reason) => {
133                ctx.close(reason);
134                ctx.stop();
135            }
136            ws::Message::Continuation(_) => {
137                ctx.stop();
138            }
139            _ => {}
140        }
141    }
142}