Browse Source

started adding server-side events

main
Thomas Johnson 6 months ago
parent
commit
98685b79db
  1. 3
      Cargo.toml
  2. 76
      src/server.rs

3
Cargo.toml

@ -15,7 +15,8 @@ actix-web = "4"
actix-session = { version = "0.6", features = ["cookie-session"] }
env_logger = "0.9"
parking_lot = "0.12"
futures-util = "0.3"
rand = "0.8"
serde = {version = "1.0.136", features = ["derive" ]}
serde_json = "*"
tokio = { version = "*", features = ["process"] }
tokio = { version = "*", features = ["process"] }

76
src/server.rs

@ -1,28 +1,31 @@
use actix_session::{storage::CookieSessionStore, Session, SessionMiddleware};
use actix_session::{storage::CookieSessionStore, SessionMiddleware};
use actix_web::cookie::Key;
use actix_web::web::{Bytes, Data};
use actix_web::{
get, http::header::ContentType, middleware::Logger, web, App, HttpResponse, HttpServer,
Responder,
};
use actix_web::{post, HttpMessage, HttpRequest};
use actix_web::{post, http::header};
use parking_lot::Mutex;
use petgraph::graph::NodeIndex;
use std::collections::BTreeMap;
use std::process::Stdio;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::mpsc::{Sender, Receiver};
use tokio::process::Command;
struct GameState {
game: Arc<Mutex<esgea::Game>>
game: Arc<Mutex<esgea::Game>>,
pid_channels: Vec<Option<Sender<Bytes>>>,
}
impl GameState {
fn new() -> Self {
Self {
game: Arc::new(Mutex::new(esgea::Game::new()))
game: Arc::new(Mutex::new(esgea::Game::new())),
pid_channels: vec![],
}
}
}
@ -63,14 +66,40 @@ async fn list_games(state: Data<Mutex<State>>) -> impl Responder {
)
}
struct ReceiverStream(Receiver<Bytes>);
impl futures_util::Stream for ReceiverStream {
type Item = Result<Bytes, actix_web::Error>;
fn poll_next(mut self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> core::task::Poll<Option<Self::Item>> {
use core::task::Poll::*;
match core::pin::Pin::new(&mut self.0).poll_recv(cx) {
Ready(Some(b)) => Ready(Some(Ok(b))),
Ready(None) => Ready(None),
Pending => Pending,
}
}
}
#[get("/events/{gid}/{pid}")]
async fn event_stream(state: Data<Mutex<State>>, path: web::Path<(String, String)>) -> impl Responder {
let (tx, rx) = tokio::sync::mpsc::channel(100);
let (gid, pid) = path.into_inner();
let gid: u128 = gid.parse().expect("sad gid");
let pid: esgea::PlayerId = pid.parse().expect("sad pid");
state.lock().games.entry(gid).and_modify(|e| if pid < e.pid_channels.len() { e.pid_channels.push(Some(tx)) });
HttpResponse::Ok().append_header((header::CONTENT_TYPE, "text/event-stream")).streaming(ReceiverStream(rx))
}
#[post("/join_game/{gid}")]
async fn join_game(state: Data<Mutex<State>>, path: web::Path<String>) -> impl Responder {
let st = state.lock();
let mut st = state.lock();
let gid = path.into_inner();
println!("gid = {}", gid);
let gid: u128 = gid.parse().expect("sad gid");
match st.games.get(&gid) {
match st.games.get_mut(&gid) {
Some(gm) => {
gm.pid_channels.push(None);
let mut gm = gm.game.lock();
let last = gm.players.last().cloned().unwrap_or(Default::default());
gm.players.push(esgea::Player {
@ -114,13 +143,22 @@ async fn render(state: Data<Mutex<State>>, path: web::Path<(String, String)>) ->
.body(svg)
}
fn distribute_updates(game: &mut esgea::Game, updates: Vec<(Option<esgea::PlayerId>, esgea::ClientUpdate)>) {
async fn distribute_updates(gs: &mut GameState, updates: Vec<(Option<esgea::PlayerId>, esgea::ClientUpdate)>) {
let mut game = gs.game.lock();
for (pid, upd) in updates {
if let Some(pid) = pid {
game.updates[pid].push(upd);
let seqno = game.updates[pid].len();
game.updates[pid].push(upd.clone());
if let Some(tx) = &gs.pid_channels[pid] {
tx.send(Bytes::copy_from_slice(serde_json::to_string(&(seqno, upd)).expect("cannot json encode client update").as_bytes())).await.expect("could not send on channel");
}
} else {
for pl in 0..game.updates.len() {
let seqno = game.updates[pl].len();
game.updates[pl].push(upd.clone());
if let Some(tx) = &gs.pid_channels[pl] {
tx.send(Bytes::copy_from_slice(serde_json::to_string(&(seqno, upd.clone())).expect("cannot json encode client update").as_bytes())).await.expect("could not send on channel");
}
}
}
}
@ -136,19 +174,21 @@ async fn do_action(
let gid: u128 = gid.parse().expect("gid isnt u128");
let pid: esgea::PlayerId = pid.parse().expect("pid isnt usize");
let gm = state.lock().games.get(&gid).expect("no homie").game.clone();
let mut gm = gm.lock();
let mut guard = state.lock();
let mut gs = guard.games.get_mut(&gid).expect("no homie");
//let mut gm = gm.lock();
match body.as_ref() {
b"strike" => { let upds = gm.strike(pid); distribute_updates(&mut gm, upds) },
b"wait" => { gm.wait(pid); },
b"capture" => { gm.capture(pid); },
b"hide_signals" => { gm.hide_signals(pid); },
b"invisible" => { gm.invisible(pid); },
b"prepare" => { gm.prepare(pid); },
b"strike" => { let upds = gs.game.lock().strike(pid); distribute_updates(&mut gs, upds).await },
b"wait" => { let upds = gs.game.lock().wait(pid); distribute_updates(&mut gs, upds).await },
b"capture" => { let upds = gs.game.lock().capture(pid); distribute_updates(&mut gs, upds).await },
b"hide_signals" => { let upds = gs.game.lock().hide_signals(pid); distribute_updates(&mut gs, upds).await },
b"invisible" => { let upds = gs.game.lock().invisible(pid); distribute_updates(&mut gs, upds).await },
b"prepare" => { let upds = gs.game.lock().prepare(pid); distribute_updates(&mut gs, upds).await },
_ => match body.as_ref().split(|c| b':' == *c).collect::<Vec<_>>()[..] {
[b"move", to] => {
return HttpResponse::Ok().body(
gm.try_move(
// TODO: fix try_move to give events
gs.game.lock().try_move(
pid,
NodeIndex::new(
std::str::from_utf8(to)
@ -161,7 +201,7 @@ async fn do_action(
);
}
[b"reveal", who] => {
gm.reveal(
gs.game.lock().reveal(
pid,
// TODO
None,

Loading…
Cancel
Save