bones_framework/
networking.rs

1#![doc = include_str!("./networking.md")]
2
3use self::{input::NetworkControls, socket::Socket};
4use crate::input::{DenseControl, DenseInput, DenseInputConfig};
5use crate::networking::online::OnlineMatchmakerResponse;
6pub use crate::networking::random::RngGenerator;
7use crate::prelude::*;
8use bones_matchmaker_proto::{MATCH_ALPN, PLAY_ALPN};
9use ggrs::P2PSession;
10use instant::Duration;
11use once_cell::sync::Lazy;
12use std::{fmt::Debug, marker::PhantomData, sync::Arc};
13use tracing::{debug, error, info, trace, warn};
14
15#[cfg(feature = "net-debug")]
16use {
17    self::debug::{NetworkDebugMessage, PlayerSyncState, NETWORK_DEBUG_CHANNEL},
18    ggrs::{NetworkStats, PlayerHandle},
19};
20
21pub use iroh;
22
23pub mod input;
24pub mod lan;
25pub mod online;
26pub mod online_lobby;
27pub mod online_matchmaking;
28pub mod random;
29pub mod socket;
30
31#[cfg(feature = "net-debug")]
32pub mod debug;
33
34/// Runtime, needed to execute network related calls.
35pub static RUNTIME: Lazy<tokio::runtime::Runtime> =
36    Lazy::new(|| tokio::runtime::Runtime::new().expect("unable to crate tokio runtime"));
37
/// Indicates if input from networking is confirmed, predicted, or if player is disconnected.
///
/// This is a plain value type: it is `Copy`, comparable, and hashable so it can be used as a
/// key in sets and maps.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum NetworkInputStatus {
    /// The input of this player for this frame is an actual received input.
    Confirmed,
    /// The input of this player for this frame is predicted.
    Predicted,
    /// The player has disconnected at or prior to this frame, so this input is a dummy.
    Disconnected,
}
48
49impl From<ggrs::InputStatus> for NetworkInputStatus {
50    fn from(value: ggrs::InputStatus) -> Self {
51        match value {
52            ggrs::InputStatus::Confirmed => NetworkInputStatus::Confirmed,
53            ggrs::InputStatus::Predicted => NetworkInputStatus::Predicted,
54            ggrs::InputStatus::Disconnected => NetworkInputStatus::Disconnected,
55        }
56    }
57}
58
59/// Module prelude.
60pub mod prelude {
61    pub use super::{
62        input, lan, online, random, DisconnectedPlayers, RngGenerator, SyncingInfo, RUNTIME,
63    };
64
65    #[cfg(feature = "net-debug")]
66    pub use super::debug::prelude::*;
67}
68
/// Multiplier for the frame rate that will be used when playing an online match.
///
/// Lowering the frame rate a little for online matches reduces bandwidth and may help overall
/// gameplay. This may not be necessary once we improve network performance.
///
/// Note that FPS is provided as an integer to ggrs, so the network-modified fps is rounded to
/// the nearest integer, which is then used to compute the timestep so ggrs and networking match.
pub const NETWORK_FRAME_RATE_FACTOR: f32 = 0.9;

/// Default frame rate to run at if the user provides none.
pub const NETWORK_DEFAULT_SIMULATION_FRAME_RATE: f32 = 60.0;

/// Number of frames the client may predict beyond the confirmed frame before freezing and
/// waiting for inputs from other players. Default value if not specified in
/// [`GgrsSessionRunnerInfo`].
pub const NETWORK_MAX_PREDICTION_WINDOW_DEFAULT: usize = 7;

// TODO: test as zero?

/// Amount of frames GGRS will delay local input. Default value if not specified in
/// [`GgrsSessionRunnerInfo`].
pub const NETWORK_LOCAL_INPUT_DELAY_DEFAULT: usize = 2;
89
/// Possible errors returned by network loop.
///
/// Implements [`std::error::Error`] so it can be propagated with `?` and boxed like any other
/// error type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkError {
    /// The session was disconnected.
    Disconnected,
}

impl std::fmt::Display for NetworkError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Disconnected => f.write_str("network session was disconnected"),
        }
    }
}

