Browse Source

event stream is broken right now

main
Thomas Johnson 6 months ago
parent
commit
6a9c444090
  1. 6
      src/index.html
  2. 44
      src/server.rs

6
src/index.html

@ -31,6 +31,7 @@
window.pid = "not yet in game";
setInterval(() => { document.querySelector("#pid").textContent = window.pid; }, 300);
setInterval(() => { document.querySelector("#refresh").click(); }, 3000);
window.game_events = null;
window.renderMap = async () => {
let res = await fetch(`/render/${window.gid}/${window.pid}`, {
@ -78,6 +79,11 @@
});
window.gid = btn.getAttribute("gameid");
window.pid = await res.text();
console.log("pid: ", window.pid);
window.game_events = new EventSource(`/events/${window.gid}/${window.pid}`);
window.game_events.onmessage = (event) => {
console.log(event);
}
});
});
});

44
src/server.rs

@ -68,14 +68,26 @@ async fn list_games(state: Data<Mutex<State>>) -> impl Responder {
struct ReceiverStream(Receiver<Bytes>);
impl core::ops::Drop for ReceiverStream {
fn drop(&mut self) {
println!("dropping a channel");
}
}
impl futures_util::Stream for ReceiverStream {
type Item = Result<Bytes, actix_web::Error>;
fn poll_next(mut self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> core::task::Poll<Option<Self::Item>> {
use core::task::Poll::*;
match core::pin::Pin::new(&mut self.0).poll_recv(cx) {
Ready(Some(b)) => Ready(Some(Ok(b))),
Ready(None) => Ready(None),
Ready(Some(b)) => {
println!("dispatching some bytes: {b:?}");
Ready(Some(Ok(b)))
},
Ready(None) => {
println!("closing channel");
Ready(None)
},
Pending => Pending,
}
}
@ -87,7 +99,8 @@ async fn event_stream(state: Data<Mutex<State>>, path: web::Path<(String, String
let (gid, pid) = path.into_inner();
let gid: u128 = gid.parse().expect("sad gid");
let pid: esgea::PlayerId = pid.parse().expect("sad pid");
state.lock().games.entry(gid).and_modify(|e| if pid < e.pid_channels.len() { e.pid_channels.push(Some(tx)) });
println!("getting event stream for {gid}/{pid}");
state.lock().games.entry(gid).and_modify(|e| if pid < e.pid_channels.len() { e.pid_channels[pid] = Some(tx) });
HttpResponse::Ok().append_header((header::CONTENT_TYPE, "text/event-stream")).streaming(ReceiverStream(rx))
}
@ -101,15 +114,16 @@ async fn join_game(state: Data<Mutex<State>>, path: web::Path<String>) -> impl R
Some(gm) => {
gm.pid_channels.push(None);
let mut gm = gm.game.lock();
let last = gm.players.last().cloned().unwrap_or(Default::default());
gm.players.push(esgea::Player {
let new_player = gm.players.last().cloned().map(|last| esgea::Player {
id: last.id + 1,
..last
});
}).unwrap_or(Default::default());
println!("adding player to game {gid}: {new_player:?}");
gm.players.push(new_player);
gm.updates.push(vec![]);
HttpResponse::Ok()
.append_header(ContentType::plaintext())
.body(format!("{}", last.id + 1))
.body(format!("{}", new_player.id))
}
None => HttpResponse::NotFound().body("no game"),
}
@ -150,14 +164,24 @@ async fn distribute_updates(gs: &mut GameState, updates: Vec<(Option<esgea::Play
let seqno = game.updates[pid].len();
game.updates[pid].push(upd.clone());
if let Some(tx) = &gs.pid_channels[pid] {
tx.send(Bytes::copy_from_slice(serde_json::to_string(&(seqno, upd)).expect("cannot json encode client update").as_bytes())).await.expect("could not send on channel");
let result = tx.send(Bytes::copy_from_slice(serde_json::to_string(&(seqno, upd)).expect("cannot json encode client update").as_bytes())).await;
if let Err(_) = result {
gs.pid_channels[pid] = None;
}
} else {
println!("no active event stream for {pid} -- cannot send {upd:?}");
}
} else {
for pl in 0..game.updates.len() {
let seqno = game.updates[pl].len();
game.updates[pl].push(upd.clone());
if let Some(tx) = &gs.pid_channels[pl] {
tx.send(Bytes::copy_from_slice(serde_json::to_string(&(seqno, upd.clone())).expect("cannot json encode client update").as_bytes())).await.expect("could not send on channel");
let result = tx.send(Bytes::copy_from_slice(serde_json::to_string(&(seqno, upd.clone())).expect("cannot json encode client update").as_bytes())).await;
if let Err(_) = result {
gs.pid_channels[pl] = None;
}
} else {
println!("no active event stream for {pl} -- cannot send {upd:?}");
}
}
}
@ -176,7 +200,6 @@ async fn do_action(
let mut guard = state.lock();
let mut gs = guard.games.get_mut(&gid).expect("no homie");
//let mut gm = gm.lock();
match body.as_ref() {
b"strike" => { let upds = gs.game.lock().strike(pid); distribute_updates(&mut gs, upds).await },
b"wait" => { let upds = gs.game.lock().wait(pid); distribute_updates(&mut gs, upds).await },
@ -234,6 +257,7 @@ async fn main() -> std::io::Result<()> {
.service(do_action)
.service(list_games)
.service(join_game)
.service(event_stream)
.service(render)
.service(start_game)
})

Loading…
Cancel
Save