bones_framework/networking/
lan.rs

1//! LAN matchmaking and socket implementation.
2//!
3//! ## Matchmaking
4//!
5//! The LAN matchmaker works by allowing the player to start a match and wait for people to join it,
6//! or to join player's started match.
7//!
8//! Communication happens directly between LAN peers over the QUIC protocol.
9
10// TODO
11#![allow(missing_docs)]
12
13use std::{net::IpAddr, time::Duration};
14
15use iroh::{endpoint::get_remote_node_id, NodeAddr};
16use mdns_sd::{ServiceDaemon, ServiceInfo};
17use smallvec::SmallVec;
18use tracing::warn;
19
20use crate::networking::socket::establish_peer_connections;
21use crate::utils::BiChannelServer;
22
23use super::socket::Socket;
24use super::*;
25
26/// Service discover info and ping.
27#[derive(Clone)]
28pub struct ServerInfo {
29    /// mutli-cast dns service discover info.
30    pub service: ServiceInfo,
31    /// The ping in milliseconds
32    pub ping: Option<u16>,
33}
34
35/// Receiver for LAN service discovery channel.
36#[derive(Clone)]
37pub struct ServiceDiscoveryReceiver(mdns_sd::Receiver<mdns_sd::ServiceEvent>);
38
39/// Channel used to do matchmaking over LAN.
40///
41/// Spawns a task to handle the actual matchmaking.
42static LAN_MATCHMAKER: Lazy<LanMatchmaker> = Lazy::new(|| {
43    let (client, server) = bi_channel();
44
45    RUNTIME.spawn(async move {
46        if let Err(err) = lan_matchmaker(server).await {
47            warn!("lan matchmaker failed: {err:?}");
48        }
49    });
50
51    LanMatchmaker(client)
52});
53
54static MDNS: Lazy<ServiceDaemon> =
55    Lazy::new(|| ServiceDaemon::new().expect("Couldn't start MDNS service discovery thread."));
56
57#[derive(DerefMut, Deref)]
58struct Pinger(BiChannelClient<PingerRequest, PingerResponse>);
59
60type PingerRequest = SmallVec<[IpAddr; 10]>;
61type PingerResponse = SmallVec<[(IpAddr, Option<u16>); 10]>;
62
63static PINGER: Lazy<Pinger> = Lazy::new(|| {
64    let (client, server) = bi_channel();
65
66    std::thread::spawn(move || pinger(server));
67
68    Pinger(client)
69});
70
71/// Host a server.
72///
73/// The number of players is limited to `u32::MAX`.
74pub fn start_server(server: ServerInfo, player_count: u32) {
75    MDNS.register(server.service)
76        .expect("Could not register MDNS service.");
77    LAN_MATCHMAKER
78        .try_send(LanMatchmakerRequest::StartServer { player_count })
79        .unwrap();
80}
81
82/// Stop hosting a server.
83pub fn stop_server(server: &ServerInfo) {
84    if let Err(err) = stop_server_by_name(server.service.get_fullname()) {
85        warn!("Lan: failed to stop server: {err:?}");
86    }
87    LAN_MATCHMAKER
88        .try_send(LanMatchmakerRequest::StopServer)
89        .unwrap();
90}
91
92/// Stop hosting a server specified by name. (Use [`ServiceInfo::get_fullname()`].)
93fn stop_server_by_name(name: &str) -> anyhow::Result<()> {
94    loop {
95        match MDNS.unregister(name) {
96            Ok(_) => break,
97            Err(mdns_sd::Error::Again) => (),
98            Err(e) => {
99                anyhow::bail!("Error unregistering MDNS service: {e}")
100            }
101        }
102    }
103    Ok(())
104}
105
106/// Wait for players to join a hosted server.
107pub fn wait_players(joined_players: &mut usize, server: &ServerInfo) -> Option<NetworkMatchSocket> {
108    while let Ok(response) = LAN_MATCHMAKER.try_recv() {
109        match response {
110            LanMatchmakerResponse::ServerStarted => {}
111            LanMatchmakerResponse::PlayerCount(count) => {
112                *joined_players = count;
113            }
114            LanMatchmakerResponse::GameStarting {
115                socket,
116                player_idx,
117                player_count: _,
118            } => {
119                info!(?player_idx, "Starting network game");
120                if let Err(err) = stop_server_by_name(server.service.get_fullname()) {
121                    warn!("Lan: failed to stop server: {err:?}");
122                }
123                return Some(NetworkMatchSocket(Arc::new(socket)));
124            }
125        }
126    }
127    None
128}
129
130/// Join a server hosted by someone else.
131pub fn join_server(server: &ServerInfo) -> anyhow::Result<()> {
132    let addr_raw = server
133        .service
134        .get_properties()
135        .get_property_val_str("node-addr")
136        .ok_or_else(|| anyhow::anyhow!("missing node-addr property from discovery"))?;
137    let addr_raw = hex::decode(addr_raw)?;
138    let addr: NodeAddr = postcard::from_bytes(&addr_raw)?;
139    LAN_MATCHMAKER
140        .try_send(lan::LanMatchmakerRequest::JoinServer { addr })
141        .unwrap();
142    Ok(())
143}
144
145/// Leave a joined server.
146pub fn leave_server() {
147    LAN_MATCHMAKER
148        .try_send(LanMatchmakerRequest::StopJoin)
149        .unwrap();
150}
151
152/// Wait for a joined game to start.
153pub fn wait_game_start() -> Option<NetworkMatchSocket> {
154    while let Ok(message) = LAN_MATCHMAKER.try_recv() {
155        match message {
156            LanMatchmakerResponse::ServerStarted | LanMatchmakerResponse::PlayerCount(_) => {}
157            LanMatchmakerResponse::GameStarting {
158                socket,
159                player_idx,
160                player_count: _,
161            } => {
162                info!(?player_idx, "Starting network game");
163                return Some(NetworkMatchSocket(Arc::new(socket)));
164            }
165        }
166    }
167    None
168}
169
170/// Update server pings and turn on service discovery.
171pub fn prepare_to_join(
172    service_type: &str,
173    servers: &mut Vec<ServerInfo>,
174    service_discovery_recv: &mut Option<ServiceDiscoveryReceiver>,
175    ping_update_timer: &Timer,
176) {
177    // Update server pings
178    if ping_update_timer.finished() {
179        PINGER
180            .try_send(
181                servers
182                    .iter()
183                    .map(|x| *x.service.get_addresses().iter().next().unwrap())
184                    .collect(),
185            )
186            .ok();
187    }
188    if let Ok(pings) = PINGER.try_recv() {
189        for (server, ping) in pings {
190            for info in servers.iter_mut() {
191                if info.service.get_addresses().contains(&server) {
192                    info.ping = ping;
193                }
194            }
195        }
196    }
197
198    let events = service_discovery_recv.get_or_insert_with(|| {
199        ServiceDiscoveryReceiver(
200            MDNS.browse(service_type)
201                .expect("Couldn't start service discovery"),
202        )
203    });
204
205    while let Ok(event) = events.0.try_recv() {
206        match event {
207            mdns_sd::ServiceEvent::ServiceResolved(info) => {
208                info!("Found lan service!");
209                servers.push(lan::ServerInfo {
210                    service: info,
211                    ping: None,
212                })
213            }
214            mdns_sd::ServiceEvent::ServiceRemoved(_, full_name) => {
215                servers.retain(|server| server.service.get_fullname() != full_name);
216            }
217            _ => (),
218        }
219    }
220}
221
222/// Get the current host info or create a new one. When there's an existing
223/// service but its `service_name` is different, the service is recreated and
224/// only then the returned `bool` is `true`.
225pub async fn prepare_to_host<'a>(
226    host_info: &'a mut Option<ServerInfo>,
227    service_type: &str,
228    service_name: &str,
229) -> (bool, &'a mut ServerInfo) {
230    let create_service_info = || async {
231        info!("New service hosting");
232        let ep = get_network_endpoint().await;
233        let mut my_addr = ep.node_addr().await.expect("network endpoint dead");
234        my_addr
235            .info
236            .direct_addresses
237            .retain(std::net::SocketAddr::is_ipv4);
238        let port = my_addr.info.direct_addresses.first().unwrap().port();
239        let mut props = std::collections::HashMap::default();
240        let addr_encoded = hex::encode(postcard::to_stdvec(&my_addr).unwrap());
241        props.insert("node-addr".to_string(), addr_encoded);
242        let service =
243            mdns_sd::ServiceInfo::new(service_type, service_name, service_name, "", port, props)
244                .unwrap()
245                .enable_addr_auto();
246        ServerInfo {
247            service,
248            ping: None,
249        }
250    };
251
252    if host_info.is_none() {
253        let info = create_service_info().await;
254        host_info.replace(info);
255    }
256    let service_info = host_info.as_mut().unwrap();
257
258    let mut is_recreated = false;
259    if service_info.service.get_hostname() != service_name {
260        stop_server_by_name(service_info.service.get_fullname()).unwrap();
261        is_recreated = true;
262        *service_info = create_service_info().await;
263    }
264    (is_recreated, service_info)
265}
266
267/// Implementation of the lan matchmaker task.
268///
269/// This is a long-running tasks that listens for messages sent through the `LAN_MATCHMAKER`
270/// channel.
271async fn lan_matchmaker(
272    matchmaker_channel: BiChannelServer<LanMatchmakerRequest, LanMatchmakerResponse>,
273) -> anyhow::Result<()> {
274    while let Ok(request) = matchmaker_channel.recv().await {
275        match request {
276            // Start server
277            LanMatchmakerRequest::StartServer { player_count } => {
278                if let Err(err) = lan_start_server(&matchmaker_channel, player_count).await {
279                    warn!("lan server failed: {err:?}");
280                }
281                // Once we are done with server matchmaking
282            }
283            // Server not running or joining so do nothing
284            LanMatchmakerRequest::StopServer => (),
285            LanMatchmakerRequest::StopJoin => (),
286
287            // Join a hosted match
288            LanMatchmakerRequest::JoinServer { addr } => {
289                if let Err(err) = lan_join_server(&matchmaker_channel, addr).await {
290                    warn!("failed to join server: {err:?}");
291                }
292            }
293        }
294    }
295
296    Ok(())
297}
298
299async fn lan_start_server(
300    matchmaker_channel: &BiChannelServer<LanMatchmakerRequest, LanMatchmakerResponse>,
301    mut player_count: u32,
302) -> anyhow::Result<()> {
303    info!("Starting LAN server");
304    matchmaker_channel
305        .send(LanMatchmakerResponse::ServerStarted)
306        .await?;
307
308    let mut connections = Vec::new();
309    let ep = get_network_endpoint().await;
310
311    loop {
312        tokio::select! {
313            next_request = matchmaker_channel.recv() => {
314                match next_request? {
315                    LanMatchmakerRequest::StartServer {
316                        player_count: new_player_count,
317                    } => {
318                        connections.clear();
319                        player_count = new_player_count;
320                    }
321                    LanMatchmakerRequest::StopServer => {
322                        break;
323                    }
324                    LanMatchmakerRequest::StopJoin => {} // Not joining, so don't do anything
325                    LanMatchmakerRequest::JoinServer { .. } => {
326                        anyhow::bail!("Cannot join server while hosting server");
327                    }
328                }
329            }
330
331            // Handle new connections
332            incomming = ep.accept() => {
333                let Some(incomming) = incomming else {
334                    anyhow::bail!("unable to accept new connections");
335                };
336                let result = async move {
337                    let mut connecting = incomming.accept()?;
338                    let alpn = connecting.alpn().await?;
339                    anyhow::ensure!(alpn == PLAY_ALPN, "unexpected ALPN");
340                    let conn = connecting.await?;
341                    anyhow::Ok(conn)
342                };
343
344                match result.await {
345                    Ok(conn) => {
346                        connections.push(conn);
347                        let current_players = connections.len() + 1;
348                        info!(%current_players, "New player connection");
349                    }
350                    Err(err) => {
351                        warn!("failed to accept connection: {:?}", err);
352                        continue;
353                    }
354                }
355            }
356        }
357
358        // Discard closed connections
359        connections.retain(|conn| {
360            if conn.close_reason().is_some() {
361                info!("Player closed connection");
362                false
363            } else {
364                true
365            }
366        });
367
368        let current_players = connections.len();
369        let target_players = player_count;
370        info!(%current_players, %target_players);
371
372        // If we're ready to start a match
373        if connections.len() == (player_count - 1) as usize {
374            info!("All players joined.");
375
376            let endpoint = get_network_endpoint().await;
377
378            // Tell all clients we're ready
379            for (i, conn) in connections.iter().enumerate() {
380                let mut peers = Vec::new();
381                connections
382                    .iter()
383                    .enumerate()
384                    .filter(|x| x.0 != i)
385                    .for_each(|(i, conn)| {
386                        let id = get_remote_node_id(conn).expect("invalid connection");
387                        let mut addr = NodeAddr::new(id);
388                        if let Some(info) = endpoint.remote_info(id) {
389                            if let Some(relay_url) = info.relay_url {
390                                addr = addr.with_relay_url(relay_url.relay_url);
391                            }
392                            addr = addr.with_direct_addresses(
393                                info.addrs.into_iter().map(|addr| addr.addr),
394                            );
395                        }
396
397                        peers.push((u32::try_from(i + 1).unwrap(), addr));
398                    });
399
400                let mut uni = conn.open_uni().await?;
401                uni.write_all(&postcard::to_vec::<_, 20>(&MatchmakerNetMsg::MatchReady {
402                    player_idx: (i + 1).try_into()?,
403                    peers,
404                    player_count,
405                })?)
406                .await?;
407                uni.finish()?;
408                uni.stopped().await?;
409            }
410
411            let connections = connections
412                .into_iter()
413                .enumerate()
414                .map(|(i, c)| (u32::try_from(i + 1).unwrap(), c))
415                .collect();
416
417            // Send the connections to the game so that it can start the network match.
418            matchmaker_channel
419                .send(LanMatchmakerResponse::GameStarting {
420                    socket: Socket::new(0, connections),
421                    player_idx: 0,
422                    player_count,
423                })
424                .await?;
425            info!(player_idx=0, %player_count, "Matchmaking finished");
426
427            // Break out of the server loop
428            break;
429
430            // If we don't have enough players yet, send the updated player count to the game.
431        } else {
432            matchmaker_channel
433                .send(LanMatchmakerResponse::PlayerCount(current_players))
434                .await?;
435        }
436    }
437
438    Ok(())
439}
440
441async fn lan_join_server(
442    matchmaker_channel: &BiChannelServer<LanMatchmakerRequest, LanMatchmakerResponse>,
443    addr: NodeAddr,
444) -> anyhow::Result<()> {
445    let ep = get_network_endpoint().await;
446    let conn = ep.connect(addr, PLAY_ALPN).await?;
447
448    // Wait for match to start
449    let mut uni = conn.accept_uni().await?;
450    let bytes = uni.read_to_end(20).await?;
451    let message: MatchmakerNetMsg = postcard::from_bytes(&bytes)?;
452
453    match message {
454        MatchmakerNetMsg::MatchReady {
455            peers: peer_addrs,
456            player_idx,
457            player_count,
458        } => {
459            info!(%player_count, %player_idx, ?peer_addrs, "Matchmaking finished");
460
461            let peer_connections =
462                establish_peer_connections(player_idx, player_count, peer_addrs, Some(conn))
463                    .await?;
464
465            let socket = Socket::new(player_idx, peer_connections);
466            info!("Connections established.");
467
468            matchmaker_channel
469                .send(LanMatchmakerResponse::GameStarting {
470                    socket,
471                    player_idx,
472                    player_count,
473                })
474                .await?;
475        }
476    }
477    Ok(())
478}
479
480#[derive(Serialize, Deserialize)]
481enum MatchmakerNetMsg {
482    MatchReady {
483        /// The peers they have for the match, with the index in the array being the player index of the peer.
484        peers: Vec<(u32, NodeAddr)>,
485        /// The player index of the player getting the message.
486        player_idx: u32,
487        player_count: u32,
488    },
489}
490
491/// The type of the `LAN_MATCHMAKER` channel.
492#[derive(DerefMut, Deref)]
493pub struct LanMatchmaker(BiChannelClient<LanMatchmakerRequest, LanMatchmakerResponse>);
494
495/// A request that may be sent to the `LAN_MATCHMAKER`.
496#[derive(Debug)]
497pub enum LanMatchmakerRequest {
498    /// Start matchmaker server
499    StartServer {
500        /// match player count
501        player_count: u32,
502    },
503    /// Join server
504    JoinServer {
505        /// Node Addr
506        addr: NodeAddr,
507    },
508    /// Stop matchmaking server
509    StopServer,
510    /// Stop joining match
511    StopJoin,
512}
513
514/// A response that may come from the `LAN_MATCHMAKER`.
515pub enum LanMatchmakerResponse {
516    /// Server started
517    ServerStarted,
518    /// Server player count
519    PlayerCount(usize),
520    /// Game is starting
521    GameStarting {
522        /// Lan socket to game
523        socket: Socket,
524        /// Local player index
525        player_idx: u32,
526        /// Game player count
527        player_count: u32,
528    },
529}
530
531fn pinger(server: BiChannelServer<PingerRequest, PingerResponse>) {
532    while let Ok(servers) = server.recv_blocking() {
533        let mut pings = SmallVec::new();
534        for server in servers {
535            let start = Instant::now();
536            let ping_result =
537                ping_rs::send_ping(&server, Duration::from_secs(2), &[1, 2, 3, 4], None);
538
539            let ping = if let Err(e) = ping_result {
540                warn!("Error pinging {server}: {e:?}");
541                None
542            } else {
543                Some((Instant::now() - start).as_millis() as u16)
544            };
545
546            pings.push((server, ping));
547        }
548        if server.send_blocking(pings).is_err() {
549            break;
550        }
551    }
552}