impl std::error::Error for NetworkError {}
95
/// The [`ggrs::Config`] implementation used by Jumpy.
///
/// `T` is the dense input type exchanged between peers. The struct stores no data; it only
/// carries `T` at the type level.
#[derive(Debug)]
pub struct GgrsConfig<T: DenseInput + Debug> {
    /// Zero-sized marker tying the config to its input type `T`.
    phantom: PhantomData<T>,
}
101
impl<T: DenseInput + Debug> ggrs::Config for GgrsConfig<T> {
    /// The dense input type sent over the network for each player.
    type Input = T;
    /// The game state that ggrs saves and loads: the whole [`World`].
    type State = World;
    /// Addresses are the same as the player handle for our custom socket.
    type Address = usize;
}
108
/// The network endpoint used for all network communications.
///
/// Starts out empty and is initialized on the first call to [`get_network_endpoint`].
static NETWORK_ENDPOINT: tokio::sync::OnceCell<iroh::Endpoint> = tokio::sync::OnceCell::const_new();
111
112/// Get the network endpoint used for all communications.
113pub async fn get_network_endpoint() -> &'static iroh::Endpoint {
114    NETWORK_ENDPOINT
115        .get_or_init(|| async move {
116            let secret_key = iroh::key::SecretKey::generate();
117            iroh::Endpoint::builder()
118                .alpns(vec![MATCH_ALPN.to_vec(), PLAY_ALPN.to_vec()])
119                .discovery(Box::new(
120                    iroh::discovery::ConcurrentDiscovery::from_services(vec![
121                        Box::new(
122                            iroh::discovery::local_swarm_discovery::LocalSwarmDiscovery::new(
123                                secret_key.public(),
124                            )
125                            .unwrap(),
126                        ),
127                        Box::new(iroh::discovery::dns::DnsDiscovery::n0_dns()),
128                        Box::new(iroh::discovery::pkarr::PkarrPublisher::n0_dns(
129                            secret_key.clone(),
130                        )),
131                    ]),
132                ))
133                .secret_key(secret_key)
134                .bind()
135                .await
136                .unwrap()
137        })
138        .await
139}
140
/// Resource containing the [`NetworkSocket`] implementation while there is a connection to a
/// network game.
///
/// This is inserted into the world after a match has been established by a network matchmaker.
///
/// The socket is held behind an [`Arc`], so cloning this resource clones the handle rather than
/// the underlying socket. The wrapper dereferences to the inner `Arc<dyn NetworkSocket>`.
#[derive(Clone, HasSchema, Deref, DerefMut)]
#[schema(no_default)]
pub struct NetworkMatchSocket(Arc<dyn NetworkSocket>);
148
/// Wraps a [`ggrs::Message`] together with a `match_id`, which is used to determine whether a
/// received message belongs to the current match.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GameMessage {
    /// Socket match id.
    pub match_id: u8,
    /// Wrapped message.
    pub message: ggrs::Message,
}
158
159/// Automatically implemented for [`NetworkSocket`] + [`ggrs::NonBlockingSocket<usize>`].
160pub trait GgrsSocket: NetworkSocket + ggrs::NonBlockingSocket<usize> {}
161impl<T> GgrsSocket for T where T: NetworkSocket + ggrs::NonBlockingSocket<usize> {}
162
/// Trait that must be implemented by socket connections established by matchmakers.
///
/// The [`NetworkMatchSocket`] resource will contain an instance of this trait and will be used by
/// the game to send network messages after a match has been established.
pub trait NetworkSocket: Sync + Send {
    /// Get a GGRS socket from this network socket.
    fn ggrs_socket(&self) -> Socket;
    /// Send a reliable message to the given [`SocketTarget`].
    fn send_reliable(&self, target: SocketTarget, message: &[u8]);
    /// Receive reliable messages from other players. The `u32` in each returned pair is the
    /// index of the player that sent the message; the `Vec<u8>` is the message payload.
    fn recv_reliable(&self) -> Vec<(u32, Vec<u8>)>;
    /// Close the connection.
    fn close(&self);
    /// Get the player index of the local player.
    fn player_idx(&self) -> u32;
    /// Get the player count for this network match.
    fn player_count(&self) -> u32;

