tramex_tools/interface/websocket/
ws_connection.rs

1//! WsConnection struct
2use core::fmt::{Debug, Formatter};
3use ewebsock::{WsEvent, WsMessage, WsReceiver, WsSender};
4use std::vec;
5
6use crate::interface::interface_types::InterfaceTrait;
7use crate::interface::types::BaseMessage;
8use crate::tramex_error;
9use crate::{data::Data, errors::TramexError};
10
11use crate::interface::{layer::Layers, log_get::LogGet, types::WebSocketLog};
/// WsConnection struct
///
/// Holds both halves of an `ewebsock` WebSocket together with the bookkeeping
/// fields that are updated while requests are sent and events are received.
pub struct WsConnection {
    /// WebSocket sender
    pub ws_sender: WsSender,

    /// WebSocket receiver
    pub ws_receiver: WsReceiver,

    /// Message ID given to the next `LogGet` request.
    /// Starts at 1 and is incremented after each request is sent.
    pub msg_id: u64,

    /// Connecting flag.
    /// `true` after construction; set to `false` as soon as any event is
    /// read from the receiver.
    pub connecting: bool,

    /// Asking size max.
    /// Forwarded as the last argument of `LogGet::new` (1024 by default);
    /// presumably the maximum number of logs requested at once - TODO confirm.
    pub asking_size_max: u64,

    /// Available flag.
    /// Set to `true` when the socket is opened or a message arrives, and to
    /// `false` when a close or error event is received.
    pub available: bool,

