bones_framework/networking/
lan.rs1#![allow(missing_docs)]
12
13use std::{net::IpAddr, time::Duration};
14
15use iroh::{endpoint::get_remote_node_id, NodeAddr};
16use mdns_sd::{ServiceDaemon, ServiceInfo};
17use smallvec::SmallVec;
18use tracing::warn;
19
20use crate::networking::socket::establish_peer_connections;
21use crate::utils::BiChannelServer;
22
23use super::socket::Socket;
24use super::*;
25
26#[derive(Clone)]
28pub struct ServerInfo {
29 pub service: ServiceInfo,
31 pub ping: Option<u16>,
33}
34
35#[derive(Clone)]
37pub struct ServiceDiscoveryReceiver(mdns_sd::Receiver<mdns_sd::ServiceEvent>);
38
39static LAN_MATCHMAKER: Lazy<LanMatchmaker> = Lazy::new(|| {
43 let (client, server) = bi_channel();
44
45 RUNTIME.spawn(async move {
46 if let Err(err) = lan_matchmaker(server).await {
47 warn!("lan matchmaker failed: {err:?}");
48 }
49 });
50
51 LanMatchmaker(client)
52});
53
54static MDNS: Lazy<ServiceDaemon> =
55 Lazy::new(|| ServiceDaemon::new().expect("Couldn't start MDNS service discovery thread."));
56
57#[derive(DerefMut, Deref)]
58struct Pinger(BiChannelClient<PingerRequest, PingerResponse>);
59
60type PingerRequest = SmallVec<[IpAddr; 10]>;
61type PingerResponse = SmallVec<[(IpAddr, Option<u16>); 10]>;
62
63static PINGER: Lazy<Pinger> = Lazy::new(|| {
64 let (client, server) = bi_channel();
65
66 std::thread::spawn(move || pinger(server));
67
68 Pinger(client)
69});
70
71pub fn start_server(server: ServerInfo, player_count: u32) {
75 MDNS.register(server.service)
76 .expect("Could not register MDNS service.");
77 LAN_MATCHMAKER
78 .try_send(LanMatchmakerRequest::StartServer { player_count })
79 .unwrap();
80}
81
82pub fn stop_server(server: &ServerInfo) {
84 if let Err(err) = stop_server_by_name(server.service.get_fullname()) {
85 warn!("Lan: failed to stop server: {err:?}");
86 }
87 LAN_MATCHMAKER
88 .try_send(LanMatchmakerRequest::StopServer)
89 .unwrap();
90}
91
92fn stop_server_by_name(name: &str) -> anyhow::Result<()> {
94 loop {
95 match MDNS.unregister(name) {
96 Ok(_) => break,
97 Err(mdns_sd::Error::Again) => (),
98 Err(e) => {
99 anyhow::bail!("Error unregistering MDNS service: {e}")
100 }
101 }
102 }
103 Ok(())
104}
105
106pub fn wait_players(joined_players: &mut usize, server: &ServerInfo) -> Option<NetworkMatchSocket> {
108 while let Ok(response) = LAN_MATCHMAKER.try_recv() {
109 match response {
110 LanMatchmakerResponse::ServerStarted => {}
111 LanMatchmakerResponse::PlayerCount(count) => {
112 *joined_players = count;
113 }
114 LanMatchmakerResponse::GameStarting {
115 socket,
116 player_idx,
117 player_count: _,
118 } => {
119 info!(?player_idx, "Starting network game");
120 if let Err(err) = stop_server_by_name(server.service.get_fullname()) {
121 warn!("Lan: failed to stop server: {err:?}");
122 }
123 return Some(NetworkMatchSocket(Arc::new(socket)));
124 }
125 }
126 }
127 None
128}
129
130pub fn join_server(server: &ServerInfo) -> anyhow::Result<()> {
132 let addr_raw = server
133 .service
134 .get_properties()
135 .get_property_val_str("node-addr")
136 .ok_or_else(|| anyhow::anyhow!("missing node-addr property from discovery"))?;
137 let addr_raw = hex::decode(addr_raw)?;
138 let addr: NodeAddr = postcard::from_bytes(&addr_raw)?;
139 LAN_MATCHMAKER
140 .try_send(lan::LanMatchmakerRequest::JoinServer { addr })
141 .unwrap();
142 Ok(())
143}
144
145pub fn leave_server() {
147 LAN_MATCHMAKER
148 .try_send(LanMatchmakerRequest::StopJoin)
149 .unwrap();
150}
151
152pub fn wait_game_start() -> Option<NetworkMatchSocket> {
154 while let Ok(message) = LAN_MATCHMAKER.try_recv() {
155 match message {
156 LanMatchmakerResponse::ServerStarted | LanMatchmakerResponse::PlayerCount(_) => {}
157 LanMatchmakerResponse::GameStarting {
158 socket,
159 player_idx,
160 player_count: _,
161 } => {
162 info!(?player_idx, "Starting network game");
163 return Some(NetworkMatchSocket(Arc::new(socket)));
164 }
165 }
166 }
167 None
168}
169
170pub fn prepare_to_join(
172 service_type: &str,
173 servers: &mut Vec<ServerInfo>,
174 service_discovery_recv: &mut Option<ServiceDiscoveryReceiver>,
175 ping_update_timer: &Timer,
176) {
177 if ping_update_timer.finished() {
179 PINGER
180 .try_send(
181 servers
182 .iter()
183 .map(|x| *x.service.get_addresses().iter().next().unwrap())
184 .collect(),
185 )
186 .ok();
187 }
188 if let Ok(pings) = PINGER.try_recv() {
189 for (server, ping) in pings {
190 for info in servers.iter_mut() {
191 if info.service.get_addresses().contains(&server) {
192 info.ping = ping;
193 }
194 }
195 }
196 }
197
198 let events = service_discovery_recv.get_or_insert_with(|| {
199 ServiceDiscoveryReceiver(
200 MDNS.browse(service_type)
201 .expect("Couldn't start service discovery"),
202 )
203 });
204
205 while let Ok(event) = events.0.try_recv() {
206 match event {
207 mdns_sd::ServiceEvent::ServiceResolved(info) => {
208 info!("Found lan service!");
209 servers.push(lan::ServerInfo {
210 service: info,
211 ping: None,
212 })
213 }
214 mdns_sd::ServiceEvent::ServiceRemoved(_, full_name) => {
215 servers.retain(|server| server.service.get_fullname() != full_name);
216 }
217 _ => (),
218 }
219 }
220}
221
222pub async fn prepare_to_host<'a>(
226 host_info: &'a mut Option<ServerInfo>,
227 service_type: &str,
228 service_name: &str,
229) -> (bool, &'a mut ServerInfo) {
230 let create_service_info = || async {
231 info!("New service hosting");
232 let ep = get_network_endpoint().await;
233 let mut my_addr = ep.node_addr().await.expect("network endpoint dead");
234 my_addr
235 .info
236 .direct_addresses
237 .retain(std::net::SocketAddr::is_ipv4);
238 let port = my_addr.info.direct_addresses.first().unwrap().port();
239 let mut props = std::collections::HashMap::default();
240 let addr_encoded = hex::encode(postcard::to_stdvec(&my_addr).unwrap());
241 props.insert("node-addr".to_string(), addr_encoded);
242 let service =
243 mdns_sd::ServiceInfo::new(service_type, service_name, service_name, "", port, props)
244 .unwrap()
245 .enable_addr_auto();
246 ServerInfo {
247 service,
248 ping: None,
249 }
250 };
251
252 if host_info.is_none() {
253 let info = create_service_info().await;
254 host_info.replace(info);
255 }
256 let service_info = host_info.as_mut().unwrap();
257
258 let mut is_recreated = false;
259 if service_info.service.get_hostname() != service_name {
260 stop_server_by_name(service_info.service.get_fullname()).unwrap();
261 is_recreated = true;
262 *service_info = create_service_info().await;
263 }
264 (is_recreated, service_info)
265}
266
267async fn lan_matchmaker(
272 matchmaker_channel: BiChannelServer<LanMatchmakerRequest, LanMatchmakerResponse>,
273) -> anyhow::Result<()> {
274 while let Ok(request) = matchmaker_channel.recv().await {
275 match request {
276 LanMatchmakerRequest::StartServer { player_count } => {
278 if let Err(err) = lan_start_server(&matchmaker_channel, player_count).await {
279 warn!("lan server failed: {err:?}");
280 }
281 }
283 LanMatchmakerRequest::StopServer => (),
285 LanMatchmakerRequest::StopJoin => (),
286
287 LanMatchmakerRequest::JoinServer { addr } => {
289 if let Err(err) = lan_join_server(&matchmaker_channel, addr).await {
290 warn!("failed to join server: {err:?}");
291 }
292 }
293 }
294 }
295
296 Ok(())
297}
298
299async fn lan_start_server(
300 matchmaker_channel: &BiChannelServer<LanMatchmakerRequest, LanMatchmakerResponse>,
301 mut player_count: u32,
302) -> anyhow::Result<()> {
303 info!("Starting LAN server");
304 matchmaker_channel
305 .send(LanMatchmakerResponse::ServerStarted)
306 .await?;
307
308 let mut connections = Vec::new();
309 let ep = get_network_endpoint().await;
310
311 loop {
312 tokio::select! {
313 next_request = matchmaker_channel.recv() => {
314 match next_request? {
315 LanMatchmakerRequest::StartServer {
316 player_count: new_player_count,
317 } => {
318 connections.clear();
319 player_count = new_player_count;
320 }
321 LanMatchmakerRequest::StopServer => {
322 break;
323 }
324 LanMatchmakerRequest::StopJoin => {} LanMatchmakerRequest::JoinServer { .. } => {
326 anyhow::bail!("Cannot join server while hosting server");
327 }
328 }
329 }
330
331 incomming = ep.accept() => {
333 let Some(incomming) = incomming else {
334 anyhow::bail!("unable to accept new connections");
335 };
336 let result = async move {
337 let mut connecting = incomming.accept()?;
338 let alpn = connecting.alpn().await?;
339 anyhow::ensure!(alpn == PLAY_ALPN, "unexpected ALPN");
340 let conn = connecting.await?;
341 anyhow::Ok(conn)
342 };
343
344 match result.await {
345 Ok(conn) => {
346 connections.push(conn);
347 let current_players = connections.len() + 1;
348 info!(%current_players, "New player connection");
349 }
350 Err(err) => {
351 warn!("failed to accept connection: {:?}", err);
352 continue;
353 }
354 }
355 }
356 }
357
358 connections.retain(|conn| {
360 if conn.close_reason().is_some() {
361 info!("Player closed connection");
362 false
363 } else {
364 true
365 }
366 });
367
368 let current_players = connections.len();
369 let target_players = player_count;
370 info!(%current_players, %target_players);
371
372 if connections.len() == (player_count - 1) as usize {
374 info!("All players joined.");
375
376 let endpoint = get_network_endpoint().await;
377
378 for (i, conn) in connections.iter().enumerate() {
380 let mut peers = Vec::new();
381 connections
382 .iter()
383 .enumerate()
384 .filter(|x| x.0 != i)
385 .for_each(|(i, conn)| {
386 let id = get_remote_node_id(conn).expect("invalid connection");
387 let mut addr = NodeAddr::new(id);
388 if let Some(info) = endpoint.remote_info(id) {
389 if let Some(relay_url) = info.relay_url {
390 addr = addr.with_relay_url(relay_url.relay_url);
391 }
392 addr = addr.with_direct_addresses(
393 info.addrs.into_iter().map(|addr| addr.addr),
394 );
395 }
396
397 peers.push((u32::try_from(i + 1).unwrap(), addr));
398 });
399
400 let mut uni = conn.open_uni().await?;
401 uni.write_all(&postcard::to_vec::<_, 20>(&MatchmakerNetMsg::MatchReady {
402 player_idx: (i + 1).try_into()?,
403 peers,
404 player_count,
405 })?)
406 .await?;
407 uni.finish()?;
408 uni.stopped().await?;
409 }
410
411 let connections = connections
412 .into_iter()
413 .enumerate()
414 .map(|(i, c)| (u32::try_from(i + 1).unwrap(), c))
415 .collect();
416
417 matchmaker_channel
419 .send(LanMatchmakerResponse::GameStarting {
420 socket: Socket::new(0, connections),
421 player_idx: 0,
422 player_count,
423 })
424 .await?;
425 info!(player_idx=0, %player_count, "Matchmaking finished");
426
427 break;
429
430 } else {
432 matchmaker_channel
433 .send(LanMatchmakerResponse::PlayerCount(current_players))
434 .await?;
435 }
436 }
437
438 Ok(())
439}
440
441async fn lan_join_server(
442 matchmaker_channel: &BiChannelServer<LanMatchmakerRequest, LanMatchmakerResponse>,
443 addr: NodeAddr,
444) -> anyhow::Result<()> {
445 let ep = get_network_endpoint().await;
446 let conn = ep.connect(addr, PLAY_ALPN).await?;
447
448 let mut uni = conn.accept_uni().await?;
450 let bytes = uni.read_to_end(20).await?;
451 let message: MatchmakerNetMsg = postcard::from_bytes(&bytes)?;
452
453 match message {
454 MatchmakerNetMsg::MatchReady {
455 peers: peer_addrs,
456 player_idx,
457 player_count,
458 } => {
459 info!(%player_count, %player_idx, ?peer_addrs, "Matchmaking finished");
460
461 let peer_connections =
462 establish_peer_connections(player_idx, player_count, peer_addrs, Some(conn))
463 .await?;
464
465 let socket = Socket::new(player_idx, peer_connections);
466 info!("Connections established.");
467
468 matchmaker_channel
469 .send(LanMatchmakerResponse::GameStarting {
470 socket,
471 player_idx,
472 player_count,
473 })
474 .await?;
475 }
476 }
477 Ok(())
478}
479
480#[derive(Serialize, Deserialize)]
481enum MatchmakerNetMsg {
482 MatchReady {
483 peers: Vec<(u32, NodeAddr)>,
485 player_idx: u32,
487 player_count: u32,
488 },
489}
490
491#[derive(DerefMut, Deref)]
493pub struct LanMatchmaker(BiChannelClient<LanMatchmakerRequest, LanMatchmakerResponse>);
494
495#[derive(Debug)]
497pub enum LanMatchmakerRequest {
498 StartServer {
500 player_count: u32,
502 },
503 JoinServer {
505 addr: NodeAddr,
507 },
508 StopServer,
510 StopJoin,
512}
513
514pub enum LanMatchmakerResponse {
516 ServerStarted,
518 PlayerCount(usize),
520 GameStarting {
522 socket: Socket,
524 player_idx: u32,
526 player_count: u32,
528 },
529}
530
531fn pinger(server: BiChannelServer<PingerRequest, PingerResponse>) {
532 while let Ok(servers) = server.recv_blocking() {
533 let mut pings = SmallVec::new();
534 for server in servers {
535 let start = Instant::now();
536 let ping_result =
537 ping_rs::send_ping(&server, Duration::from_secs(2), &[1, 2, 3, 4], None);
538
539 let ping = if let Err(e) = ping_result {
540 warn!("Error pinging {server}: {e:?}");
541 None
542 } else {
543 Some((Instant::now() - start).as_millis() as u16)
544 };
545
546 pings.push((server, ping));
547 }
548 if server.send_blocking(pings).is_err() {
549 break;
550 }
551 }
552}