    /// Increment match id so messages from previous match that are still in flight
    /// will be filtered out. Used when starting new session with existing socket.
    fn increment_match_id(&mut self);
}
186
/// The destination for a reliable network message.
///
/// This is a small value type: it can be copied, compared, and printed for diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SocketTarget {
    /// Send to a specific player, identified by player index.
    Player(u32),
    /// Broadcast to all players.
    All,
}
194
/// Resource updated each frame exposing syncing/networking information in the current session.
///
/// All fields are private; read them through the getter methods on this type.
#[derive(HasSchema, Clone)]
#[schema(no_default)]
pub struct SyncingInfo {
    /// Current frame of simulation step.
    current_frame: i32,
    /// The random seed for this session.
    random_seed: u64,
    /// The additional online info. `None` means the session is offline.
    online_info: Option<OnlineSyncingInfo>,
}
206
/// Holds the data of a [`SyncingInfo`] that only exists for an online session.
#[derive(HasSchema, Clone)]
#[schema(no_default)]
pub struct OnlineSyncingInfo {
    /// Last confirmed frame by all clients.
    /// Anything that occurred on this frame is agreed upon by all clients.
    last_confirmed_frame: i32,
    /// Socket used by the session.
    socket: Socket,
    /// Networking stats for each connected player, stored at the \[player_idx\] index for each respective player.
    players_network_stats: SVec<PlayerNetworkStats>,
    /// The local player's index.
    local_player_idx: usize,
    /// The local input delay set for this session.
    local_frame_delay: usize,
    /// List of disconnected players (their idx).
    disconnected_players: SVec<usize>,
}
225
226impl SyncingInfo {
227    /// Checks if the session is online.
228    pub fn is_online(&self) -> bool {
229        self.online_info.is_some()
230    }
231
232    /// Checks if the session is offline.
233    pub fn is_offline(&self) -> bool {
234        self.online_info.is_none()
235    }
236
237    /// Getter for the current frame (number).
238    pub fn current_frame(&self) -> i32 {
239        self.current_frame
240    }
241
242    /// Getter for the last confirmed frame (number).
243    pub fn last_confirmed_frame(&self) -> i32 {
244        self.online_info
245            .as_ref()
246            .map(|info| info.last_confirmed_frame)
247            .unwrap_or(self.current_frame)
248    }
249    /// Getter for socket.
250    pub fn socket(&self) -> Option<&Socket> {
251        self.online_info.as_ref().map(|info| &info.socket)
252    }
253
254    /// Mutable getter for socket.
255    pub fn socket_mut(&mut self) -> Option<&mut Socket> {
256        self.online_info.as_mut().map(|info| &mut info.socket)
257    }
258
259    /// Getter for a single player's network stats using their player_idx
260    pub fn player_network_stats(&self, player_idx: usize) -> Option<PlayerNetworkStats> {
261        self.online_info
262            .as_ref()
263            .map(|info| info.players_network_stats.get(player_idx).cloned())?
264    }
265
266    /// Getter for all players' network stats, including local player (set to default). This maintains index == player_idx.
267    pub fn players_network_stats(&self) -> SVec<PlayerNetworkStats> {
268        self.online_info
269            .as_ref()
270            .map(|info| info.players_network_stats.clone())
271            .unwrap_or_default()
272    }
273
274    /// Getter for remote player network stats (filtering out local player). This does not maintain index == player_idx.
275    pub fn remote_players_network_stats(&self) -> SVec<PlayerNetworkStats> {
276        self.online_info
277            .as_ref()
278            .map(|info| {
279                info.players_network_stats
280                    .iter()
281                    .filter(|&stats| stats.ping != 0 || stats.kbps_sent != 0)
282                    .cloned()
283                    .collect()
284            })
285            .unwrap_or_default()
286    }
287
288    /// Calculates the total kilobits per second sent across all remote players.
289    pub fn total_kbps_sent(&self) -> usize {
290        self.remote_players_network_stats()
291            .iter()
292            .map(|stats| stats.kbps_sent)
293            .sum()
294    }
295
296    /// Calculates the average kilobits per second sent across all remote players.
297    pub fn averaged_kbps_sent(&self) -> f32 {
298        let remote_stats = self.remote_players_network_stats();
299        if remote_stats.is_empty() {
300            0.0
301        } else {
302            let total_kbps: usize = remote_stats.iter().map(|stats| stats.kbps_sent).sum();
303            total_kbps as f32 / remote_stats.len() as f32
304        }
305    }
306
307    /// Returns the highest number of local frames behind across all remote players.
308    pub fn highest_local_frames_behind(&self) -> i32 {
309        self.remote_players_network_stats()
310            .iter()
311            .map(|stats| stats.local_frames_behind)
312            .max()
313            .unwrap_or(0)
314    }
315
316    /// Returns the highest number of remote frames behind across all remote players.
317    pub fn highest_remote_frames_behind(&self) -> i32 {
318        self.remote_players_network_stats()
319            .iter()
320            .map(|stats| stats.remote_frames_behind)
321            .max()
322            .unwrap_or(0)
323    }
324
325    /// Calculates the average ping across all remote players.
326    pub fn averaged_ping(&self) -> u128 {
327        let remote_stats = self.remote_players_network_stats();
328        if remote_stats.is_empty() {
329            0
330        } else {
331            let total_ping: u128 = remote_stats.iter().map(|stats| stats.ping).sum();
332            total_ping / remote_stats.len() as u128
333        }
334    }
335
336    /// Returns the lowest ping across all remote players.
337    pub fn lowest_ping(&self) -> u128 {
338        self.remote_players_network_stats()
339            .iter()
340            .map(|stats| stats.ping)
341            .min()
342            .unwrap_or(0)
343    }
344
345    /// Returns the highest ping across all remote players.
346    pub fn highest_ping(&self) -> u128 {
347        self.remote_players_network_stats()
348            .iter()
349            .map(|stats| stats.ping)
350            .max()
351            .unwrap_or(0)
352    }
353
354    /// Getter for the local player index, if offline defaults to None.
355    pub fn local_player_idx_checked(&self) -> Option<usize> {
356        self.online_info.as_ref().map(|info| info.local_player_idx)
357    }
358
359    /// Getter for the local player index, if offline defaults to 0.
360    pub fn local_player_idx(&self) -> usize {
361        self.online_info
362            .as_ref()
363            .map(|info| info.local_player_idx)
364            .unwrap_or(0)
365    }
366
367    /// Getter for the local frame delay.
368    pub fn local_frame_delay(&self) -> usize {
369        self.online_info
370            .as_ref()
371            .map(|info| info.local_frame_delay)
372            .unwrap_or(0)
373    }
374
375    /// Getter for the number of players, if offline defaults to 0.
376    pub fn players_count(&self) -> usize {
377        self.online_info
378            .as_ref()
379            .map(|info| info.players_network_stats.len())
380            .unwrap_or(0)
381    }
382
383    /// Getter for the number of players, if offline defaults to None.
384    pub fn players_count_checked(&self) -> Option<usize> {
385        self.online_info
386            .as_ref()
387            .map(|info| info.players_network_stats.len())
388    }
389
390    /// Getter for the list of active players (idx) which are connected. Offline returns empty list.
391    pub fn active_players(&self) -> SVec<usize> {
392        self.online_info
393            .as_ref()
394            .map(|info| {
395                let total_players = info.players_network_stats.len();
396                (0..total_players)
397                    .filter(|&id| !info.disconnected_players.contains(&id))
398                    .collect()
399            })
400            .unwrap_or_default()
401    }
402
403    /// Getter for the list of active players (idx) which are connected. Offline returns None.
404    pub fn active_players_checked(&self) -> Option<SVec<usize>> {
405        self.online_info.as_ref().map(|info| {
406            let total_players = info.players_network_stats.len();
407            (0..total_players)
408                .filter(|&id| !info.disconnected_players.contains(&id))
409                .collect()
410        })
411    }
412
413    /// Getter for the list of players which have been disconnected (their idx). Offline returns empty list.
414    pub fn disconnected_players(&self) -> SVec<usize> {
415        self.online_info
416            .as_ref()
417            .map(|info| info.disconnected_players.clone())
418            .unwrap_or_default()
419    }
420
421    /// Getter for the list of players which have been disconnected (their idx). Offline returns None.
422    pub fn disconnected_players_checked(&self) -> Option<SVec<usize>> {
423        self.online_info
424            .as_ref()
425            .map(|info| info.disconnected_players.clone())
426    }
427
428    /// Getter for the random seed.
429    pub fn random_seed(&self) -> u64 {
430        self.random_seed
431    }
432}
433
/// Resource tracking which players have been disconnected.
/// May not be in world if no disconnects.
///
/// If the game rolls back to a frame before the disconnect, the player's handle is still
/// included here.
#[derive(HasSchema, Clone, Default)]
pub struct DisconnectedPlayers {
    /// Handles of players that have been disconnected.
    pub disconnected_players: Vec<usize>,
}
443
/// [`SessionRunner`] implementation that uses [`ggrs`] for network play.
///
/// This is where the whole `ggrs` integration is implemented.
pub struct GgrsSessionRunner<'a, InputTypes: DenseInputConfig<'a>> {
    /// The last player input we detected.
    pub last_player_input: InputTypes::Dense,