    /// Name of the receiver.
    /// Empty after construction; overwritten with the `name` field of any
    /// `BaseMessage` received from the peer.
    pub name: String,
}
35
36impl WsConnection {
37    /// Create a new WsConnection
38    pub fn new(ws_sender: WsSender, ws_receiver: WsReceiver) -> Self {
39        Self {
40            ws_sender,
41            ws_receiver,
42            msg_id: 1,
43            connecting: true,
44            asking_size_max: 1024,
45            available: true,
46            name: "".to_string(),
47        }
48    }
49
50    /// Connect to a WebSocket
51    /// # Errors
52    /// Return an error as String if the connection failed - see [`ewebsock::connect_with_wakeup`] for more details
53    pub fn connect(url: &str, wakeup: impl Fn() + Send + Sync + 'static) -> Result<(WsSender, WsReceiver), String> {
54        let options = ewebsock::Options {
55            #[cfg(not(target_arch = "wasm32"))]
56            additional_headers: vec![("Origin".to_string(), "tramex".to_string())],
57            ..Default::default()
58        };
59        ewebsock::connect_with_wakeup(url, options, wakeup)
60    }
61
62    /// Try to close the ws
63    /// # Errors
64    /// Return an error if its fail see [`ewebsock::WsSender::close`] for more details
65    pub fn close_impl(&mut self) -> Result<(), TramexError> {
66        self.ws_sender.close();
67        Ok(())
68    }
69}
70
71impl InterfaceTrait for WsConnection {
72    fn get_more_data(&mut self, layer_list: Layers, _data: &mut Data) -> Result<(), Vec<TramexError>> {
73        let msg = LogGet::new(self.msg_id, layer_list, self.asking_size_max);
74        log::debug!("Sending message: {msg:?}");
75        match serde_json::to_string(&msg) {
76            Ok(msg_stringed) => {
77                log::debug!("{msg_stringed}");
78                self.ws_sender.send(WsMessage::Text(msg_stringed));
79                self.msg_id += 1;
80            }
81            Err(err) => {
82                log::error!("Error encoding message: {err:?}");
83                return Err(vec![tramex_error!(
84                    err.to_string(),
85                    crate::errors::ErrorCode::WebSocketErrorEncodingMessage
86                )]);
87            }
88        }
89        Ok(())
90    }
91
92    fn close(&mut self) -> Result<(), TramexError> {
93        self.close_impl()
94    }
95}
96
97impl WsConnection {
98    /// Try to receive data
99    /// # Errors
100    /// Return an error if the data is not received correctly
101    pub fn try_recv(&mut self, data: &mut Data) -> Result<(), Vec<TramexError>> {
102        while let Some(event) = self.ws_receiver.try_recv() {
103            self.connecting = false;
104            match event {
105                WsEvent::Message(msg) => {
106                    self.available = true;
107                    match msg {
108                        WsMessage::Text(event_text) => {
109                            let decoded: Result<WebSocketLog, serde_json::Error> = serde_json::from_str(&event_text);
110                            match decoded {
111                                Ok(decoded_data) => {
112                                    let mut errors = vec![];
113                                    for one_log in decoded_data.logs {
114                                        match one_log.extract_data() {
115                                            Ok(trace) => {
116                                                data.events.push(trace);
117                                            }
118                                            Err(err) => {
119                                                log::error!("Error while extracting data: {err:?}");
120                                                errors.push(err);
121                                            }
122                                        }
123                                    }
124                                    if !errors.is_empty() {
125                                        return Err(errors);
126                                    }
127                                }
128                                Err(_err) => {
129                                    let decoded_base: Result<BaseMessage, serde_json::Error> =
130                                        serde_json::from_str(&event_text);
131                                    match decoded_base {
132                                        Ok(decoded_data) => {
133                                            if decoded_data.message == "ready" {
134                                                log::debug!("Received ready message");
135                                            }
136                                            log::debug!("Received BaseMessage: {decoded_data:?}");
137                                            self.name = decoded_data.name;
138                                        }
139                                        Err(err) => {
140                                            log::error!("Error decoding message: {err:?}");
141                                            log::error!("Message: {event_text:?}");
142                                            return Err(vec![tramex_error!(
143                                                err.to_string(),
144                                                crate::errors::ErrorCode::WebSocketErrorDecodingMessage
145                                            )]);
146                                        }
147                                    }
148                                }
149                            }
150                        }
151                        WsMessage::Unknown(str_error) => {
152                            log::error!("Unknown message: {str_error:?}");
153                            return Err(vec![tramex_error!(
154                                str_error,
155                                crate::errors::ErrorCode::WebSocketUnknownMessageReceived
156                            )]);
157                        }
158                        WsMessage::Binary(bin) => {
159                            log::error!("Unknown binary message: {bin:?}");
160                            return Err(vec![tramex_error!(
161                                format!("Unknown binary message: {bin:?}"),
162                                crate::errors::ErrorCode::WebSocketUnknownBinaryMessageReceived
163                            )]);
164                        }
165                        _ => {
166                            log::debug!("Received Ping-Pong")
167                        }
168                    }
169                }
170                WsEvent::Opened => {
171                    self.available = true;
172                    log::debug!("WebSocket opened");
173                }
174                WsEvent::Closed => {
175                    self.available = false;
176                    log::debug!("WebSocket closed");
177                    return Err(vec![tramex_error!(
178                        "WebSocket closed".to_string(),
179                        crate::errors::ErrorCode::WebSocketClosed
180                    )]);
181                }
182                WsEvent::Error(str_err) => {
183                    self.available = false;
184                    log::error!("WebSocket error: {str_err:?}");
185                    return Err(vec![tramex_error!(str_err, crate::errors::ErrorCode::WebSocketError)]);
186                }
187            }
188        }
189        Ok(())
190    }
191}
192
193impl Debug for WsConnection {
194    fn fmt(&self, formatter: &mut Formatter<'_>) -> core::fmt::Result {
195        formatter
196            .debug_struct("Interface")
197            .field("ws_sender", &"Box<WsSender>")
198            .field("ws_receiver", &"Box<WsReceiver>")
199            .field("connecting", &self.connecting)
200            .finish()
201    }
202}
203
204impl Drop for WsConnection {
205    fn drop(&mut self) {
206        log::debug!("Cleaning WsConnection");
207        self.ws_sender.close()
208    }
209}