bones_framework/
networking.rs

1#![doc = include_str!("./networking.md")]
2
3use self::{
4    input::{DenseInput, NetworkInputConfig, NetworkPlayerControl, NetworkPlayerControls},
5    socket::Socket,
6};
7use crate::networking::online::OnlineMatchmakerResponse;
8pub use crate::networking::random::RngGenerator;
9use crate::prelude::*;
10use bones_matchmaker_proto::{MATCH_ALPN, PLAY_ALPN};
11use ggrs::P2PSession;
12use instant::Duration;
13use once_cell::sync::Lazy;
14use std::{fmt::Debug, marker::PhantomData, sync::Arc};
15use tracing::{debug, error, info, trace, warn};
16
17#[cfg(feature = "net-debug")]
18use {
19    self::debug::{NetworkDebugMessage, PlayerSyncState, NETWORK_DEBUG_CHANNEL},
20    ggrs::{NetworkStats, PlayerHandle},
21};
22
23use crate::input::PlayerControls as PlayerControlsTrait;
24
25pub use iroh;
26
27pub mod input;
28pub mod lan;
29pub mod online;
30pub mod online_lobby;
31pub mod online_matchmaking;
32pub mod proto;
33pub mod random;
34pub mod socket;
35
36#[cfg(feature = "net-debug")]
37pub mod debug;
38
39/// Runtime, needed to execute network related calls.
///
/// The tokio runtime is created lazily, the first time this static is dereferenced.
///
/// # Panics
///
/// The first access panics if `tokio::runtime::Runtime::new()` returns an error.
40pub static RUNTIME: Lazy<tokio::runtime::Runtime> =
41    Lazy::new(|| tokio::runtime::Runtime::new().expect("unable to crate tokio runtime"));
42
43/// Indicates if input from networking is confirmed, predicted, or if player is disconnected.
///
/// The variants mirror those of [`ggrs::InputStatus`], from which this type can be converted
/// with `From`/`Into`.
44#[derive(Debug, Copy, Clone, PartialEq, Eq)]
45pub enum NetworkInputStatus {
46    /// The input of this player for this frame is an actual received input.
47    Confirmed,
48    /// The input of this player for this frame is predicted.
49    Predicted,
50    /// The player has disconnected at or prior to this frame, so this input is a dummy.
51    Disconnected,
52}
53
54impl From<ggrs::InputStatus> for NetworkInputStatus {
55    fn from(value: ggrs::InputStatus) -> Self {
56        match value {
57            ggrs::InputStatus::Confirmed => NetworkInputStatus::Confirmed,
58            ggrs::InputStatus::Predicted => NetworkInputStatus::Predicted,
59            ggrs::InputStatus::Disconnected => NetworkInputStatus::Disconnected,
60        }
61    }
62}
63
64/// Module prelude.
///
/// Re-exports the `input`, `lan`, `online`, `proto` and `random` submodules together with the
/// most commonly used items of this module. With the `net-debug` feature enabled, the debug
/// prelude is re-exported as well.
65pub mod prelude {
66    pub use super::{
67        input, lan, online, proto, random, DisconnectedPlayers, RngGenerator, SyncingInfo, RUNTIME,
68    };
69
70    #[cfg(feature = "net-debug")]
71    pub use super::debug::prelude::*;
72}
73
74/// Multiplier for the frame rate that will be used when playing an online match.
75///
76/// Lowering the frame rate a little for online matches reduces bandwidth and may help overall
77/// gameplay. This may not be necessary once we improve network performance.
78///
79/// Note that FPS is provided as an integer to ggrs, so the network-modified fps is rounded to the
80/// nearest integer, which is then used to compute the timestep so ggrs and networking match.
81pub const NETWORK_FRAME_RATE_FACTOR: f32 = 0.9;
82
83/// Default frame rate to run at if the user provides none.
84pub const NETWORK_DEFAULT_SIMULATION_FRAME_RATE: f32 = 60.0;
85
86/// Number of frames client may predict beyond confirmed frame before freezing and waiting
87/// for inputs from other players. Default value if not specified in [`GgrsSessionRunnerInfo`].
88pub const NETWORK_MAX_PREDICTION_WINDOW_DEFAULT: usize = 7;
89
90// TODO: test as zero?
91
92/// Amount of frames GGRS will delay local input. Default value if not specified in
/// [`GgrsSessionRunnerInfo`].
93pub const NETWORK_LOCAL_INPUT_DELAY_DEFAULT: usize = 2;
94
/// Possible errors returned by network loop.
///
/// Implements [`std::error::Error`], so it can be propagated with `?`, boxed as
/// `Box<dyn std::error::Error>`, logged and compared like any other error type.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum NetworkError {
    /// The session was disconnected.
    Disconnected,
}

impl std::fmt::Display for NetworkError {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Disconnected => f.write_str("the network session was disconnected"),
        }
    }
}