    /// The GGRS peer-to-peer session.
    pub session: P2PSession<GgrsConfig<InputTypes::Dense>>,

    /// Local player idx.
    pub player_idx: u32,

    /// Index of the local player. [`GgrsSessionRunner::new`] sets this to the same value as
    /// `player_idx`; it is the handle used when submitting local input to the session.
    pub local_player_idx: u32,

    /// The frame time accumulator, used to produce a fixed refresh rate.
    pub accumulator: f64,

    /// Timestamp of last time session was run to compute delta time.
    pub last_run: Option<Instant>,

    /// FPS from game adjusted with constant network factor (may be slightly slower)
    pub network_fps: f64,

    /// FPS from game not adjusted with network factor.
    pub original_fps: f64,

    /// Session runner's input collector.
    pub input_collector: InputTypes::InputCollector,

    /// Is local input disabled? (No input will be used if set)
    pub local_input_disabled: bool,

    /// Players who have been reported disconnected by ggrs
    disconnected_players: Vec<usize>,

    /// Store copy of socket to be able to restart session runner with existing socket.
    socket: Socket,

    /// Local input delay ggrs session was initialized with
    local_input_delay: usize,

    /// The random seed used for this session
    pub random_seed: u64,
}
490
/// The info required to create a [`GgrsSessionRunner`].
#[derive(Clone)]
pub struct GgrsSessionRunnerInfo {
    /// The socket that will be converted into GGRS socket implementation.
    pub socket: Socket,
    /// The local player idx.
    pub player_idx: u32,
    /// The player count.
    pub player_count: u32,

