diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..4e5bd93 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[registries.ahines] +index = "https://git.ahines.net/joeyahines/_cargo-index.git" \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index be72d66..44e7119 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -322,6 +322,12 @@ dependencies = [ "objc2", ] +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + [[package]] name = "env_filter" version = "0.1.3" @@ -351,6 +357,40 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "errno" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" +dependencies = [ + "libc", + "windows-sys 0.60.2", +] + +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + +[[package]] +name = "getrandom" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi", +] + [[package]] name = "hashbrown" version = "0.15.5" @@ -409,6 +449,15 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "jiff" version = "0.2.15" @@ -471,6 +520,12 @@ version = "0.2.174" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776" +[[package]] +name = "linux-raw-sys" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" + [[package]] name = "log" version = "0.4.27" @@ -492,6 +547,12 @@ version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" + [[package]] name = "ndk" version = "0.9.0" @@ -647,6 +708,16 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" +[[package]] +name = "petgraph" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" +dependencies = [ + "fixedbitset", + "indexmap", +] + [[package]] name = "pkg-config" version = "0.3.32" @@ -668,6 +739,16 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "prettyplease" +version = "0.2.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff24dfcda44452b9816fff4cd4227e1bb73ff5a2f1bc1105aa92fb8565ce44d2" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro-crate" version = "3.3.0" @@ -686,6 +767,58 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" +dependencies = [ + "heck", + "itertools", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9b4db3d6da204ed77bb26ba83b6122a73aeb2e87e25fbf7ad2e84c4ccbf8f72" +dependencies = [ + "prost", +] + [[package]] name = "quote" version = "1.0.40" @@ -695,6 +828,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + [[package]] name = "regex" version = "1.11.1" @@ -724,6 +863,19 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "rustix" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8" +dependencies = [ + "bitflags 2.9.1", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.60.2", +] + [[package]] name = "rustversion" version = "1.0.21" @@ -795,9 +947,35 @@ dependencies = [ "env_logger", "hound", "log", + "prost", + "tap-interface", "thiserror 2.0.12", ] +[[package]] +name = "tap-interface" +version = "0.1.0" +source = "registry+https://git.ahines.net/joeyahines/_cargo-index.git" +checksum = "4450520c17b84421ce3addd0cb91d99f3a358e969f55440f9946b79ef8b48e67" +dependencies = [ + "prost", + "prost-build", + "prost-types", +] + +[[package]] +name = "tempfile" +version = "3.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15b61f8f20e3a6f7e0649d825294eaf317edce30f82cf6026e7e4cb9222a7d1e" +dependencies = [ + "fastrand", + "getrandom", + "once_cell", + "rustix", + "windows-sys 0.60.2", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -877,6 +1055,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "wasi" +version = "0.14.3+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51ae83037bdd272a9e28ce236db8c07016dd0d50c27038b3f407533c030c95" +dependencies = [ + "wit-bindgen", +] + [[package]] name = "wasm-bindgen" version = "0.2.100" @@ -1276,3 +1463,9 @@ checksum = "f3edebf492c8125044983378ecb5766203ad3b4c2f7a922bd7dd207f6d443e95" dependencies = [ "memchr", ] + +[[package]] +name = "wit-bindgen" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "052283831dbae3d879dc7f51f3d92703a316ca49f91540417d38591826127814" diff --git a/Cargo.toml b/Cargo.toml index e84f735..9259488 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,3 +14,5 @@ thiserror = "2.0.12" circular-buffer = "1.1.0" chrono = "0.4.41" hound = "3.5.1" +tap-interface = {version = "0.1.0", registry = "ahines"} +prost = "0.14.1" diff --git a/src/audio_record.rs b/src/audio_record.rs index b74f757..ac9ff16 100644 --- a/src/audio_record.rs +++ b/src/audio_record.rs @@ -1,17 +1,16 @@ -use crate::tap_packet::TapPacketHeader; use crate::udp_connection::OpusUdpConnection; -use crate::util::{ - SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE, TAP_PACKET_MAX_DATA_LEN, -}; +use crate::util::{SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE, TAP_PACKET_MAX_DATA_LEN}; use anyhow::Error; use audiopus::coder::Decoder; use audiopus::packet::Packet; use audiopus::{Channels, MutSignals}; use chrono::Utc; use log::{error, warn}; +use prost::Message; use std::io::{ErrorKind, Read}; use std::net::SocketAddr; use std::path::Path; +use tap_interface::tap::message::TapMessage; pub fn record(addr: SocketAddr, output_path: &Path, duration_ms: u64) -> Result<(), Error> { let mut decoder = Decoder::new(SAMPLE_RATE, Channels::Stereo)?; @@ -32,7 +31,7 @@ pub fn record(addr: SocketAddr, output_path: &Path, duration_ms: u64) -> Result< let deadline = std::time::Instant::now() + std::time::Duration::from_millis(duration_ms); while std::time::Instant::now() < deadline { - let mut packet = vec![0; TAP_PACKET_MAX_DATA_LEN + TapPacketHeader::serialized_size()]; + let mut packet = vec![0; TAP_PACKET_MAX_DATA_LEN]; let mut output_buffer = vec![0.0; STEREO_FRAME_SIZE]; let signals = MutSignals::try_from(&mut output_buffer)?; @@ -49,26 +48,26 @@ pub fn record(addr: SocketAddr, output_path: &Path, duration_ms: u64) -> Result< }; let data_size = if let Some(new_packet) = new_packet { - let hdr = TapPacketHeader::from_buffer(new_packet)?; + let tap_message = TapMessage::decode(new_packet)?; - if !hdr.check_acceptable_latency_tolerance() { + if tap_message.timestamp - Utc::now().timestamp() as u64 > 1 { warn!( "Packet outside of tolerance: packet.timestamp={} time={}", - hdr.timestamp, + tap_message.timestamp, Utc::now().timestamp() ); } - if hdr.seq != seq_count { + if tap_message.seq != seq_count { warn!( - "Found sequence mismatch, expected seq={}, {hdr:?}", + "Found sequence mismatch, expected seq={}, {tap_message:?}", seq_count ); } - (seq_count, _) = hdr.seq.overflowing_add(1); + (seq_count, _) = tap_message.seq.overflowing_add(1); - let packet = Packet::try_from(&new_packet[TapPacketHeader::serialized_size()..])?; + let packet = Packet::try_from(&tap_message.audio_data)?; decoder.decode_float(Some(packet), signals, false)? } else { diff --git a/src/audio_sink.rs b/src/audio_sink.rs deleted file mode 100644 index d564aea..0000000 --- a/src/audio_sink.rs +++ /dev/null @@ -1,169 +0,0 @@ -use crate::input_buffer::InputBuffer; -use crate::udp_connection::OpusUdpConnection; -use crate::util::{CIRC_BUFFER_SIZE, SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE}; -use anyhow::Error; -use audiopus::Channels; -use audiopus::coder::Decoder; -use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use cpal::{ - BufferSize, Device, FromSample, Host, I24, Sample, SampleRate, SizedSample, StreamConfig, - SupportedStreamConfig, -}; -use log::{error, info}; -use std::io::{ErrorKind, Read}; -use std::net::SocketAddr; -use std::sync::{Arc, Condvar, Mutex}; -use std::thread; -use std::time::Duration; - -const INPUT_BUFFER_SIZE: usize = CIRC_BUFFER_SIZE; - -#[derive(Debug, Clone)] -struct Context { - pub addr: SocketAddr, - pub input_buffer: Arc>>, - pub data_rady: Arc, -} - -pub fn listen(device: &str, addr: SocketAddr, host: Host) -> Result<(), Error> { - // Set up the input device and stream with the default input config. - let device = if device == "default" { - host.default_output_device() - } else { - host.output_devices()? - .find(|x| x.name().map(|y| y == device).unwrap_or(false)) - } - .expect("failed to find output device"); - - info!("Output device: {}", device.name()?); - let config = device.default_output_config()?; - println!("Default output config : {config:?}"); - - for cfg in device.supported_output_configs()? { - info!("{:?}", cfg); - } - - let decoder = Decoder::new(SAMPLE_RATE, Channels::Stereo)?; - - let input_buffer = InputBuffer::new(STEREO_FRAME_SIZE, decoder); - - let ctx = Context { - addr, - input_buffer: Arc::new(Mutex::new(input_buffer)), - data_rady: Arc::new(Condvar::new()), - }; - - let stream = stream_setup_for(device, config, ctx)?; - - stream.play()?; - - thread::park(); - - Ok(()) -} - -fn stream_setup_for( - device: Device, - default_config: SupportedStreamConfig, - ctx: Context, -) -> Result -where -{ - let mut config: StreamConfig = default_config.clone().into(); - - config.sample_rate = SampleRate(SAMPLE_RATE_RAW as u32); - config.buffer_size = BufferSize::Fixed(STEREO_FRAME_SIZE as u32); - - match default_config.sample_format() { - cpal::SampleFormat::I8 => make_stream::(&device, &config, ctx), - cpal::SampleFormat::I16 => make_stream::(&device, &config, ctx), - cpal::SampleFormat::I24 => make_stream::(&device, &config, ctx), - cpal::SampleFormat::I32 => make_stream::(&device, &config, ctx), - cpal::SampleFormat::I64 => make_stream::(&device, &config, ctx), - cpal::SampleFormat::U8 => make_stream::(&device, &config, ctx), - cpal::SampleFormat::U16 => make_stream::(&device, &config, ctx), - cpal::SampleFormat::U32 => make_stream::(&device, &config, ctx), - cpal::SampleFormat::U64 => make_stream::(&device, &config, ctx), - cpal::SampleFormat::F32 => make_stream::(&device, &config, ctx), - cpal::SampleFormat::F64 => make_stream::(&device, &config, ctx), - sample_format => Err(anyhow::Error::msg(format!( - "Unsupported sample format '{sample_format}'" - ))), - } -} - -fn make_stream( - device: &Device, - config: &cpal::StreamConfig, - ctx: Context, -) -> Result -where - T: SizedSample + FromSample, -{ - let err_fn = |err| error!("Error building output sound stream: {err}"); - - let time_at_start = std::time::Instant::now(); - info!("Time at start: {time_at_start:?}"); - - let ctx_recv_thread = ctx.clone(); - thread::spawn(move || rx_thread(ctx_recv_thread)); - - let stream = device.build_output_stream( - config, - move |output: &mut [T], _: &cpal::OutputCallbackInfo| process_frame(output, &ctx), - err_fn, - None, - )?; - - Ok(stream) -} - -fn rx_thread(ctx: Context) { - info!("Starting RX thread..."); - let mut udp_connection = OpusUdpConnection::new(ctx.addr, ctx.addr).unwrap(); - udp_connection - .set_read_timeout(Some(Duration::from_millis(20))) - .unwrap(); - let mut packet = vec![0; 32 * 1024]; - - loop { - let new_packet = match udp_connection.read(&mut packet) { - Ok(data_len) => Some(&packet[0..data_len]), - Err(err) => { - if err.kind() == ErrorKind::WouldBlock { - None - } else { - error!("Got error trying to recv from UDP: {err}"); - continue; - } - } - }; - - let mut input_buffer = ctx.input_buffer.lock().expect("Mutex poisoned"); - - if let Err(err) = input_buffer.recv_packet(new_packet) { - error!("Adding packet to buffer: {err}") - } else { - ctx.data_rady.notify_all(); - } - } -} - -fn process_frame(output: &mut [SampleType], ctx: &Context) -where - SampleType: Sample + FromSample, -{ - let mut input_buffer = ctx.input_buffer.lock().expect("Mutex poisoned"); - - let mut input_buffer = if input_buffer.samples_in_buffer() < output.len() { - while input_buffer.samples_in_buffer() < output.len() { - input_buffer = ctx.data_rady.wait(input_buffer).expect("Mutex poisoned"); - } - - input_buffer - } else { - input_buffer - }; - - input_buffer.read_frame_from_buffer(output) -} diff --git a/src/audio_source.rs b/src/audio_source.rs index dd23104..1995fe9 100644 --- a/src/audio_source.rs +++ b/src/audio_source.rs @@ -1,8 +1,6 @@ use crate::tx_thread::TxThread; use crate::udp_connection::OpusUdpConnection; -use crate::util::{ - SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE, ThreadMessage, -}; +use crate::util::{SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE, ThreadMessage}; use anyhow::Error; use audiopus::coder::Encoder; use audiopus::{Application, Bitrate, Channels, Signal}; diff --git a/src/input_buffer.rs b/src/input_buffer.rs deleted file mode 100644 index 4810635..0000000 --- a/src/input_buffer.rs +++ /dev/null @@ -1,110 +0,0 @@ -use crate::tap_packet::{TapPacketError, TapPacketHeader}; -use audiopus::MutSignals; -use audiopus::coder::Decoder; -use audiopus::packet::Packet; -use chrono::Utc; -use circular_buffer::CircularBuffer; -use cpal::{FromSample, Sample}; -use log::warn; -use thiserror::Error; - -#[derive(Error, Debug)] -#[allow(clippy::enum_variant_names)] -pub enum Error { - #[error("IO error on Output: {0}")] - IoError(#[from] std::io::Error), - #[error("Decode error: {0}")] - DecodeError(#[from] audiopus::Error), - #[error("Tap packet error: {0}")] - TapPacketError(#[from] TapPacketError), -} - -#[derive(Debug)] -pub struct InputBuffer { - circular_buffer: CircularBuffer, - frame_size: usize, - decoder: Decoder, - expected_seq: u32, -} - -impl InputBuffer { - pub fn new(frame_size: usize, decoder: Decoder) -> Self { - Self { - circular_buffer: CircularBuffer::new(), - frame_size, - decoder, - expected_seq: 0, - } - } - - fn decode(&mut self, input_buffer: &[u8], output_buffer: &mut [f32]) -> Result { - let hdr = TapPacketHeader::from_buffer(input_buffer)?; - - if !hdr.check_acceptable_latency_tolerance() { - warn!( - "Packet outside of tolerance: packet.timestamp={} time={}", - hdr.timestamp, - Utc::now().timestamp() - ); - } - - if hdr.seq != self.expected_seq { - warn!( - "Found sequence mismatch, expected seq={}, {hdr:?}", - self.expected_seq - ); - } - - (self.expected_seq, _) = hdr.seq.overflowing_add(1); - - let packet = Packet::try_from(&input_buffer[TapPacketHeader::serialized_size()..])?; - - let signals = MutSignals::try_from(output_buffer)?; - Ok(self.decoder.decode_float(Some(packet), signals, false)?) - } - - fn missing_packet(&mut self, output_buffer: &mut [f32]) -> Result { - let signals = MutSignals::try_from(output_buffer)?; - Ok(self.decoder.decode_float(None, signals, false)?) - } - - fn write_frame_to_buffer(&mut self, frame: &[f32]) { - self.circular_buffer.extend_from_slice(frame) - } - - pub fn recv_packet(&mut self, input_buffer: Option<&[u8]>) -> Result<(), Error> { - let mut output_buffer = vec![0.0; self.frame_size]; - - let size = match input_buffer { - Some(buf) => self.decode(buf, &mut output_buffer)?, - None => self.missing_packet(&mut output_buffer)?, - } * 2; - - self.write_frame_to_buffer(&output_buffer[..size]); - - Ok(()) - } - - pub fn samples_in_buffer(&self) -> usize { - self.circular_buffer.len() - } - - pub fn read_frame_from_buffer>( - &mut self, - output_buffer: &mut [SampleType], - ) { - let drain_count = output_buffer.len().min(self.circular_buffer.len()); - - let frame: Vec = self.circular_buffer.drain(..drain_count).collect(); - - for (ndx, output) in output_buffer.iter_mut().enumerate() { - let value: SampleType = if ndx < frame.len() { - SampleType::from_sample(frame[ndx]) - } else { - SampleType::from_sample(0.0f32) - }; - - *output = value; - } - } -} diff --git a/src/main.rs b/src/main.rs index b14e2d3..3fb97f7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,10 @@ mod audio_record; -mod audio_sink; mod audio_source; -mod input_buffer; -mod rx_thread; -mod tap_packet; mod tx_thread; mod udp_connection; mod util; use crate::audio_record::record; -use crate::audio_sink::listen; use clap::{Parser, Subcommand}; use log::LevelFilter; use std::net::SocketAddr; @@ -35,15 +30,6 @@ enum Tasks { dest: SocketAddr, }, - /// Listen to data from a tap - Listen { - /// The audio device to use - #[arg(short, long, default_value_t = String::from("default"))] - device: String, - /// UDP addr to receive data on - addr: SocketAddr, - }, - Record { /// UDP addr to receive data on addr: SocketAddr, @@ -65,7 +51,6 @@ fn main() -> Result<(), anyhow::Error> { match opt.command { Tasks::Tap { device, src, dest } => audio_source::tap(&device, src, dest, host), - Tasks::Listen { device, addr } => listen(&device, addr, host), Tasks::Record { addr, duration_ms, diff --git a/src/rx_thread.rs b/src/rx_thread.rs deleted file mode 100644 index 1f2f44e..0000000 --- a/src/rx_thread.rs +++ /dev/null @@ -1,127 +0,0 @@ -use crate::tap_packet::TapPacketError; -use crate::udp_connection::OpusUdpConnection; -use crate::util::{CIRC_BUFFER_SIZE, ThreadMessage}; -use audiopus::coder::Decoder; -use circular_buffer::CircularBuffer; -use std::sync::mpsc::Sender; -use thiserror::Error; - -#[derive(Error, Debug)] -#[allow(clippy::enum_variant_names)] -pub enum Error { - #[error("IO error on Output: {0}")] - IoError(#[from] std::io::Error), - #[error("Decode error: {0}")] - DecodeError(#[from] audiopus::Error), - #[error("Tap packet error: {0}")] - TapPacketError(#[from] TapPacketError), -} - -pub struct RxThread { - sender: Sender, - decoder: Decoder, - opus_udp_connection: OpusUdpConnection, - circular_buffer: CircularBuffer, - expected_seq: u32, - exit: bool, -} - -//impl RxThread { -// pub fn new(sender: Sender, decoder: Decoder, opus_udp_connection: OpusUdpConnection) -> Self { -// Self { -// sender, -// decoder, -// opus_udp_connection, -// circular_buffer: Default::default(), -// expected_seq: 0, -// exit: false, -// } -// } -// -// fn decode(&mut self, input_buffer: &[u8], output_buffer: &mut [f32]) -> Result { -// let hdr = TapPacketHeader::from_buffer(input_buffer)?; -// -// if !hdr.check_acceptable_latency_tolerance() { -// warn!( -// "Packet outside of tolerance: packet.timestamp={} time={}", -// hdr.timestamp, -// Utc::now().timestamp() -// ); -// } -// -// if hdr.seq != self.expected_seq { -// warn!( -// "Found sequence mismatch, expected seq={}, {hdr:?}", -// self.expected_seq -// ); -// } -// -// (self.expected_seq, _) = hdr.seq.overflowing_add(1); -// -// let packet = Packet::try_from(&input_buffer[TapPacketHeader::serialized_size()..])?; -// -// let signals = MutSignals::try_from(output_buffer)?; -// Ok(self.decoder.decode_float(Some(packet), signals, false)?) -// } -// -// fn missing_packet(&mut self, output_buffer: &mut [f32]) -> Result { -// let signals = MutSignals::try_from(output_buffer)?; -// Ok(self.decoder.decode_float(None, signals, false)?) -// } -// -// fn write_frame_to_buffer(&mut self, frame: &[f32]) { -// self.circular_buffer.extend_from_slice(frame) -// } -// -// -// pub fn recv_packet(&mut self, input_buffer: Option<&[u8]>) -> Result<(), Error> { -// let mut output_buffer = vec![0.0; STEREO_FRAME_SIZE]; -// -// let size = match input_buffer { -// Some(buf) => self.decode(buf, &mut output_buffer)?, -// None => self.missing_packet(&mut output_buffer)?, -// }; -// -// self.write_frame_to_buffer(&output_buffer[..size]); -// -// Ok(()) -// } -// -// pub fn send_next_frame( -// &mut self, -// ) { -// let drain_count = output_buffer.len().min(self.circular_buffer.len()); -// -// let frame: Vec = self.circular_buffer.drain(..drain_count).collect(); -// -// for (ndx, output) in output_buffer.iter_mut().enumerate() { -// let value: SampleType = if ndx < frame.len() { -// SampleType::from_sample(frame[ndx]) -// } else { -// SampleType::from_sample(0.0f32) -// }; -// -// *output = value; -// } -// } -// -// fn handle_udp_msg(&mut self) -> Result<(), Error> { -// let mut udp_packet = vec![0; TAP_PACKET_MAX_DATA_LEN + TapPacketHeader::serialized_size()]; -// -// let size = self.opus_udp_connection.read(&mut udp_packet)?; -// -// let udp_packet = if size == 0 { -// None -// } -// else { -// Some(udp_packet.as_slice()) -// }; -// -// self.recv_packet(udp_packet)?; -// -// -// Ok(()) -// } -// -// pub fn thread(self) -//} diff --git a/src/tap_packet.rs b/src/tap_packet.rs deleted file mode 100644 index a49e4aa..0000000 --- a/src/tap_packet.rs +++ /dev/null @@ -1,52 +0,0 @@ -use chrono::Utc; -use thiserror::Error; - -#[derive(Error, Debug)] -pub enum TapPacketError { - #[error("Buffer is too small to serialize data into")] - BufTooSmall, -} - -#[derive(Debug, Clone)] -pub struct TapPacketHeader { - pub seq: u32, - pub timestamp: i64, -} - -impl TapPacketHeader { - pub fn write_to_buffer(&self, buf: &mut [u8]) -> Result<(), TapPacketError> { - if buf.len() < Self::serialized_size() { - return Err(TapPacketError::BufTooSmall); - } - - buf[0..size_of::()].copy_from_slice(self.seq.to_be_bytes().as_slice()); - buf[size_of::()..Self::serialized_size()] - .copy_from_slice(self.timestamp.to_be_bytes().as_slice()); - - Ok(()) - } - - pub fn check_acceptable_latency_tolerance(&self) -> bool { - Utc::now().timestamp().abs_diff(self.timestamp) < 1 - } - - pub fn from_buffer(buf: &[u8]) -> Result { - if buf.len() < Self::serialized_size() { - return Err(TapPacketError::BufTooSmall); - } - - let mut seq_bytes = [0u8; size_of::()]; - seq_bytes.copy_from_slice(&buf[0..size_of::()]); - let seq = u32::from_be_bytes(seq_bytes); - - let mut timestamp_bytes = [0u8; size_of::()]; - timestamp_bytes.copy_from_slice(&buf[size_of::()..Self::serialized_size()]); - let timestamp = i64::from_be_bytes(timestamp_bytes); - - Ok(Self { seq, timestamp }) - } - - pub fn serialized_size() -> usize { - size_of::() + size_of::() - } -} diff --git a/src/tx_thread.rs b/src/tx_thread.rs index 618dd8c..2f30575 100644 --- a/src/tx_thread.rs +++ b/src/tx_thread.rs @@ -1,26 +1,24 @@ -use crate::tap_packet::{TapPacketError, TapPacketHeader}; use crate::udp_connection::OpusUdpConnection; use crate::util::STEREO_FRAME_SIZE; use crate::util::{CIRC_BUFFER_SIZE, TAP_PACKET_MAX_DATA_LEN, ThreadMessage}; use audiopus::coder::Encoder; +use chrono::Utc; use circular_buffer::CircularBuffer; use log::{error, info, warn}; +use prost::Message; use std::io::Write; use std::sync::mpsc::{Receiver, RecvError}; +use tap_interface::tap::message::TapMessage; use thiserror::Error; #[derive(Error, Debug)] pub enum Error { #[error("Failed to receive message: {0}")] RecvFailed(#[from] RecvError), - #[error("Buffer too small to handle error")] - BufferTooSmall, #[error("IO error on offload: {0}")] - IoError(#[from] std::io::Error), + IoFailure(#[from] std::io::Error), #[error("Encode error: {0}")] - EncodeError(#[from] audiopus::Error), - #[error("Tap packet error: {0}")] - TapPacketError(#[from] TapPacketError), + EncodeFailure(#[from] audiopus::Error), } pub struct TxThread { @@ -54,21 +52,19 @@ impl TxThread { } fn send_opus_frame(&mut self) -> Result<(), Error> { - let mut output_buffer = - vec![0; TAP_PACKET_MAX_DATA_LEN + TapPacketHeader::serialized_size()]; - let size = self.encode(&mut output_buffer[TapPacketHeader::serialized_size()..])?; + let mut output_buffer = vec![0; TAP_PACKET_MAX_DATA_LEN]; + let size = self.encode(&mut output_buffer)?; - let hdr = TapPacketHeader { + output_buffer.truncate(size); + + let tap_message = TapMessage { seq: self.seq_count, - timestamp: chrono::Utc::now().timestamp(), + timestamp: Utc::now().timestamp() as u64, + audio_data: output_buffer, }; - hdr.write_to_buffer(&mut output_buffer)?; - - let total_size = TapPacketHeader::serialized_size() + size; - self.opus_udp_connection - .write_all(&output_buffer[0..total_size])?; + .write_all(tap_message.encode_to_vec().as_slice())?; (self.seq_count, _) = self.seq_count.overflowing_add(1); Ok(()) diff --git a/src/udp_connection.rs b/src/udp_connection.rs index 9692ed3..1c21e54 100644 --- a/src/udp_connection.rs +++ b/src/udp_connection.rs @@ -16,6 +16,7 @@ impl OpusUdpConnection { }) } + #[allow(dead_code)] pub fn set_read_timeout(&mut self, timeout: Option) -> std::io::Result<()> { self.udp_socket.set_read_timeout(timeout) } diff --git a/src/util.rs b/src/util.rs index a2ae540..ce254ce 100644 --- a/src/util.rs +++ b/src/util.rs @@ -9,6 +9,7 @@ pub const CIRC_BUFFER_SIZE: usize = STEREO_FRAME_SIZE * 10; pub const TAP_PACKET_MAX_DATA_LEN: usize = 32 * 1024; #[derive(Debug, Clone)] +#[allow(dead_code)] pub enum ThreadMessage { Data(Vec), Exit,