impl std::error::Error for NetworkError {}
100
101/// The [`ggrs::Config`] implementation used by Jumpy.
///
/// This is a marker type: it stores no data and only carries the dense input type `T` at the
/// type level.
102#[derive(Debug)]
103pub struct GgrsConfig<T: DenseInput + Debug> {
    /// Zero-sized marker tying this config to its input type `T`.
104    phantom: PhantomData<T>,
105}
106
// Binds the GGRS associated types to the types used by this module.
107impl<T: DenseInput + Debug> ggrs::Config for GgrsConfig<T> {
    /// Per-player input exchanged through GGRS: the dense input type `T`.
108    type Input = T;
    /// Game state saved and loaded by GGRS: the whole [`World`].
109    type State = World;
110    /// Addresses are the same as the player handle for our custom socket.
111    type Address = usize;
112}
113
114/// The network endpoint used for all network communications.
///
/// Starts out empty and is filled in by [`get_network_endpoint`] through `get_or_init`.
115static NETWORK_ENDPOINT: tokio::sync::OnceCell<iroh::Endpoint> = tokio::sync::OnceCell::const_new();
116
117/// Get the network endpoint used for all communications.
///
/// The endpoint is created on first use and stored in `NETWORK_ENDPOINT`; later calls return
/// the stored endpoint. It is configured with the `MATCH_ALPN` and `PLAY_ALPN` protocols and
/// with three discovery services run concurrently: local swarm discovery, n0 DNS discovery and
/// the n0 pkarr publisher.
///
/// # Panics
///
/// Panics during initialization if the local swarm discovery service cannot be created or if
/// binding the endpoint fails (both results are unwrapped).
118pub async fn get_network_endpoint() -> &'static iroh::Endpoint {
119    NETWORK_ENDPOINT
120        .get_or_init(|| async move {
            // A fresh secret key is generated here rather than loaded from anywhere, so the key
            // (and the public key derived from it) is new each time initialization runs.
121            let secret_key = iroh::key::SecretKey::generate();
122            iroh::Endpoint::builder()
123                .alpns(vec![MATCH_ALPN.to_vec(), PLAY_ALPN.to_vec()])
124                .discovery(Box::new(
125                    iroh::discovery::ConcurrentDiscovery::from_services(vec![
126                        Box::new(
127                            iroh::discovery::local_swarm_discovery::LocalSwarmDiscovery::new(
128                                secret_key.public(),
129                            )
130                            .unwrap(),
131                        ),
132                        Box::new(iroh::discovery::dns::DnsDiscovery::n0_dns()),
133                        Box::new(iroh::discovery::pkarr::PkarrPublisher::n0_dns(
134                            secret_key.clone(),
135                        )),
136                    ]),
137                ))
                // The key is moved into the builder last; the discovery services above only
                // needed the public key or a clone.
138                .secret_key(secret_key)
139                .bind()
140                .await
141                .unwrap()
142        })
143        .await
144}
145
146/// Resource containing the [`NetworkSocket`] implementation while there is a connection to a
147/// network game.
148///
149/// This is inserted into the world after a match has been established by a network matchmaker.
///
/// The socket is held behind an [`Arc`], so cloning this resource shares the same underlying
/// socket instead of duplicating it.
150#[derive(Clone, HasSchema, Deref, DerefMut)]
151#[schema(no_default)]
152pub struct NetworkMatchSocket(Arc<dyn NetworkSocket>);
153
154/// Wraps a [`ggrs::Message`] together with a `match_id`, which is used to determine whether a
155/// received message belongs to the current match.
156#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct GameMessage {
158    /// Socket match id (see [`NetworkSocket::increment_match_id`]).
159    pub match_id: u8,
160    /// Wrapped message
161    pub message: ggrs::Message,
162}
163
164/// Automatically implemented for [`NetworkSocket`] + [`ggrs::NonBlockingSocket<usize>`].
///
/// The trait has no methods of its own; it only names the combination of the two bounds.
165pub trait GgrsSocket: NetworkSocket + ggrs::NonBlockingSocket<usize> {}
// Blanket implementation: every type satisfying both bounds is a `GgrsSocket`.
166impl<T> GgrsSocket for T where T: NetworkSocket + ggrs::NonBlockingSocket<usize> {}
167
168/// Trait that must be implemented by socket connections established by matchmakers.
169///
170/// The [`NetworkMatchSocket`] resource will contain an instance of this trait and will be used by
171/// the game to send network messages after a match has been established.
172pub trait NetworkSocket: Sync + Send {
173    /// Get a GGRS socket from this network socket.
174    fn ggrs_socket(&self) -> Socket;
175    /// Send a reliable message to the given [`SocketTarget`].
176    fn send_reliable(&self, target: SocketTarget, message: &[u8]);
177    /// Receive reliable messages from other players. The `u32` is the index of the player that
178    /// sent the message.
179    fn recv_reliable(&self) -> Vec<(u32, Vec<u8>)>;
180    /// Close the connection.
181    fn close(&self);
182    /// Get the player index of the local player.
183    fn player_idx(&self) -> u32;
184    /// Get the player count for this network match.
185    fn player_count(&self) -> u32;
186
187    /// Increment match id so messages from previous match that are still in flight
188    /// will be filtered out. Used when starting new session with existing socket.
    ///
    /// Unlike the other methods, this one takes `&mut self`.
189    fn increment_match_id(&mut self);
190}
191
/// The destination for a reliable network message.
///
/// Passed to [`NetworkSocket::send_reliable`] to choose between a single recipient and a
/// broadcast.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum SocketTarget {
    /// Send to a specific player, identified by player index.
    Player(u32),
    /// Broadcast to all players.
    All,
}
199
200/// Resource updated each frame exposing syncing/networking information in the current session.
///
/// All fields are private; they are read through the getter methods implemented on this type.
201#[derive(HasSchema, Clone)]
202#[schema(no_default)]
203pub struct SyncingInfo {
204    /// Current frame of simulation step
205    current_frame: i32,
206    /// The random seed for this session
207    random_seed: u64,
208    /// The additional online info. `None` means the session is offline.
209    online_info: Option<OnlineSyncingInfo>,
210}
211
212/// Holds data for an online session
///
/// Stored inside [`SyncingInfo`], which exposes these values through its getters.
213#[derive(HasSchema, Clone)]
214#[schema(no_default)]
215pub struct OnlineSyncingInfo {
216    /// Last confirmed frame by all clients.
217    /// Anything that occurred on this frame is agreed upon by all clients.
218    last_confirmed_frame: i32,
219    /// Socket
220    socket: Socket,
221    /// Networking stats for each connected player, stored at the \[player_idx\] index for each respective player.
    /// The length of this list is also what [`SyncingInfo`] reports as the player count.
222    players_network_stats: SVec<PlayerNetworkStats>,
223    /// The local player's index
224    local_player_idx: usize,
225    /// The local input delay set for this session
226    local_frame_delay: usize,
227    /// List of disconnected players (their idx)
228    disconnected_players: SVec<usize>,
229}
230
231impl SyncingInfo {
232    /// Checks if the session is online.
233    pub fn is_online(&self) -> bool {
234        self.online_info.is_some()
235    }
236
237    /// Checks if the session is offline.
238    pub fn is_offline(&self) -> bool {
239        self.online_info.is_none()
240    }
241
242    /// Getter for the current frame (number).
243    pub fn current_frame(&self) -> i32 {
244        self.current_frame
245    }
246
247    /// Getter for the last confirmed frame (number).
248    pub fn last_confirmed_frame(&self) -> i32 {
249        self.online_info
250            .as_ref()
251            .map(|info| info.last_confirmed_frame)
252            .unwrap_or(self.current_frame)
253    }
254    /// Getter for socket.
255    pub fn socket(&self) -> Option<&Socket> {
256        self.online_info.as_ref().map(|info| &info.socket)
257    }
258
259    /// Mutable getter for socket.
260    pub fn socket_mut(&mut self) -> Option<&mut Socket> {
261        self.online_info.as_mut().map(|info| &mut info.socket)
262    }
263
264    /// Getter for a single player's network stats using their player_idx
265    pub fn player_network_stats(&self, player_idx: usize) -> Option<PlayerNetworkStats> {
266        self.online_info
267            .as_ref()
268            .map(|info| info.players_network_stats.get(player_idx).cloned())?
269    }
270
271    /// Getter for all players' network stats, including local player (set to default). This maintains index == player_idx.
272    pub fn players_network_stats(&self) -> SVec<PlayerNetworkStats> {
273        self.online_info
274            .as_ref()
275            .map(|info| info.players_network_stats.clone())
276            .unwrap_or_default()
277    }
278
279    /// Getter for remote player network stats (filtering out local player). This does not maintain index == player_idx.
280    pub fn remote_players_network_stats(&self) -> SVec<PlayerNetworkStats> {
281        self.online_info
282            .as_ref()
283            .map(|info| {
284                info.players_network_stats
285                    .iter()
286                    .filter(|&stats| stats.ping != 0 || stats.kbps_sent != 0)
287                    .cloned()
288                    .collect()
289            })
290            .unwrap_or_default()
291    }
292
293    /// Calculates the total kilobits per second sent across all remote players.
294    pub fn total_kbps_sent(&self) -> usize {
295        self.remote_players_network_stats()
296            .iter()
297            .map(|stats| stats.kbps_sent)
298            .sum()
299    }
300
301    /// Calculates the average kilobits per second sent across all remote players.
302    pub fn averaged_kbps_sent(&self) -> f32 {
303        let remote_stats = self.remote_players_network_stats();
304        if remote_stats.is_empty() {
305            0.0
306        } else {
307            let total_kbps: usize = remote_stats.iter().map(|stats| stats.kbps_sent).sum();
308            total_kbps as f32 / remote_stats.len() as f32
309        }
310    }
311
312    /// Returns the highest number of local frames behind across all remote players.
313    pub fn highest_local_frames_behind(&self) -> i32 {
314        self.remote_players_network_stats()
315            .iter()
316            .map(|stats| stats.local_frames_behind)
317            .max()
318            .unwrap_or(0)
319    }
320
321    /// Returns the highest number of remote frames behind across all remote players.
322    pub fn highest_remote_frames_behind(&self) -> i32 {
323        self.remote_players_network_stats()
324            .iter()
325            .map(|stats| stats.remote_frames_behind)
326            .max()
327            .unwrap_or(0)
328    }
329
330    /// Calculates the average ping across all remote players.
331    pub fn averaged_ping(&self) -> u128 {
332        let remote_stats = self.remote_players_network_stats();
333        if remote_stats.is_empty() {
334            0
335        } else {
336            let total_ping: u128 = remote_stats.iter().map(|stats| stats.ping).sum();
337            total_ping / remote_stats.len() as u128
338        }
339    }
340
341    /// Returns the lowest ping across all remote players.
342    pub fn lowest_ping(&self) -> u128 {
343        self.remote_players_network_stats()
344            .iter()
345            .map(|stats| stats.ping)
346            .min()
347            .unwrap_or(0)
348    }
349
350    /// Returns the highest ping across all remote players.
351    pub fn highest_ping(&self) -> u128 {
352        self.remote_players_network_stats()
353            .iter()
354            .map(|stats| stats.ping)
355            .max()
356            .unwrap_or(0)
357    }
358
359    /// Getter for the local player index, if offline defaults to None.
360    pub fn local_player_idx_checked(&self) -> Option<usize> {
361        self.online_info.as_ref().map(|info| info.local_player_idx)
362    }
363
364    /// Getter for the local player index, if offline defaults to 0.
365    pub fn local_player_idx(&self) -> usize {
366        self.online_info
367            .as_ref()
368            .map(|info| info.local_player_idx)
369            .unwrap_or(0)
370    }
371
372    /// Getter for the local frame delay.
373    pub fn local_frame_delay(&self) -> usize {
374        self.online_info
375            .as_ref()
376            .map(|info| info.local_frame_delay)
377            .unwrap_or(0)
378    }
379
380    /// Getter for the number of players, if offline defaults to 0.
381    pub fn players_count(&self) -> usize {
382        self.online_info
383            .as_ref()
384            .map(|info| info.players_network_stats.len())
385            .unwrap_or(0)
386    }
387
388    /// Getter for the number of players, if offline defaults to None.
389    pub fn players_count_checked(&self) -> Option<usize> {
390        self.online_info
391            .as_ref()
392            .map(|info| info.players_network_stats.len())
393    }
394
395    /// Getter for the list of active players (idx) which are connected. Offline returns empty list.
396    pub fn active_players(&self) -> SVec<usize> {
397        self.online_info
398            .as_ref()
399            .map(|info| {
400                let total_players = info.players_network_stats.len();
401                (0..total_players)
402                    .filter(|&id| !info.disconnected_players.contains(&id))
403                    .collect()
404            })
405            .unwrap_or_default()
406    }
407
408    /// Getter for the list of active players (idx) which are connected. Offline returns None.
409    pub fn active_players_checked(&self) -> Option<SVec<usize>> {
410        self.online_info.as_ref().map(|info| {
411            let total_players = info.players_network_stats.len();
412            (0..total_players)
413                .filter(|&id| !info.disconnected_players.contains(&id))
414                .collect()
415        })
416    }
417
418    /// Getter for the list of players which have been disconnected (their idx). Offline returns empty list.
419    pub fn disconnected_players(&self) -> SVec<usize> {
420        self.online_info
421            .as_ref()
422            .map(|info| info.disconnected_players.clone())
423            .unwrap_or_default()
424    }
425
426    /// Getter for the list of players which have been disconnected (their idx). Offline returns None.
427    pub fn disconnected_players_checked(&self) -> Option<SVec<usize>> {
428        self.online_info
429            .as_ref()
430            .map(|info| info.disconnected_players.clone())
431    }
432
433    /// Getter for the random seed.
434    pub fn random_seed(&self) -> u64 {
435        self.random_seed
436    }
437}
438
439/// Resource tracking which players have been disconnected.
440/// May not be in the world if there have been no disconnects.
441///
442/// If the game rolls back to a frame before the disconnect, the player handle is still included here.
443#[derive(HasSchema, Clone, Default)]
444pub struct DisconnectedPlayers {
445    /// Handles of players that have been disconnected.
446    pub disconnected_players: Vec<usize>,
447}
448
449/// [`SessionRunner`] implementation that uses [`ggrs`] for network play.
450///
451/// This is where the whole `ggrs` integration is implemented.
452pub struct GgrsSessionRunner<'a, InputTypes: NetworkInputConfig<'a>> {
453    /// The last player input we detected.
454    pub last_player_input: InputTypes::Dense,
455
456    /// The GGRS peer-to-peer session.
457    pub session: P2PSession<GgrsConfig<InputTypes::Dense>>,
458
459    /// Local player idx.
460    pub player_idx: u32,
461
462    /// Index of local player, computed from player_is_local
    ///
    /// NOTE(review): `GgrsSessionRunner::new` initializes this from `info.player_idx`, the same
    /// value as `player_idx`; no `player_is_local` is visible in this file — confirm the note
    /// above is still accurate.
463    pub local_player_idx: u32,
464
465    /// The frame time accumulator, used to produce a fixed refresh rate.
466    pub accumulator: f64,
467
468    /// Timestamp of last time session was run to compute delta time.
469    pub last_run: Option<Instant>,
470
471    /// FPS from game adjusted with constant network factor (may be slightly slower)
    ///
    /// Holds an integral value: it is rounded to an integer before being stored, and the same
    /// integer is given to ggrs.
472    pub network_fps: f64,
473
474    /// FPS from game not adjusted with network factor.
475    pub original_fps: f64,
476
477    /// Session runner's input collector.
478    pub input_collector: InputTypes::InputCollector,
479
480    /// Is local input disabled? When set, a default (no-input) value is submitted to ggrs
    /// instead of the last detected player input.
481    pub local_input_disabled: bool,
482
483    /// Players who have been reported disconnected by ggrs
484    disconnected_players: Vec<usize>,
485
486    /// Store copy of socket to be able to restart session runner with existing socket.
487    socket: Socket,
488
489    /// Local input delay ggrs session was initialized with
490    local_input_delay: usize,
491
492    /// The random seed used for this session
493    pub random_seed: u64,
494}
495
496/// The info required to create a [`GgrsSessionRunner`].
497#[derive(Clone)]
498pub struct GgrsSessionRunnerInfo {
499    /// The socket that will be converted into GGRS socket implementation.
500    pub socket: Socket,
501    /// The local player idx
502    pub player_idx: u32,
503    /// The player count.
504    pub player_count: u32,
505
506    /// Max prediction window (max number of frames client may predict ahead of last confirmed frame)
507    /// `None` will use Bones' default ([`NETWORK_MAX_PREDICTION_WINDOW_DEFAULT`]).
508    pub max_prediction_window: Option<usize>,
509
510    /// Local input delay (local inputs + remote inputs will be buffered and sampled this many frames later)
511    /// Increasing helps with mitigating pops when remote user changes input quickly, and reduces amount of frames
512    /// client will end up predicted ahead from others, helps with high latency.
513    ///
514    /// `None` will use Bones' default ([`NETWORK_LOCAL_INPUT_DELAY_DEFAULT`]).
515    pub local_input_delay: Option<usize>,
516    /// The random seed used for this session
517    pub random_seed: u64,
518}
519
520impl GgrsSessionRunnerInfo {
521    /// See [`GgrsSessionRunnerInfo`] fields for info on arguments.
522    pub fn new(
523        socket: Socket,
524        max_prediction_window: Option<usize>,
525        local_input_delay: Option<usize>,
526        random_seed: u64,
527    ) -> Self {
528        let player_idx = socket.player_idx();
529        let player_count = socket.player_count();
530        Self {
531            socket,
532            player_idx,
533            player_count,
534            max_prediction_window,
535            local_input_delay,
536            random_seed,
537        }
538    }
539}
540
541impl<'a, InputTypes> GgrsSessionRunner<'a, InputTypes>
542where
543    InputTypes: NetworkInputConfig<'a>,
544{
545    /// Creates a new session runner from a `OnlineMatchmakerResponse::GameStarting`
546    /// Any input values set as `None` will be set to default.
547    /// If response is not `GameStarting` returns None.
548    pub fn new_networked_game_starting(
549        target_fps: Option<f32>,
550        max_prediction_window: Option<usize>,
551        local_input_delay: Option<usize>,
552        matchmaker_resp_game_starting: OnlineMatchmakerResponse,
553    ) -> Option<Self> {
554        if let OnlineMatchmakerResponse::GameStarting {
555            socket,
556            player_idx: _,
557            player_count: _,
558            random_seed,
559        } = matchmaker_resp_game_starting
560        {
561            Some(Self::new(
562                target_fps,
563                GgrsSessionRunnerInfo::new(
564                    socket.ggrs_socket(),
565                    max_prediction_window,
566                    local_input_delay,
567                    random_seed,
568                ),
569            ))
570        } else {
571            None
572        }
573    }
574
575    /// Creates a new session runner from scratch.
576    pub fn new(target_fps: Option<f32>, info: GgrsSessionRunnerInfo) -> Self
577    where
578        Self: Sized,
579    {
580        let simulation_fps = target_fps.unwrap_or(NETWORK_DEFAULT_SIMULATION_FRAME_RATE);
581
582        // Modified FPS may not be an integer, but ggrs requires integer fps, so we clamp and round
583        // to integer so our computed timestep will match  that of ggrs.
584        let network_fps = (simulation_fps * NETWORK_FRAME_RATE_FACTOR) as f64;
585        let network_fps = network_fps
586            .max(usize::MIN as f64)
587            .min(usize::MAX as f64)
588            .round() as usize;
589
590        // There may be value in dynamically negotitaing these values based on client's pings
591        // before starting the match.
592        let max_prediction = info
593            .max_prediction_window
594            .unwrap_or(NETWORK_MAX_PREDICTION_WINDOW_DEFAULT);
595        let local_input_delay = info
596            .local_input_delay
597            .unwrap_or(NETWORK_LOCAL_INPUT_DELAY_DEFAULT);
598
599        // Notify debugger of setting
600        #[cfg(feature = "net-debug")]
601        NETWORK_DEBUG_CHANNEL
602            .sender
603            .try_send(NetworkDebugMessage::SetMaxPrediction(max_prediction))
604            .unwrap();
605
606        let mut builder = ggrs::SessionBuilder::new()
607            .with_num_players(info.player_count as usize)
608            .with_input_delay(local_input_delay)
609            .with_fps(network_fps)
610            .unwrap()
611            .with_max_prediction_window(max_prediction)
612            .unwrap();
613
614        let local_player_idx = info.player_idx;
615        for i in 0..info.player_count {
616            if i == info.player_idx {
617                builder = builder
618                    .add_player(ggrs::PlayerType::Local, i as usize)
619                    .unwrap();
620            } else {
621                builder = builder
622                    .add_player(ggrs::PlayerType::Remote(i as usize), i as usize)
623                    .unwrap();
624            }
625        }
626
627        let session = builder.start_p2p_session(info.socket.clone()).unwrap();
628
629        Self {
630            last_player_input: InputTypes::Dense::default(),
631            session,
632            player_idx: info.player_idx,
633            local_player_idx,
634            accumulator: default(),
635            last_run: None,
636            network_fps: network_fps as f64,
637            original_fps: simulation_fps as f64,
638            disconnected_players: default(),
639            input_collector: InputTypes::InputCollector::default(),
640            socket: info.socket.clone(),
641            local_input_delay,
642            local_input_disabled: false,
643            random_seed: info.random_seed,
644        }
645    }
646}
647
648impl<InputTypes> SessionRunner for GgrsSessionRunner<'static, InputTypes>
649where
650    InputTypes: NetworkInputConfig<'static> + 'static,
651{
652    fn step(&mut self, frame_start: Instant, world: &mut World, stages: &mut SystemStages) {
653        let step: f64 = 1.0 / self.network_fps;
654
655        let last_run = self.last_run.unwrap_or(frame_start);
656        let delta = (frame_start - last_run).as_secs_f64();
657        self.accumulator += delta;
658
659        let mut skip_frames: u32 = 0;
660
661        {
662            let player_inputs = world.resource::<InputTypes::PlayerControls>();
663
664            // Collect inputs and update controls
665            self.input_collector.apply_inputs(world);
666            self.input_collector.update_just_pressed();
667
668            // save local players dense input for use with ggrs
669            match player_inputs.get_control_source(self.local_player_idx as usize) {
670                Some(control_source) => {
671                    let control = self
672                        .input_collector
673                        .get_control(self.local_player_idx as usize, control_source);
674
675                    self.last_player_input = control.get_dense_input();
676                },
677                None => warn!("GgrsSessionRunner local_player_idx {} has no control source, no local input provided.",
678                    self.local_player_idx)
679            };
680        }
681
682        #[cfg(feature = "net-debug")]
683        // Current frame before we start network update loop
684        let current_frame_original = self.session.current_frame();
685
686        for event in self.session.events() {
687            match event {
688                ggrs::GgrsEvent::Synchronizing { addr, total, count } => {
689                    info!(player=%addr, %total, progress=%count, "Syncing network player");
690
691                    #[cfg(feature = "net-debug")]
692                    NETWORK_DEBUG_CHANNEL
693                        .sender
694                        .try_send(NetworkDebugMessage::PlayerSync((
695                            PlayerSyncState::SyncInProgress,
696                            addr,
697                        )))
698                        .unwrap();
699                }
700                ggrs::GgrsEvent::Synchronized { addr } => {
701                    info!(player=%addr, "Syncrhonized network client");
702
703                    #[cfg(feature = "net-debug")]
704                    NETWORK_DEBUG_CHANNEL
705                        .sender
706                        .try_send(NetworkDebugMessage::PlayerSync((
707                            PlayerSyncState::Sychronized,
708                            addr,
709                        )))
710                        .unwrap();
711                }
712                ggrs::GgrsEvent::Disconnected { addr } => {
713                    warn!(player=%addr, "Player Disconnected");
714                    self.disconnected_players.push(addr);
715
716                    #[cfg(feature = "net-debug")]
717                    NETWORK_DEBUG_CHANNEL
718                        .sender
719                        .try_send(NetworkDebugMessage::DisconnectedPlayers(
720                            self.disconnected_players.clone(),
721                        ))
722                        .unwrap();
723                } //return Err(SessionError::Disconnected)},
724                ggrs::GgrsEvent::NetworkInterrupted { addr, .. } => {
725                    info!(player=%addr, "Network player interrupted");
726                }
727                ggrs::GgrsEvent::NetworkResumed { addr } => {
728                    info!(player=%addr, "Network player re-connected");
729                }
730                ggrs::GgrsEvent::WaitRecommendation {
731                    skip_frames: skip_count,
732                } => {
733                    info!(
734                        "Skipping {skip_count} frames to give network players a chance to catch up"
735                    );
736                    skip_frames = skip_count;
737
738                    #[cfg(feature = "net-debug")]
739                    NETWORK_DEBUG_CHANNEL
740                        .sender
741                        .try_send(NetworkDebugMessage::SkipFrame {
742                            frame: current_frame_original,
743                            count: skip_count,
744                        })
745                        .unwrap();
746                }
747                ggrs::GgrsEvent::DesyncDetected {
748                    frame,
749                    local_checksum,
750                    remote_checksum,
751                    addr,
752                } => {
753                    error!(%frame, %local_checksum, %remote_checksum, player=%addr, "Network de-sync detected");
754                }
755            }
756        }
757
758        loop {
759            if self.accumulator >= step {
760                self.accumulator -= step;
761
762                if !self.local_input_disabled {
763                    self.session
764                        .add_local_input(self.local_player_idx as usize, self.last_player_input)
765                        .unwrap();
766                } else {
767                    // If local input is disabled, we still submit a default value representing no-inputs.
768                    // This way if input is disabled current inputs will not be held down indefinitely.
769                    self.session
770                        .add_local_input(
771                            self.local_player_idx as usize,
772                            InputTypes::Dense::default(),
773                        )
774                        .unwrap();
775                }
776
777                #[cfg(feature = "net-debug")]
778                {
779                    let current_frame = self.session.current_frame();
780                    let confirmed_frame = self.session.confirmed_frame();
781
782                    NETWORK_DEBUG_CHANNEL
783                        .sender
784                        .try_send(NetworkDebugMessage::FrameUpdate {
785                            current: current_frame,
786                            last_confirmed: confirmed_frame,
787                        })
788                        .unwrap();
789                }
790
791                if skip_frames > 0 {
792                    skip_frames = skip_frames.saturating_sub(1);
793                    continue;
794                }
795
796                match self.session.advance_frame() {
797                    Ok(requests) => {
798                        for request in requests {
799                            match request {
800                                ggrs::GgrsRequest::SaveGameState { cell, frame } => {
801                                    cell.save(frame, Some(world.clone()), None)
802                                }
803                                ggrs::GgrsRequest::LoadGameState { cell, .. } => {
804                                    world.load_snapshot(cell.load().unwrap_or_default());
805                                }
806                                ggrs::GgrsRequest::AdvanceFrame {
807                                    inputs: network_inputs,
808                                } => {
809                                    // Input has been consumed, signal that we are in new input frame
810                                    self.input_collector.advance_frame();
811
812                                    // Fetch the PlayerNetworkStats for each remote player, guaranteeing each one is inserted into the index matching its handle
813                                    let mut players_network_stats: Vec<PlayerNetworkStats> = vec![
814                                        PlayerNetworkStats::default();
815                                        self.session.remote_player_handles().len() + 1 // + 1 for the local player to maintain correct length
816                                    ];
817                                    for handle in self.session.remote_player_handles().iter() {
818                                        if let Ok(stats) = self.session.network_stats(*handle) {
819                                            players_network_stats[*handle] =
820                                                PlayerNetworkStats::from_ggrs_network_stats(
821                                                    *handle, stats,
822                                                );
823                                        }
824                                    }
825
826                                    // Create and insert the RngGenerator resource if it doesn't exist
827                                    if world.resources.get::<RngGenerator>().is_none() {
828                                        let rng_generator = RngGenerator::new(self.random_seed);
829                                        world.insert_resource(rng_generator);
830                                    }
831
832                                    // TODO: Make sure SyncingInfo is initialized immediately when session is created,
833                                    // even before a frame has advanced.
834                                    // The existance of this resource may be used to determine if in an online match, and there could
835                                    // be race if expected it to exist but testing before first frame advance.
836                                    world.insert_resource(SyncingInfo {
837                                        current_frame: self.session.current_frame(),
838                                        random_seed: self.random_seed,
839                                        online_info: Some(OnlineSyncingInfo {
840                                            last_confirmed_frame: self.session.confirmed_frame(),
841                                            socket: self.socket.clone(),
842                                            players_network_stats: players_network_stats.into(),
843                                            local_player_idx: self.local_player_idx as usize,
844                                            local_frame_delay: self.local_input_delay,
845                                            disconnected_players: self
846                                                .disconnected_players
847                                                .clone()
848                                                .into(),
849                                        }),
850                                    });
851
852                                    // Disconnected players persisted on session runner, and updated each frame.
853                                    // This avoids a rollback from changing resource state.
854                                    world.insert_resource(DisconnectedPlayers {
855                                        disconnected_players: self.disconnected_players.clone(),
856                                    });
857
858                                    {
859                                        world
860                                            .resource_mut::<Time>()
861                                            .advance_exact(Duration::from_secs_f64(step));
862
863                                        // update game controls from ggrs inputs
864                                        let mut player_inputs =
865                                            world.resource_mut::<InputTypes::PlayerControls>();
866                                        for (player_idx, (input, status)) in
867                                            network_inputs.into_iter().enumerate()
868                                        {
869                                            trace!(
870                                                "Net player({player_idx}) local: {}, status: {status:?}, input: {:?}",
871                                                self.local_player_idx as usize == player_idx,
872                                                input
873                                            );
874                                            player_inputs.network_update(
875                                                player_idx,
876                                                &input,
877                                                status.into(),
878                                            );
879                                        }
880                                    }
881
882                                    // Run game session stages, advancing simulation
883                                    stages.run(world);
884
885                                    // Handle any triggered resets of world + preserve runner's managed resources like RngGenerator.
886                                    if world.reset_triggered() {
887                                        let rng = world
888                                            .get_resource::<RngGenerator>()
889                                            .map(|r| (*r).clone());
890                                        if let Some(rng) = rng {
891                                            if let Some(mut reset) =
892                                                world.get_resource_mut::<ResetWorld>()
893                                            {
894                                                reset.insert_reset_resource(rng);
895                                            }
896                                        }
897
898                                        world.handle_world_reset(stages);
899                                    }
900                                }
901                            }
902                        }
903                    }
904                    Err(e) => match e {
905                        ggrs::GgrsError::NotSynchronized => {
906                            debug!("Waiting for network clients to sync")
907                        }
908                        ggrs::GgrsError::PredictionThreshold => {
909                            warn!("Freezing game while waiting for network to catch-up.");
910
911                            #[cfg(feature = "net-debug")]
912                            NETWORK_DEBUG_CHANNEL
913                                .sender
914                                .try_send(NetworkDebugMessage::FrameFroze {
915                                    frame: self.session.current_frame(),
916                                })
917                                .unwrap();
918                        }
919                        e => error!("Network protocol error: {e}"),
920                    },
921                }
922            } else {
923                break;
924            }
925        }
926
927        self.last_run = Some(frame_start);
928
929        // Fetch GGRS network stats of remote players and send to net debug tool
930        #[cfg(feature = "net-debug")]
931        {
932            let mut network_stats: Vec<(PlayerHandle, NetworkStats)> = vec![];
933            for handle in self.session.remote_player_handles().iter() {
934                if let Ok(stats) = self.session.network_stats(*handle) {
935                    network_stats.push((*handle, stats));
936                }
937            }
938            if !network_stats.is_empty() {
939                NETWORK_DEBUG_CHANNEL
940                    .sender
941                    .try_send(NetworkDebugMessage::NetworkStats { network_stats })
942                    .unwrap();
943            }
944        }
945    }
946
947    fn restart_session(&mut self) {
948        // Rebuild session info from runner + create new runner
949
950        // Increment match id so messages from previous match that are still in flight
951        // will be filtered out.
952        self.socket.increment_match_id();
953
954        let runner_info = GgrsSessionRunnerInfo {
955            socket: self.socket.clone(),
956            player_idx: self.player_idx,
957            player_count: self.session.num_players().try_into().unwrap(),
958            max_prediction_window: Some(self.session.max_prediction()),
959            local_input_delay: Some(self.local_input_delay),
960            random_seed: self.random_seed,
961        };
962        *self = GgrsSessionRunner::new(Some(self.original_fps as f32), runner_info);
963    }
964
965    fn disable_local_input(&mut self, input_disabled: bool) {
966        self.local_input_disabled = input_disabled;
967    }
968}
969
970/// A schema-compatible wrapper for ggrs `NetworkStats` struct contains networking stats.
971#[derive(Debug, Default, Clone, Copy, HasSchema)]
972pub struct PlayerNetworkStats {
973    /// The idx of the player these stats are for. Included here for self-attesting/ease-of-access.
974    pub player_idx: usize,
975    /// The length of the queue containing UDP packets which have not yet been acknowledged by the end client.
976    /// The length of the send queue is a rough indication of the quality of the connection. The longer the send queue, the higher the round-trip time between the
977    /// clients. The send queue will also be longer than usual during high packet loss situations.
978    pub send_queue_len: usize,
979    /// The roundtrip packet transmission time as calculated by GGRS.
980    pub ping: u128,
981    /// The estimated bandwidth used between the two clients, in kilobits per second.
982    pub kbps_sent: usize,
983
984    /// The number of frames GGRS calculates that the local client is behind the remote client at this instant in time.
985    /// For example, if at this instant the current game client is running frame 1002 and the remote game client is running frame 1009,
986    /// this value will mostly likely roughly equal 7.
987    pub local_frames_behind: i32,
988    /// The same as [`local_frames_behind`], but calculated from the perspective of the remote player.
989    ///
990    /// [`local_frames_behind`]: #structfield.local_frames_behind
991    pub remote_frames_behind: i32,
992}
993
994impl PlayerNetworkStats {
995    /// Creates a new PlayerNetworkStats from a player index and a ggrs NetworkStats.
996    pub fn from_ggrs_network_stats(player_idx: usize, stats: ggrs::NetworkStats) -> Self {
997        Self {
998            player_idx,
999            send_queue_len: stats.send_queue_len,
1000            ping: stats.ping,
1001            kbps_sent: stats.kbps_sent,
1002            local_frames_behind: stats.local_frames_behind,
1003            remote_frames_behind: stats.remote_frames_behind,
1004        }
1005    }
1006}