Browse Source

initial commit

main
Thomas Johnson 6 months ago
commit
43bd7a3cc3
  1. 2
      .gitignore
  2. 13
      Cargo.toml
  3. 109
      src/main.rs

2
.gitignore

@ -0,0 +1,2 @@
/target
Cargo.lock

13
Cargo.toml

@ -0,0 +1,13 @@
[package]
name = "audio_sync"
version = "0.1.0"
edition = "2021"
[dependencies]
cpal = "0.13"
rand = "0.8"
tokio = { version = "1.17", features = [ "rt", "rt-multi-thread", "sync", "time", "io-std", "io-util" , "net", "macros"] }
[features]
jack = [ "cpal/jack" ]
default = [ "jack" ]

109
src/main.rs

@ -0,0 +1,109 @@
use core::sync::atomic::{AtomicI64, Ordering as AtOrd};
use std::sync::Arc;
use std::time::Instant;
use cpal::{traits::{DeviceTrait, HostTrait, StreamTrait}, SampleRate};
use rand::Rng;
use tokio::{io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader}, sync::mpsc, net::UdpSocket, select};
fn main() {
#[cfg(feature="jack")]
let host = if let Ok(host) = cpal::host_from_id(cpal::HostId::Jack) {
host
} else {
cpal::default_host()
};
#[cfg(not(feature="jack"))]
let host = cpal::default_host();
let device = host.default_output_device().expect("no default host audio device!");
let mut conf_ranges = device.supported_output_configs().expect("could not query audio device capabilities -- audio device disconnected?");
let conf_range = conf_ranges.next().expect("audio device has no configurations!");
let desired_sample_rate = conf_range.max_sample_rate().0;
//let desired_sample_rate = u32::clamp(44100, conf_range.min_sample_rate().0, conf_range.max_sample_rate().0);
let conf = conf_range.with_sample_rate(SampleRate(desired_sample_rate)).config();
println!("conf: {:?}", conf);
// if let cpal::SupportedBufferSize::Range { min, .. } = conf_range.buffer_size() {
// println!("using buffer size {} ({}cm)", min, *min as f64 / conf.sample_rate.0 as f64 * 33000.0);
// conf.buffer_size = cpal::BufferSize::Fixed(*min);
// }
println!("playing at sample rate {}", conf.sample_rate.0);
let ts_ns = Arc::new(AtomicI64::new(0));
let tsref = ts_ns.clone();
let mut first_timestamp = None;
let stream = device.build_output_stream(
&conf,
move |_data: &mut [f32], cbinfo| {
if let Some(ts) = first_timestamp {
let ns = cbinfo.timestamp().callback.duration_since(&ts).unwrap().as_nanos();
tsref.store(ns as i64, AtOrd::Relaxed);
} else {
first_timestamp = Some(cbinfo.timestamp().callback);
}
},
|_err| {}
).expect("could not build output stream");
let mut rng = rand::thread_rng();
let begin = Instant::now();
stream.play().expect("could not play audio stream to synchronize against");
let rt = tokio::runtime::Runtime::new().expect("could not create tokio runtime");
rt.block_on(async {
let (tx, rx_console) = tokio::sync::mpsc::channel::<ConsoleInput>(16);
tokio::task::spawn(console_loop(tx));
tokio::task::spawn(network_loop(rx_console));
loop {
tokio::time::sleep(core::time::Duration::from_millis(rng.gen_range(100..200))).await;
let now = Instant::now().duration_since(begin).as_nanos() as i64;
let audio_time = ts_ns.load(AtOrd::Relaxed);
//println!("audio time: {}\t drift: {}", audio_time, now - audio_time);
}
});
}
enum NetMessage {
Solicit,
Advertise,
}
async fn network_loop(mut rx_console: mpsc::Receiver<ConsoleInput>) {
let mut recvbuf = [0u8; 65536]; // max udp packet size
let sock = UdpSocket::bind("0.0.0.0:43434").await.expect("could not bind socket");
loop {
select! {
console_input = rx_console.recv() => {
println!("console says {:?}", console_input);
}
Ok((bytes, source)) = sock.recv_from(&mut recvbuf) => {
println!("received {} bytes from {}", bytes, source);
}
}
}
}
#[derive(Debug, Clone)]
enum ConsoleInput {
Solicit,
Advertise,
}
async fn console_loop(tx: mpsc::Sender<ConsoleInput>) {
let mut stdout = io::stderr();
let mut stdin = BufReader::new(io::stdin());
let mut buffer = String::new();
loop {
stdout.write("> ".as_bytes()).await.expect("could not write console");
stdout.flush().await.expect("could not flush console");
stdin.read_line(&mut buffer).await.expect("could not read console");
if buffer.starts_with("adv") {
tx.send(ConsoleInput::Advertise).await.unwrap();
}
if buffer.starts_with("sol") {
tx.send(ConsoleInput::Solicit).await.unwrap();
}
buffer.clear();
}
}
Loading…
Cancel
Save