tramex_tools/interface/websocket/
ws_connection.rs1use core::fmt::{Debug, Formatter};
3use ewebsock::{WsEvent, WsMessage, WsReceiver, WsSender};
4use std::vec;
5
6use crate::interface::interface_types::InterfaceTrait;
7use crate::interface::types::BaseMessage;
8use crate::tramex_error;
9use crate::{data::Data, errors::TramexError};
10
11use crate::interface::{layer::Layers, log_get::LogGet, types::WebSocketLog};
12pub struct WsConnection {
14 pub ws_sender: WsSender,
16
17 pub ws_receiver: WsReceiver,
19
20 pub msg_id: u64,
22
23 pub connecting: bool,
25
26 pub asking_size_max: u64,
28
29 pub available: bool,
31
32 pub name: String,
34}
35
36impl WsConnection {
37 pub fn new(ws_sender: WsSender, ws_receiver: WsReceiver) -> Self {
39 Self {
40 ws_sender,
41 ws_receiver,
42 msg_id: 1,
43 connecting: true,
44 asking_size_max: 1024,
45 available: true,
46 name: "".to_string(),
47 }
48 }
49
50 pub fn connect(url: &str, wakeup: impl Fn() + Send + Sync + 'static) -> Result<(WsSender, WsReceiver), String> {
54 let options = ewebsock::Options {
55 #[cfg(not(target_arch = "wasm32"))]
56 additional_headers: vec![("Origin".to_string(), "tramex".to_string())],
57 ..Default::default()
58 };
59 ewebsock::connect_with_wakeup(url, options, wakeup)
60 }
61
62 pub fn close_impl(&mut self) -> Result<(), TramexError> {
66 self.ws_sender.close();
67 Ok(())
68 }
69}
70
71impl InterfaceTrait for WsConnection {
72 fn get_more_data(&mut self, layer_list: Layers, _data: &mut Data) -> Result<(), Vec<TramexError>> {
73 let msg = LogGet::new(self.msg_id, layer_list, self.asking_size_max);
74 log::debug!("Sending message: {msg:?}");
75 match serde_json::to_string(&msg) {
76 Ok(msg_stringed) => {
77 log::debug!("{msg_stringed}");
78 self.ws_sender.send(WsMessage::Text(msg_stringed));
79 self.msg_id += 1;
80 }
81 Err(err) => {
82 log::error!("Error encoding message: {err:?}");
83 return Err(vec![tramex_error!(
84 err.to_string(),
85 crate::errors::ErrorCode::WebSocketErrorEncodingMessage
86 )]);
87 }
88 }
89 Ok(())
90 }
91
92 fn close(&mut self) -> Result<(), TramexError> {
93 self.close_impl()
94 }
95}
96
97impl WsConnection {
98 pub fn try_recv(&mut self, data: &mut Data) -> Result<(), Vec<TramexError>> {
102 while let Some(event) = self.ws_receiver.try_recv() {
103 self.connecting = false;
104 match event {
105 WsEvent::Message(msg) => {
106 self.available = true;
107 match msg {
108 WsMessage::Text(event_text) => {
109 let decoded: Result<WebSocketLog, serde_json::Error> = serde_json::from_str(&event_text);
110 match decoded {
111 Ok(decoded_data) => {
112 let mut errors = vec![];
113 for one_log in decoded_data.logs {
114 match one_log.extract_data() {
115 Ok(trace) => {
116 data.events.push(trace);
117 }
118 Err(err) => {
119 log::error!("Error while extracting data: {err:?}");
120 errors.push(err);
121 }
122 }
123 }
124 if !errors.is_empty() {
125 return Err(errors);
126 }
127 }
128 Err(_err) => {
129 let decoded_base: Result<BaseMessage, serde_json::Error> =
130 serde_json::from_str(&event_text);
131 match decoded_base {
132 Ok(decoded_data) => {
133 if decoded_data.message == "ready" {
134 log::debug!("Received ready message");
135 }
136 log::debug!("Received BaseMessage: {decoded_data:?}");
137 self.name = decoded_data.name;
138 }
139 Err(err) => {
140 log::error!("Error decoding message: {err:?}");
141 log::error!("Message: {event_text:?}");
142 return Err(vec![tramex_error!(
143 err.to_string(),
144 crate::errors::ErrorCode::WebSocketErrorDecodingMessage
145 )]);
146 }
147 }
148 }
149 }
150 }
151 WsMessage::Unknown(str_error) => {
152 log::error!("Unknown message: {str_error:?}");
153 return Err(vec![tramex_error!(
154 str_error,
155 crate::errors::ErrorCode::WebSocketUnknownMessageReceived
156 )]);
157 }
158 WsMessage::Binary(bin) => {
159 log::error!("Unknown binary message: {bin:?}");
160 return Err(vec![tramex_error!(
161 format!("Unknown binary message: {bin:?}"),
162 crate::errors::ErrorCode::WebSocketUnknownBinaryMessageReceived
163 )]);
164 }
165 _ => {
166 log::debug!("Received Ping-Pong")
167 }
168 }
169 }
170 WsEvent::Opened => {
171 self.available = true;
172 log::debug!("WebSocket opened");
173 }
174 WsEvent::Closed => {
175 self.available = false;
176 log::debug!("WebSocket closed");
177 return Err(vec![tramex_error!(
178 "WebSocket closed".to_string(),
179 crate::errors::ErrorCode::WebSocketClosed
180 )]);
181 }
182 WsEvent::Error(str_err) => {
183 self.available = false;
184 log::error!("WebSocket error: {str_err:?}");
185 return Err(vec![tramex_error!(str_err, crate::errors::ErrorCode::WebSocketError)]);
186 }
187 }
188 }
189 Ok(())
190 }
191}
192
193impl Debug for WsConnection {
194 fn fmt(&self, formatter: &mut Formatter<'_>) -> core::fmt::Result {
195 formatter
196 .debug_struct("Interface")
197 .field("ws_sender", &"Box<WsSender>")
198 .field("ws_receiver", &"Box<WsReceiver>")
199 .field("connecting", &self.connecting)
200 .finish()
201 }
202}
203
204impl Drop for WsConnection {
205 fn drop(&mut self) {
206 log::debug!("Cleaning WsConnection");
207 self.ws_sender.close()
208 }
209}