diff --git a/Cargo.lock b/Cargo.lock index 6687759..b0675fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -33,6 +33,21 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.6.20" @@ -160,6 +175,20 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" +[[package]] +name = "chrono" +version = "0.4.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-link", +] + [[package]] name = "circular-buffer" version = "1.1.0" @@ -231,6 +260,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + [[package]] name = "coreaudio-rs" version = "0.13.0" @@ -328,6 +363,30 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "iana-time-zone" +version = "0.1.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core 0.61.2", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "indexmap" version = "2.10.0" @@ -723,6 +782,7 @@ version = "0.1.0" dependencies = [ "anyhow", "audiopus", + "chrono", "circular-buffer", "clap", "cpal", @@ -906,7 +966,7 @@ version = "0.54.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9252e5725dbed82865af151df558e754e4a3c2c30818359eb17465f1346a1b49" dependencies = [ - "windows-core", + "windows-core 0.54.0", "windows-targets 0.52.6", ] @@ -916,10 +976,45 @@ version = "0.54.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "12661b9c89351d684a50a8a643ce5f608e20243b9fb84687800163429f161d65" dependencies = [ - "windows-result", + "windows-result 0.1.2", "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result 0.3.4", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "windows-link" version = "0.1.3" @@ -935,6 +1030,24 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-result" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" +dependencies = [ + "windows-link", +] + [[package]] name = "windows-sys" version = "0.45.0" diff --git a/Cargo.toml b/Cargo.toml index ef8e89c..37e1f06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,3 +12,4 @@ env_logger = "0.11.8" audiopus = "0.3.0-rc.0" thiserror = "2.0.12" circular-buffer = "1.1.0" +chrono = "0.4.41" diff --git a/src/audio_sink.rs b/src/audio_sink.rs index 98fd4e5..3464ee0 100644 --- a/src/audio_sink.rs +++ b/src/audio_sink.rs @@ -1,11 +1,14 @@ use crate::input_buffer::InputBuffer; use crate::udp_connection::OpusUdpConnection; -use crate::{CIRC_BUFFER_SIZE, SAMPLE_RATE, STEREO_FRAME_SIZE}; +use crate::{CIRC_BUFFER_SIZE, SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE}; use anyhow::Error; use audiopus::Channels; use audiopus::coder::Decoder; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use cpal::{Device, FromSample, Host, I24, Sample, SizedSample, SupportedStreamConfig}; +use cpal::{ + BufferSize, Device, FromSample, Host, I24, Sample, SampleRate, SizedSample, StreamConfig, + SupportedStreamConfig, +}; use log::{error, info}; use std::io::{ErrorKind, Read}; use std::net::SocketAddr; @@ -36,6 +39,10 @@ pub fn listen(device: &str, addr: SocketAddr, host: Host) -> Result<(), Error> { let config = device.default_output_config()?; println!("Default output config : {config:?}"); + for cfg in device.supported_output_configs()? { + info!("{:?}", cfg); + } + let decoder = Decoder::new(SAMPLE_RATE, Channels::Stereo)?; let input_buffer = InputBuffer::new(STEREO_FRAME_SIZE, decoder); @@ -57,23 +64,28 @@ pub fn listen(device: &str, addr: SocketAddr, host: Host) -> Result<(), Error> { fn stream_setup_for( device: Device, - config: SupportedStreamConfig, + default_config: SupportedStreamConfig, ctx: Context, ) -> Result where { - match config.sample_format() { - cpal::SampleFormat::I8 => make_stream::(&device, &config.into(), ctx), - cpal::SampleFormat::I16 => make_stream::(&device, &config.into(), ctx), - cpal::SampleFormat::I24 => make_stream::(&device, &config.into(), ctx), - cpal::SampleFormat::I32 => make_stream::(&device, &config.into(), ctx), - cpal::SampleFormat::I64 => make_stream::(&device, &config.into(), ctx), - cpal::SampleFormat::U8 => make_stream::(&device, &config.into(), ctx), - cpal::SampleFormat::U16 => make_stream::(&device, &config.into(), ctx), - cpal::SampleFormat::U32 => make_stream::(&device, &config.into(), ctx), - cpal::SampleFormat::U64 => make_stream::(&device, &config.into(), ctx), - cpal::SampleFormat::F32 => make_stream::(&device, &config.into(), ctx), - cpal::SampleFormat::F64 => make_stream::(&device, &config.into(), ctx), + let mut config: StreamConfig = default_config.clone().into(); + + config.sample_rate = SampleRate(SAMPLE_RATE_RAW as u32); + config.buffer_size = BufferSize::Fixed(STEREO_FRAME_SIZE as u32); + + match default_config.sample_format() { + cpal::SampleFormat::I8 => make_stream::(&device, &config, ctx), + cpal::SampleFormat::I16 => make_stream::(&device, &config, ctx), + cpal::SampleFormat::I24 => make_stream::(&device, &config, ctx), + cpal::SampleFormat::I32 => make_stream::(&device, &config, ctx), + cpal::SampleFormat::I64 => make_stream::(&device, &config, ctx), + cpal::SampleFormat::U8 => make_stream::(&device, &config, ctx), + cpal::SampleFormat::U16 => make_stream::(&device, &config, ctx), + cpal::SampleFormat::U32 => make_stream::(&device, &config, ctx), + cpal::SampleFormat::U64 => make_stream::(&device, &config, ctx), + cpal::SampleFormat::F32 => make_stream::(&device, &config, ctx), + cpal::SampleFormat::F64 => make_stream::(&device, &config, ctx), sample_format => Err(anyhow::Error::msg(format!( "Unsupported sample format '{sample_format}'" ))), diff --git a/src/audio_source.rs b/src/audio_source.rs index 40b63b3..5211791 100644 --- a/src/audio_source.rs +++ b/src/audio_source.rs @@ -1,11 +1,14 @@ use crate::output_buffer::OutputBuffer; use crate::udp_connection::OpusUdpConnection; -use crate::{CIRC_BUFFER_SIZE, SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE}; +use crate::{ + CIRC_BUFFER_SIZE, SAMPLE_RATE, SAMPLE_RATE_RAW, + STEREO_FRAME_SIZE, +}; use anyhow::Error; use audiopus::coder::Encoder; -use audiopus::{Application, Channels}; +use audiopus::{Application, Bitrate, Channels, Signal}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use cpal::{FromSample, Host, Sample, StreamConfig}; +use cpal::{BufferSize, FromSample, Host, Sample, StreamConfig}; use log::{error, info}; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; @@ -33,11 +36,19 @@ pub fn tap( let mut stream_config = StreamConfig::from(config.clone()); stream_config.sample_rate = cpal::SampleRate(SAMPLE_RATE_RAW as u32); + stream_config.buffer_size = BufferSize::Fixed(STEREO_FRAME_SIZE as u32); + + for cfg in device.supported_input_configs()? { + info!("{:?}", cfg) + } info!("Stream config: {stream_config:?}"); let udp_connection = OpusUdpConnection::new(src_addr, dest_addr)?; - let encoder = Encoder::new(SAMPLE_RATE, Channels::Stereo, Application::Audio)?; + let mut encoder = Encoder::new(SAMPLE_RATE, Channels::Stereo, Application::Audio)?; + encoder.set_bitrate(Bitrate::Max)?; + encoder.set_complexity(10)?; + encoder.set_signal(Signal::Music)?; let sound_buffer = Arc::new(Mutex::new(OutputBuffer::new( STEREO_FRAME_SIZE, @@ -98,11 +109,10 @@ fn write_input_data( T: Sample, f32: FromSample, { - if let Ok(mut ctx) = ctx.try_lock() { - let sample: Vec = input.iter().map(|s| f32::from_sample(*s)).collect(); + let mut ctx = ctx.lock().expect("Mutex poisoned"); + let sample: Vec = input.iter().map(|s| f32::from_sample(*s)).collect(); - if let Err(err) = ctx.write_data(&sample) { - error!("Got error when writing data to sound buffer: {err}") - } + if let Err(err) = ctx.write_data(&sample) { + error!("Got error when writing data to sound buffer: {err}") } } diff --git a/src/input_buffer.rs b/src/input_buffer.rs index 90b0446..861ab30 100644 --- a/src/input_buffer.rs +++ b/src/input_buffer.rs @@ -1,8 +1,11 @@ +use crate::tap_packet::{TapPacketError, TapPacketHeader}; use audiopus::MutSignals; use audiopus::coder::Decoder; use audiopus::packet::Packet; +use chrono::Utc; use circular_buffer::CircularBuffer; use cpal::{FromSample, Sample}; +use log::warn; use thiserror::Error; #[derive(Error, Debug)] @@ -12,6 +15,8 @@ pub enum Error { IoError(#[from] std::io::Error), #[error("Decode error: {0}")] DecodeError(#[from] audiopus::Error), + #[error("Tap packet error: {0}")] + TapPacketError(#[from] TapPacketError), } #[derive(Debug)] @@ -19,6 +24,7 @@ pub struct InputBuffer { circular_buffer: CircularBuffer, frame_size: usize, decoder: Decoder, + expected_seq: u32, } impl InputBuffer { @@ -27,11 +33,31 @@ impl InputBuffer { circular_buffer: CircularBuffer::new(), frame_size, decoder, + expected_seq: 0, } } fn decode(&mut self, input_buffer: &[u8], output_buffer: &mut [f32]) -> Result { - let packet = Packet::try_from(input_buffer)?; + let hdr = TapPacketHeader::from_buffer(input_buffer)?; + + if !hdr.check_acceptable_latency_tolerance() { + warn!( + "Packet outside of tolerance: packet.timestamp={} time={}", + hdr.timestamp, + Utc::now().timestamp() + ); + } + + if hdr.seq != self.expected_seq { + warn!( + "Found sequence mismatch, expected seq={}, {hdr:?}", + self.expected_seq + ); + } + + (self.expected_seq, _) = hdr.seq.overflowing_add(1); + + let packet = Packet::try_from(&input_buffer[TapPacketHeader::serialized_size()..])?; let signals = MutSignals::try_from(output_buffer)?; Ok(self.decoder.decode_float(Some(packet), signals, false)?) diff --git a/src/main.rs b/src/main.rs index 36d6611..1b7f1b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ mod audio_sink; mod audio_source; mod input_buffer; mod output_buffer; +mod tap_packet; mod udp_connection; mod util; diff --git a/src/output_buffer.rs b/src/output_buffer.rs index 10c7e12..e017ec0 100644 --- a/src/output_buffer.rs +++ b/src/output_buffer.rs @@ -1,3 +1,4 @@ +use crate::tap_packet::{TapPacketError, TapPacketHeader}; use audiopus::coder::Encoder; use circular_buffer::CircularBuffer; use std::io::Write; @@ -12,6 +13,8 @@ pub enum Error { IoError(#[from] std::io::Error), #[error("Encode error: {0}")] EncodeError(#[from] audiopus::Error), + #[error("Tap packet error: {0}")] + TapPacketError(#[from] TapPacketError), } #[derive(Debug)] @@ -20,6 +23,7 @@ pub struct OutputBuffer { frame_size: usize, encoder: Encoder, offload: T, + seq_count: u32, } impl OutputBuffer { @@ -29,6 +33,7 @@ impl OutputBuffer { frame_size, encoder, offload, + seq_count: 0, } } @@ -38,10 +43,21 @@ impl OutputBuffer { } fn offload_buffer_data(&mut self) -> Result<(), Error> { - let mut output_buffer = vec![0; 32 * 1024]; - let size = self.encode(&mut output_buffer)?; - self.offload.write_all(&output_buffer[0..size])?; + let mut output_buffer = vec![0; 32 * 1024 + TapPacketHeader::serialized_size()]; + let size = self.encode(&mut output_buffer[TapPacketHeader::serialized_size()..])?; + let hdr = TapPacketHeader { + seq: self.seq_count, + timestamp: chrono::Utc::now().timestamp(), + }; + + hdr.write_to_buffer(&mut output_buffer)?; + + let total_size = TapPacketHeader::serialized_size() + size; + + self.offload.write_all(&output_buffer[0..total_size])?; + + (self.seq_count, _) = self.seq_count.overflowing_add(1); Ok(()) } diff --git a/src/tap_packet.rs b/src/tap_packet.rs new file mode 100644 index 0000000..a49e4aa --- /dev/null +++ b/src/tap_packet.rs @@ -0,0 +1,52 @@ +use chrono::Utc; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum TapPacketError { + #[error("Buffer is too small to serialize data into")] + BufTooSmall, +} + +#[derive(Debug, Clone)] +pub struct TapPacketHeader { + pub seq: u32, + pub timestamp: i64, +} + +impl TapPacketHeader { + pub fn write_to_buffer(&self, buf: &mut [u8]) -> Result<(), TapPacketError> { + if buf.len() < Self::serialized_size() { + return Err(TapPacketError::BufTooSmall); + } + + buf[0..size_of::()].copy_from_slice(self.seq.to_be_bytes().as_slice()); + buf[size_of::()..Self::serialized_size()] + .copy_from_slice(self.timestamp.to_be_bytes().as_slice()); + + Ok(()) + } + + pub fn check_acceptable_latency_tolerance(&self) -> bool { + Utc::now().timestamp().abs_diff(self.timestamp) < 1 + } + + pub fn from_buffer(buf: &[u8]) -> Result { + if buf.len() < Self::serialized_size() { + return Err(TapPacketError::BufTooSmall); + } + + let mut seq_bytes = [0u8; size_of::()]; + seq_bytes.copy_from_slice(&buf[0..size_of::()]); + let seq = u32::from_be_bytes(seq_bytes); + + let mut timestamp_bytes = [0u8; size_of::()]; + timestamp_bytes.copy_from_slice(&buf[size_of::()..Self::serialized_size()]); + let timestamp = i64::from_be_bytes(timestamp_bytes); + + Ok(Self { seq, timestamp }) + } + + pub fn serialized_size() -> usize { + size_of::() + size_of::() + } +}