    /// Max prediction window (max number of frames client may predict ahead of last confirmed frame).
    /// `None` will use Bones' default, [`NETWORK_MAX_PREDICTION_WINDOW_DEFAULT`].
    pub max_prediction_window: Option<usize>,

    /// Local input delay (local inputs + remote inputs will be buffered and sampled this many frames later).
    /// Increasing it helps mitigate pops when a remote user changes input quickly, and reduces the number of
    /// frames the client will end up predicted ahead of others, which helps with high latency.
    ///
    /// `None` will use Bones' default, [`NETWORK_LOCAL_INPUT_DELAY_DEFAULT`].
    pub local_input_delay: Option<usize>,
    /// The random seed used for this session.
    pub random_seed: u64,
}
514
515impl GgrsSessionRunnerInfo {
516    /// See [`GgrsSessionRunnerInfo`] fields for info on arguments.
517    pub fn new(
518        socket: Socket,
519        max_prediction_window: Option<usize>,
520        local_input_delay: Option<usize>,
521        random_seed: u64,
522    ) -> Self {
523        let player_idx = socket.player_idx();
524        let player_count = socket.player_count();
525        Self {
526            socket,
527            player_idx,
528            player_count,
529            max_prediction_window,
530            local_input_delay,
531            random_seed,
532        }
533    }
534}
535
536impl<'a, InputTypes> GgrsSessionRunner<'a, InputTypes>
537where
538    InputTypes: DenseInputConfig<'a>,
539{
540    /// Creates a new session runner from a `OnlineMatchmakerResponse::GameStarting`
541    /// Any input values set as `None` will be set to default.
542    /// If response is not `GameStarting` returns None.
543    pub fn new_networked_game_starting(
544        target_fps: Option<f32>,
545        max_prediction_window: Option<usize>,
546        local_input_delay: Option<usize>,
547        matchmaker_resp_game_starting: OnlineMatchmakerResponse,
548    ) -> Option<Self> {
549        if let OnlineMatchmakerResponse::GameStarting {
550            socket,
551            player_idx: _,
552            player_count: _,
553            random_seed,
554        } = matchmaker_resp_game_starting
555        {
556            Some(Self::new(
557                target_fps,
558                GgrsSessionRunnerInfo::new(
559                    socket.ggrs_socket(),
560                    max_prediction_window,
561                    local_input_delay,
562                    random_seed,
563                ),
564            ))
565        } else {
566            None
567        }
568    }
569
570    /// Creates a new session runner from scratch.
571    pub fn new(target_fps: Option<f32>, info: GgrsSessionRunnerInfo) -> Self
572    where
573        Self: Sized,
574    {
575        let simulation_fps = target_fps.unwrap_or(NETWORK_DEFAULT_SIMULATION_FRAME_RATE);
576
577        // Modified FPS may not be an integer, but ggrs requires integer fps, so we clamp and round
578        // to integer so our computed timestep will match  that of ggrs.
579        let network_fps = (simulation_fps * NETWORK_FRAME_RATE_FACTOR) as f64;
580        let network_fps = network_fps
581            .max(usize::MIN as f64)
582            .min(usize::MAX as f64)
583            .round() as usize;
584
585        // There may be value in dynamically negotitaing these values based on client's pings
586        // before starting the match.
587        let max_prediction = info
588            .max_prediction_window
589            .unwrap_or(NETWORK_MAX_PREDICTION_WINDOW_DEFAULT);
590        let local_input_delay = info
591            .local_input_delay
592            .unwrap_or(NETWORK_LOCAL_INPUT_DELAY_DEFAULT);
593
594        // Notify debugger of setting
595        #[cfg(feature = "net-debug")]
596        NETWORK_DEBUG_CHANNEL
597            .sender
598            .try_send(NetworkDebugMessage::SetMaxPrediction(max_prediction))
599            .unwrap();
600
601        let mut builder = ggrs::SessionBuilder::new()
602            .with_num_players(info.player_count as usize)
603            .with_input_delay(local_input_delay)
604            .with_fps(network_fps)
605            .unwrap()
606            .with_max_prediction_window(max_prediction)
607            .unwrap();
608
609        let local_player_idx = info.player_idx;
610        for i in 0..info.player_count {
611            if i == info.player_idx {
612                builder = builder
613                    .add_player(ggrs::PlayerType::Local, i as usize)
614                    .unwrap();
615            } else {
616                builder = builder
617                    .add_player(ggrs::PlayerType::Remote(i as usize), i as usize)
618                    .unwrap();
619            }
620        }
621
622        let session = builder.start_p2p_session(info.socket.clone()).unwrap();
623
624        Self {
625            last_player_input: InputTypes::Dense::default(),
626            session,
627            player_idx: info.player_idx,
628            local_player_idx,
629            accumulator: default(),
630            last_run: None,
631            network_fps: network_fps as f64,
632            original_fps: simulation_fps as f64,
633            disconnected_players: default(),
634            input_collector: InputTypes::InputCollector::default(),
635            socket: info.socket.clone(),
636            local_input_delay,
637            local_input_disabled: false,
638            random_seed: info.random_seed,
639        }
640    }
641}
642
643impl<InputTypes> SessionRunner for GgrsSessionRunner<'static, InputTypes>
644where
645    InputTypes: DenseInputConfig<'static> + 'static,
646{
647    fn step(&mut self, frame_start: Instant, world: &mut World, stages: &mut SystemStages) {
648        let step: f64 = 1.0 / self.network_fps;
649
650        let last_run = self.last_run.unwrap_or(frame_start);
651        let delta = (frame_start - last_run).as_secs_f64();
652        self.accumulator += delta;
653
654        let mut skip_frames: u32 = 0;
655
656        // Collect inputs and update controls
657        self.input_collector.apply_inputs(world);
658        self.input_collector.update_just_pressed();
659
660        // save local players dense input for use with ggrs
661        self.last_player_input = self.input_collector.get_control().get_dense_input();
662
663        #[cfg(feature = "net-debug")]
664        // Current frame before we start network update loop
665        let current_frame_original = self.session.current_frame();
666
667        for event in self.session.events() {
668            match event {
669                ggrs::GgrsEvent::Synchronizing { addr, total, count } => {
670                    info!(player=%addr, %total, progress=%count, "Syncing network player");
671
672                    #[cfg(feature = "net-debug")]
673                    NETWORK_DEBUG_CHANNEL
674                        .sender
675                        .try_send(NetworkDebugMessage::PlayerSync((
676                            PlayerSyncState::SyncInProgress,
677                            addr,
678                        )))
679                        .unwrap();
680                }
681                ggrs::GgrsEvent::Synchronized { addr } => {
682                    info!(player=%addr, "Syncrhonized network client");
683
684                    #[cfg(feature = "net-debug")]
685                    NETWORK_DEBUG_CHANNEL
686                        .sender
687                        .try_send(NetworkDebugMessage::PlayerSync((
688                            PlayerSyncState::Sychronized,
689                            addr,
690                        )))
691                        .unwrap();
692                }
693                ggrs::GgrsEvent::Disconnected { addr } => {
694                    warn!(player=%addr, "Player Disconnected");
695                    self.disconnected_players.push(addr);
696
697                    #[cfg(feature = "net-debug")]
698                    NETWORK_DEBUG_CHANNEL
699                        .sender
700                        .try_send(NetworkDebugMessage::DisconnectedPlayers(
701                            self.disconnected_players.clone(),
702                        ))
703                        .unwrap();
704                } //return Err(SessionError::Disconnected)},
705                ggrs::GgrsEvent::NetworkInterrupted { addr, .. } => {
706                    info!(player=%addr, "Network player interrupted");
707                }
708                ggrs::GgrsEvent::NetworkResumed { addr } => {
709                    info!(player=%addr, "Network player re-connected");
710                }
711                ggrs::GgrsEvent::WaitRecommendation {
712                    skip_frames: skip_count,
713                } => {
714                    info!(
715                        "Skipping {skip_count} frames to give network players a chance to catch up"
716                    );
717                    skip_frames = skip_count;
718
719                    #[cfg(feature = "net-debug")]
720                    NETWORK_DEBUG_CHANNEL
721                        .sender
722                        .try_send(NetworkDebugMessage::SkipFrame {
723                            frame: current_frame_original,
724                            count: skip_count,
725                        })
726                        .unwrap();
727                }
728                ggrs::GgrsEvent::DesyncDetected {
729                    frame,
730                    local_checksum,
731                    remote_checksum,
732                    addr,
733                } => {
734                    error!(%frame, %local_checksum, %remote_checksum, player=%addr, "Network de-sync detected");
735                }
736            }
737        }
738
739        loop {
740            if self.accumulator >= step {
741                self.accumulator -= step;
742
743                if !self.local_input_disabled {
744                    self.session
745                        .add_local_input(self.local_player_idx as usize, self.last_player_input)
746                        .unwrap();
747                } else {
748                    // If local input is disabled, we still submit a default value representing no-inputs.
749                    // This way if input is disabled current inputs will not be held down indefinitely.
750                    self.session
751                        .add_local_input(
752                            self.local_player_idx as usize,
753                            InputTypes::Dense::default(),
754                        )
755                        .unwrap();
756                }
757
758                #[cfg(feature = "net-debug")]
759                {
760                    let current_frame = self.session.current_frame();
761                    let confirmed_frame = self.session.confirmed_frame();
762
763                    NETWORK_DEBUG_CHANNEL
764                        .sender
765                        .try_send(NetworkDebugMessage::FrameUpdate {
766                            current: current_frame,
767                            last_confirmed: confirmed_frame,
768                        })
769                        .unwrap();
770                }
771
772                if skip_frames > 0 {
773                    skip_frames = skip_frames.saturating_sub(1);
774                    continue;
775                }
776
777                match self.session.advance_frame() {
778                    Ok(requests) => {
779                        for request in requests {
780                            match request {
781                                ggrs::GgrsRequest::SaveGameState { cell, frame } => {
782                                    cell.save(frame, Some(world.clone()), None)
783                                }
784                                ggrs::GgrsRequest::LoadGameState { cell, .. } => {
785                                    world.load_snapshot(cell.load().unwrap_or_default());
786                                }
787                                ggrs::GgrsRequest::AdvanceFrame {
788                                    inputs: network_inputs,
789                                } => {
790                                    // Input has been consumed, signal that we are in new input frame
791                                    self.input_collector.advance_frame();
792
793                                    // Fetch the PlayerNetworkStats for each remote player, guaranteeing each one is inserted into the index matching its handle
794                                    let mut players_network_stats: Vec<PlayerNetworkStats> = vec![
795                                        PlayerNetworkStats::default();
796                                        self.session.remote_player_handles().len() + 1 // + 1 for the local player to maintain correct length
797                                    ];
798                                    for handle in self.session.remote_player_handles().iter() {
799                                        if let Ok(stats) = self.session.network_stats(*handle) {
800                                            players_network_stats[*handle] =
801                                                PlayerNetworkStats::from_ggrs_network_stats(
802                                                    *handle, stats,
803                                                );
804                                        }
805                                    }
806
807                                    // Create and insert the RngGenerator resource if it doesn't exist
808                                    if world.resources.get::<RngGenerator>().is_none() {
809                                        let rng_generator = RngGenerator::new(self.random_seed);
810                                        world.insert_resource(rng_generator);
811                                    }
812
                                    // TODO: Make sure SyncingInfo is initialized immediately when session is created,
                                    // even before a frame has advanced.
                                    // The existence of this resource may be used to determine if in an online match, and there
                                    // could be a race if code expects it to exist but checks before the first frame advance.
817                                    world.insert_resource(SyncingInfo {
818                                        current_frame: self.session.current_frame(),
819                                        random_seed: self.random_seed,
820                                        online_info: Some(OnlineSyncingInfo {
821                                            last_confirmed_frame: self.session.confirmed_frame(),
822                                            socket: self.socket.clone(),
823                                            players_network_stats: players_network_stats.into(),
824                                            local_player_idx: self.local_player_idx as usize,
825                                            local_frame_delay: self.local_input_delay,
826                                            disconnected_players: self
827                                                .disconnected_players
828                                                .clone()
829                                                .into(),
830                                        }),
831                                    });
832
833                                    // Disconnected players persisted on session runner, and updated each frame.
834                                    // This avoids a rollback from changing resource state.
835                                    world.insert_resource(DisconnectedPlayers {
836                                        disconnected_players: self.disconnected_players.clone(),
837                                    });
838
839                                    {
840                                        world
841                                            .resource_mut::<Time>()
842                                            .advance_exact(Duration::from_secs_f64(step));
843
844                                        // update game controls from ggrs inputs
845                                        let mut player_inputs =
846                                            world.resource_mut::<InputTypes::Controls>();
847                                        for (player_idx, (input, status)) in
848                                            network_inputs.into_iter().enumerate()
849                                        {
850                                            trace!(
851                                                "Net player({player_idx}) local: {}, status: {status:?}, input: {:?}",
852                                                self.local_player_idx as usize == player_idx,
853                                                input
854                                            );
855                                            player_inputs.network_update(
856                                                player_idx,
857                                                &input,
858                                                status.into(),
859                                            );
860                                        }
861                                    }
862
863                                    // Run game session stages, advancing simulation
864                                    stages.run(world);
865
866                                    // Handle any triggered resets of world + preserve runner's managed resources like RngGenerator.
867                                    if world.reset_triggered() {
868                                        let rng = world
869                                            .get_resource::<RngGenerator>()
870                                            .map(|r| (*r).clone());
871                                        if let Some(rng) = rng {
872                                            if let Some(mut reset) =
873                                                world.get_resource_mut::<ResetWorld>()
874                                            {
875                                                reset.insert_reset_resource(rng);
876                                            }
877                                        }
878
879                                        world.handle_world_reset(stages);
880                                    }
881                                }
882                            }
883                        }
884                    }
885                    Err(e) => match e {
886                        ggrs::GgrsError::NotSynchronized => {
887                            debug!("Waiting for network clients to sync")
888                        }
889                        ggrs::GgrsError::PredictionThreshold => {
890                            warn!("Freezing game while waiting for network to catch-up.");
891
892                            #[cfg(feature = "net-debug")]
893                            NETWORK_DEBUG_CHANNEL
894                                .sender
895                                .try_send(NetworkDebugMessage::FrameFroze {
896                                    frame: self.session.current_frame(),
897                                })
898                                .unwrap();
899                        }
900                        e => error!("Network protocol error: {e}"),
901                    },
902                }
903            } else {
904                break;
905            }
906        }
907
908        self.last_run = Some(frame_start);
909
910        // Fetch GGRS network stats of remote players and send to net debug tool
911        #[cfg(feature = "net-debug")]
912        {
913            let mut network_stats: Vec<(PlayerHandle, NetworkStats)> = vec![];
914            for handle in self.session.remote_player_handles().iter() {
915                if let Ok(stats) = self.session.network_stats(*handle) {
916                    network_stats.push((*handle, stats));
917                }
918            }
919            if !network_stats.is_empty() {
920                NETWORK_DEBUG_CHANNEL
921                    .sender
922                    .try_send(NetworkDebugMessage::NetworkStats { network_stats })
923                    .unwrap();
924            }
925        }
926    }
927
928    fn restart_session(&mut self) {
929        // Rebuild session info from runner + create new runner
930
931        // Increment match id so messages from previous match that are still in flight
932        // will be filtered out.
933        self.socket.increment_match_id();
934
935        let runner_info = GgrsSessionRunnerInfo {
936            socket: self.socket.clone(),
937            player_idx: self.player_idx,
938            player_count: self.session.num_players().try_into().unwrap(),
939            max_prediction_window: Some(self.session.max_prediction()),
940            local_input_delay: Some(self.local_input_delay),
941            random_seed: self.random_seed,
942        };
943        *self = GgrsSessionRunner::new(Some(self.original_fps as f32), runner_info);
944    }
945
946    fn disable_local_input(&mut self, input_disabled: bool) {
947        self.local_input_disabled = input_disabled;
948    }
949}
950
951/// A schema-compatible wrapper for ggrs `NetworkStats` struct contains networking stats.
952#[derive(Debug, Default, Clone, Copy, HasSchema)]
953pub struct PlayerNetworkStats {
954    /// The idx of the player these stats are for. Included here for self-attesting/ease-of-access.
955    pub player_idx: usize,
956    /// The length of the queue containing UDP packets which have not yet been acknowledged by the end client.
957    /// The length of the send queue is a rough indication of the quality of the connection. The longer the send queue, the higher the round-trip time between the
958    /// clients. The send queue will also be longer than usual during high packet loss situations.
959    pub send_queue_len: usize,
960    /// The roundtrip packet transmission time as calculated by GGRS.
961    pub ping: u128,
962    /// The estimated bandwidth used between the two clients, in kilobits per second.
963    pub kbps_sent: usize,
964
965    /// The number of frames GGRS calculates that the local client is behind the remote client at this instant in time.
966    /// For example, if at this instant the current game client is running frame 1002 and the remote game client is running frame 1009,
967    /// this value will mostly likely roughly equal 7.
968    pub local_frames_behind: i32,
969    /// The same as [`local_frames_behind`], but calculated from the perspective of the remote player.
970    ///
971    /// [`local_frames_behind`]: #structfield.local_frames_behind
972    pub remote_frames_behind: i32,
973}
974
975impl PlayerNetworkStats {
976    /// Creates a new PlayerNetworkStats from a player index and a ggrs NetworkStats.
977    pub fn from_ggrs_network_stats(player_idx: usize, stats: ggrs::NetworkStats) -> Self {
978        Self {
979            player_idx,
980            send_queue_len: stats.send_queue_len,
981            ping: stats.ping,
982            kbps_sent: stats.kbps_sent,
983            local_frames_behind: stats.local_frames_behind,
984            remote_frames_behind: stats.remote_frames_behind,
985        }
986    }
987}