From 1f250f448d675035084c1c3b30b42b655f960b7f Mon Sep 17 00:00:00 2001 From: Joey Hines Date: Sat, 30 Aug 2025 18:58:38 -0600 Subject: [PATCH] Working with bot! + decode returns numbers of samples per channel, we were effectively chomping half the samples + needs a lot of cleanup --- Cargo.lock | 7 +++ Cargo.toml | 1 + src/audio_record.rs | 88 ++++++++++++++++++++++++++++++ src/audio_sink.rs | 4 +- src/audio_source.rs | 44 +++++++-------- src/input_buffer.rs | 2 +- src/main.rs | 28 +++++++--- src/output_buffer.rs | 85 ----------------------------- src/rx_thread.rs | 127 +++++++++++++++++++++++++++++++++++++++++++ src/tx_thread.rs | 111 +++++++++++++++++++++++++++++++++++++ src/util.rs | 14 +++++ 11 files changed, 390 insertions(+), 121 deletions(-) create mode 100644 src/audio_record.rs delete mode 100644 src/output_buffer.rs create mode 100644 src/rx_thread.rs create mode 100644 src/tx_thread.rs diff --git a/Cargo.lock b/Cargo.lock index b0675fa..be72d66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -363,6 +363,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hound" +version = "3.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62adaabb884c94955b19907d60019f4e145d091c75345379e70d1ee696f7854f" + [[package]] name = "iana-time-zone" version = "0.1.63" @@ -787,6 +793,7 @@ dependencies = [ "clap", "cpal", "env_logger", + "hound", "log", "thiserror 2.0.12", ] diff --git a/Cargo.toml b/Cargo.toml index 37e1f06..e84f735 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,3 +13,4 @@ audiopus = "0.3.0-rc.0" thiserror = "2.0.12" circular-buffer = "1.1.0" chrono = "0.4.41" +hound = "3.5.1" diff --git a/src/audio_record.rs b/src/audio_record.rs new file mode 100644 index 0000000..b74f757 --- /dev/null +++ b/src/audio_record.rs @@ -0,0 +1,88 @@ +use crate::tap_packet::TapPacketHeader; +use crate::udp_connection::OpusUdpConnection; +use crate::util::{ + SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE, TAP_PACKET_MAX_DATA_LEN, +}; +use anyhow::Error; +use audiopus::coder::Decoder; +use audiopus::packet::Packet; +use audiopus::{Channels, MutSignals}; +use chrono::Utc; +use log::{error, warn}; +use std::io::{ErrorKind, Read}; +use std::net::SocketAddr; +use std::path::Path; + +pub fn record(addr: SocketAddr, output_path: &Path, duration_ms: u64) -> Result<(), Error> { + let mut decoder = Decoder::new(SAMPLE_RATE, Channels::Stereo)?; + + let mut udp_connection = OpusUdpConnection::new(addr, addr)?; + + let spec = hound::WavSpec { + channels: 2, + sample_rate: SAMPLE_RATE_RAW as u32, + bits_per_sample: 32, + sample_format: hound::SampleFormat::Float, + }; + + let mut seq_count = 0; + + let mut writer = hound::WavWriter::create(output_path, spec)?; + + let deadline = std::time::Instant::now() + std::time::Duration::from_millis(duration_ms); + + while std::time::Instant::now() < deadline { + let mut packet = vec![0; TAP_PACKET_MAX_DATA_LEN + TapPacketHeader::serialized_size()]; + let mut output_buffer = vec![0.0; STEREO_FRAME_SIZE]; + let signals = MutSignals::try_from(&mut output_buffer)?; + + let new_packet = match udp_connection.read(&mut packet) { + Ok(data_len) => Some(&packet[0..data_len]), + Err(err) => { + if err.kind() == ErrorKind::WouldBlock { + None + } else { + error!("Got error trying to recv from UDP: {err}"); + continue; + } + } + }; + + let data_size = if let Some(new_packet) = new_packet { + let hdr = TapPacketHeader::from_buffer(new_packet)?; + + if !hdr.check_acceptable_latency_tolerance() { + warn!( + "Packet outside of tolerance: packet.timestamp={} time={}", + hdr.timestamp, + Utc::now().timestamp() + ); + } + + if hdr.seq != seq_count { + warn!( + "Found sequence mismatch, expected seq={}, {hdr:?}", + seq_count + ); + } + + (seq_count, _) = hdr.seq.overflowing_add(1); + + let packet = Packet::try_from(&new_packet[TapPacketHeader::serialized_size()..])?; + + decoder.decode_float(Some(packet), signals, false)? + } else { + decoder.decode_float(None, signals, false)? + } * 2; + + for samples in output_buffer[..data_size].chunks(2) { + for sample in samples { + writer.write_sample(*sample)?; + } + } + } + + writer.finalize()?; + + Ok(()) +} diff --git a/src/audio_sink.rs b/src/audio_sink.rs index 3464ee0..d564aea 100644 --- a/src/audio_sink.rs +++ b/src/audio_sink.rs @@ -1,6 +1,6 @@ use crate::input_buffer::InputBuffer; use crate::udp_connection::OpusUdpConnection; -use crate::{CIRC_BUFFER_SIZE, SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE}; +use crate::util::{CIRC_BUFFER_SIZE, SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE}; use anyhow::Error; use audiopus::Channels; use audiopus::coder::Decoder; @@ -122,7 +122,7 @@ fn rx_thread(ctx: Context) { info!("Starting RX thread..."); let mut udp_connection = OpusUdpConnection::new(ctx.addr, ctx.addr).unwrap(); udp_connection - .set_read_timeout(Some(Duration::from_millis(1000))) + .set_read_timeout(Some(Duration::from_millis(20))) .unwrap(); let mut packet = vec![0; 32 * 1024]; diff --git a/src/audio_source.rs b/src/audio_source.rs index 5211791..dd23104 100644 --- a/src/audio_source.rs +++ b/src/audio_source.rs @@ -1,8 +1,7 @@ -use crate::output_buffer::OutputBuffer; +use crate::tx_thread::TxThread; use crate::udp_connection::OpusUdpConnection; -use crate::{ - CIRC_BUFFER_SIZE, SAMPLE_RATE, SAMPLE_RATE_RAW, - STEREO_FRAME_SIZE, +use crate::util::{ + SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE, ThreadMessage, }; use anyhow::Error; use audiopus::coder::Encoder; @@ -11,7 +10,8 @@ use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{BufferSize, FromSample, Host, Sample, StreamConfig}; use log::{error, info}; use std::net::SocketAddr; -use std::sync::{Arc, Mutex}; +use std::sync::mpsc::{Sender, channel}; +use std::thread; pub fn tap( device: &str, @@ -36,7 +36,7 @@ pub fn tap( let mut stream_config = StreamConfig::from(config.clone()); stream_config.sample_rate = cpal::SampleRate(SAMPLE_RATE_RAW as u32); - stream_config.buffer_size = BufferSize::Fixed(STEREO_FRAME_SIZE as u32); + stream_config.buffer_size = BufferSize::Fixed(STEREO_FRAME_SIZE as u32 * 2); for cfg in device.supported_input_configs()? { info!("{:?}", cfg) @@ -50,39 +50,38 @@ pub fn tap( encoder.set_complexity(10)?; encoder.set_signal(Signal::Music)?; - let sound_buffer = Arc::new(Mutex::new(OutputBuffer::new( - STEREO_FRAME_SIZE, - encoder, - udp_connection, - ))); + let (sender, receiver) = channel(); + + let tx_thread = TxThread::new(receiver, encoder, udp_connection); + + thread::spawn(move || tx_thread.thread()); let err_fn = move |err| { error!("an error occurred on stream: {err}"); }; - let sound_buffer2 = sound_buffer.clone(); let stream = match config.sample_format() { cpal::SampleFormat::I8 => device.build_input_stream( &stream_config, - move |data, _: &_| write_input_data::(data, &sound_buffer2), + move |data, _: &_| write_input_data::(data, &sender), err_fn, None, )?, cpal::SampleFormat::I16 => device.build_input_stream( &stream_config, - move |data, _: &_| write_input_data::(data, &sound_buffer2), + move |data, _: &_| write_input_data::(data, &sender), err_fn, None, )?, cpal::SampleFormat::I32 => device.build_input_stream( &stream_config, - move |data, _: &_| write_input_data::(data, &sound_buffer2), + move |data, _: &_| write_input_data::(data, &sender), err_fn, None, )?, cpal::SampleFormat::F32 => device.build_input_stream( &stream_config, - move |data, _: &_| write_input_data::(data, &sound_buffer2), + move |data, _: &_| write_input_data::(data, &sender), err_fn, None, )?, @@ -96,23 +95,20 @@ pub fn tap( info!("Running tap..."); stream.play()?; - std::thread::park(); + thread::park(); info!("Exiting..."); Ok(()) } -fn write_input_data( - input: &[T], - ctx: &Arc>>, -) where +fn write_input_data(input: &[T], ctx: &Sender) +where T: Sample, f32: FromSample, { - let mut ctx = ctx.lock().expect("Mutex poisoned"); let sample: Vec = input.iter().map(|s| f32::from_sample(*s)).collect(); - if let Err(err) = ctx.write_data(&sample) { - error!("Got error when writing data to sound buffer: {err}") + if let Err(err) = ctx.send(ThreadMessage::Data(sample)) { + error!("Got error when sending data: {err}") } } diff --git a/src/input_buffer.rs b/src/input_buffer.rs index 861ab30..4810635 100644 --- a/src/input_buffer.rs +++ b/src/input_buffer.rs @@ -78,7 +78,7 @@ impl InputBuffer { let size = match input_buffer { Some(buf) => self.decode(buf, &mut output_buffer)?, None => self.missing_packet(&mut output_buffer)?, - }; + } * 2; self.write_frame_to_buffer(&output_buffer[..size]); diff --git a/src/main.rs b/src/main.rs index 1b7f1b9..b14e2d3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,23 +1,19 @@ +mod audio_record; mod audio_sink; mod audio_source; mod input_buffer; -mod output_buffer; +mod rx_thread; mod tap_packet; +mod tx_thread; mod udp_connection; mod util; +use crate::audio_record::record; use crate::audio_sink::listen; -use audiopus::SampleRate; use clap::{Parser, Subcommand}; use log::LevelFilter; use std::net::SocketAddr; - -pub const SAMPLE_RATE: SampleRate = SampleRate::Hz48000; -pub const SAMPLE_RATE_RAW: usize = 48_000; -pub const AUDIO_FRAME_RATE: usize = 50; -pub const MONO_FRAME_SIZE: usize = SAMPLE_RATE_RAW / AUDIO_FRAME_RATE; -pub const STEREO_FRAME_SIZE: usize = 2 * MONO_FRAME_SIZE; -pub const CIRC_BUFFER_SIZE: usize = STEREO_FRAME_SIZE * 50; +use std::path::PathBuf; #[derive(Parser, Debug)] #[command(version, about = "Tap sounds from a audio device and send them over a UDP socket", long_about = None)] @@ -47,6 +43,15 @@ enum Tasks { /// UDP addr to receive data on addr: SocketAddr, }, + + Record { + /// UDP addr to receive data on + addr: SocketAddr, + /// Number of MS to record for + duration_ms: u64, + /// Output file + output_file: PathBuf, + }, } fn main() -> Result<(), anyhow::Error> { @@ -61,5 +66,10 @@ fn main() -> Result<(), anyhow::Error> { match opt.command { Tasks::Tap { device, src, dest } => audio_source::tap(&device, src, dest, host), Tasks::Listen { device, addr } => listen(&device, addr, host), + Tasks::Record { + addr, + duration_ms, + output_file, + } => record(addr, &output_file, duration_ms), } } diff --git a/src/output_buffer.rs b/src/output_buffer.rs deleted file mode 100644 index e017ec0..0000000 --- a/src/output_buffer.rs +++ /dev/null @@ -1,85 +0,0 @@ -use crate::tap_packet::{TapPacketError, TapPacketHeader}; -use audiopus::coder::Encoder; -use circular_buffer::CircularBuffer; -use std::io::Write; -use thiserror::Error; - -#[derive(Error, Debug)] -#[allow(clippy::enum_variant_names)] -pub enum Error { - #[error("Buffer too small to handle error")] - BufferTooSmall, - #[error("IO error on offload: {0}")] - IoError(#[from] std::io::Error), - #[error("Encode error: {0}")] - EncodeError(#[from] audiopus::Error), - #[error("Tap packet error: {0}")] - TapPacketError(#[from] TapPacketError), -} - -#[derive(Debug)] -pub struct OutputBuffer { - circular_buffer: CircularBuffer, - frame_size: usize, - encoder: Encoder, - offload: T, - seq_count: u32, -} - -impl OutputBuffer { - pub fn new(frame_size: usize, encoder: Encoder, offload: T) -> Self { - Self { - circular_buffer: CircularBuffer::new(), - frame_size, - encoder, - offload, - seq_count: 0, - } - } - - fn encode(&mut self, output_buffer: &mut [u8]) -> Result { - let encode_buffer: Vec = self.circular_buffer.drain(0..self.frame_size).collect(); - Ok(self.encoder.encode_float(&encode_buffer, output_buffer)?) - } - - fn offload_buffer_data(&mut self) -> Result<(), Error> { - let mut output_buffer = vec![0; 32 * 1024 + TapPacketHeader::serialized_size()]; - let size = self.encode(&mut output_buffer[TapPacketHeader::serialized_size()..])?; - - let hdr = TapPacketHeader { - seq: self.seq_count, - timestamp: chrono::Utc::now().timestamp(), - }; - - hdr.write_to_buffer(&mut output_buffer)?; - - let total_size = TapPacketHeader::serialized_size() + size; - - self.offload.write_all(&output_buffer[0..total_size])?; - - (self.seq_count, _) = self.seq_count.overflowing_add(1); - Ok(()) - } - - pub fn write_data(&mut self, buf: &[f32]) -> Result { - if buf.len() > N { - return Err(Error::BufferTooSmall); - } - - self.circular_buffer.extend_from_slice(buf); - - if self.circular_buffer.len() > self.frame_size * 4 { - self.offload_buffer_data()?; - } - - Ok(buf.len()) - } -} - -impl Drop for OutputBuffer { - fn drop(&mut self) { - if self.circular_buffer.len() > N { - let _ = self.offload_buffer_data().is_ok(); - } - } -} diff --git a/src/rx_thread.rs b/src/rx_thread.rs new file mode 100644 index 0000000..1f2f44e --- /dev/null +++ b/src/rx_thread.rs @@ -0,0 +1,127 @@ +use crate::tap_packet::TapPacketError; +use crate::udp_connection::OpusUdpConnection; +use crate::util::{CIRC_BUFFER_SIZE, ThreadMessage}; +use audiopus::coder::Decoder; +use circular_buffer::CircularBuffer; +use std::sync::mpsc::Sender; +use thiserror::Error; + +#[derive(Error, Debug)] +#[allow(clippy::enum_variant_names)] +pub enum Error { + #[error("IO error on Output: {0}")] + IoError(#[from] std::io::Error), + #[error("Decode error: {0}")] + DecodeError(#[from] audiopus::Error), + #[error("Tap packet error: {0}")] + TapPacketError(#[from] TapPacketError), +} + +pub struct RxThread { + sender: Sender, + decoder: Decoder, + opus_udp_connection: OpusUdpConnection, + circular_buffer: CircularBuffer, + expected_seq: u32, + exit: bool, +} + +//impl RxThread { +// pub fn new(sender: Sender, decoder: Decoder, opus_udp_connection: OpusUdpConnection) -> Self { +// Self { +// sender, +// decoder, +// opus_udp_connection, +// circular_buffer: Default::default(), +// expected_seq: 0, +// exit: false, +// } +// } +// +// fn decode(&mut self, input_buffer: &[u8], output_buffer: &mut [f32]) -> Result { +// let hdr = TapPacketHeader::from_buffer(input_buffer)?; +// +// if !hdr.check_acceptable_latency_tolerance() { +// warn!( +// "Packet outside of tolerance: packet.timestamp={} time={}", +// hdr.timestamp, +// Utc::now().timestamp() +// ); +// } +// +// if hdr.seq != self.expected_seq { +// warn!( +// "Found sequence mismatch, expected seq={}, {hdr:?}", +// self.expected_seq +// ); +// } +// +// (self.expected_seq, _) = hdr.seq.overflowing_add(1); +// +// let packet = Packet::try_from(&input_buffer[TapPacketHeader::serialized_size()..])?; +// +// let signals = MutSignals::try_from(output_buffer)?; +// Ok(self.decoder.decode_float(Some(packet), signals, false)?) +// } +// +// fn missing_packet(&mut self, output_buffer: &mut [f32]) -> Result { +// let signals = MutSignals::try_from(output_buffer)?; +// Ok(self.decoder.decode_float(None, signals, false)?) +// } +// +// fn write_frame_to_buffer(&mut self, frame: &[f32]) { +// self.circular_buffer.extend_from_slice(frame) +// } +// +// +// pub fn recv_packet(&mut self, input_buffer: Option<&[u8]>) -> Result<(), Error> { +// let mut output_buffer = vec![0.0; STEREO_FRAME_SIZE]; +// +// let size = match input_buffer { +// Some(buf) => self.decode(buf, &mut output_buffer)?, +// None => self.missing_packet(&mut output_buffer)?, +// }; +// +// self.write_frame_to_buffer(&output_buffer[..size]); +// +// Ok(()) +// } +// +// pub fn send_next_frame( +// &mut self, +// ) { +// let drain_count = output_buffer.len().min(self.circular_buffer.len()); +// +// let frame: Vec = self.circular_buffer.drain(..drain_count).collect(); +// +// for (ndx, output) in output_buffer.iter_mut().enumerate() { +// let value: SampleType = if ndx < frame.len() { +// SampleType::from_sample(frame[ndx]) +// } else { +// SampleType::from_sample(0.0f32) +// }; +// +// *output = value; +// } +// } +// +// fn handle_udp_msg(&mut self) -> Result<(), Error> { +// let mut udp_packet = vec![0; TAP_PACKET_MAX_DATA_LEN + TapPacketHeader::serialized_size()]; +// +// let size = self.opus_udp_connection.read(&mut udp_packet)?; +// +// let udp_packet = if size == 0 { +// None +// } +// else { +// Some(udp_packet.as_slice()) +// }; +// +// self.recv_packet(udp_packet)?; +// +// +// Ok(()) +// } +// +// pub fn thread(self) +//} diff --git a/src/tx_thread.rs b/src/tx_thread.rs new file mode 100644 index 0000000..618dd8c --- /dev/null +++ b/src/tx_thread.rs @@ -0,0 +1,111 @@ +use crate::tap_packet::{TapPacketError, TapPacketHeader}; +use crate::udp_connection::OpusUdpConnection; +use crate::util::STEREO_FRAME_SIZE; +use crate::util::{CIRC_BUFFER_SIZE, TAP_PACKET_MAX_DATA_LEN, ThreadMessage}; +use audiopus::coder::Encoder; +use circular_buffer::CircularBuffer; +use log::{error, info, warn}; +use std::io::Write; +use std::sync::mpsc::{Receiver, RecvError}; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum Error { + #[error("Failed to receive message: {0}")] + RecvFailed(#[from] RecvError), + #[error("Buffer too small to handle error")] + BufferTooSmall, + #[error("IO error on offload: {0}")] + IoError(#[from] std::io::Error), + #[error("Encode error: {0}")] + EncodeError(#[from] audiopus::Error), + #[error("Tap packet error: {0}")] + TapPacketError(#[from] TapPacketError), +} + +pub struct TxThread { + receiver: Receiver, + encoder: Encoder, + opus_udp_connection: OpusUdpConnection, + circular_buffer: CircularBuffer, + seq_count: u32, + exit: bool, +} + +impl TxThread { + pub fn new( + receiver: Receiver, + encoder: Encoder, + opus_udp_connection: OpusUdpConnection, + ) -> Self { + Self { + receiver, + encoder, + opus_udp_connection, + circular_buffer: Default::default(), + seq_count: 0, + exit: false, + } + } + + fn encode(&mut self, output_buffer: &mut [u8]) -> Result { + let encode_buffer: Vec = self.circular_buffer.drain(0..STEREO_FRAME_SIZE).collect(); + Ok(self.encoder.encode_float(&encode_buffer, output_buffer)?) + } + + fn send_opus_frame(&mut self) -> Result<(), Error> { + let mut output_buffer = + vec![0; TAP_PACKET_MAX_DATA_LEN + TapPacketHeader::serialized_size()]; + let size = self.encode(&mut output_buffer[TapPacketHeader::serialized_size()..])?; + + let hdr = TapPacketHeader { + seq: self.seq_count, + timestamp: chrono::Utc::now().timestamp(), + }; + + hdr.write_to_buffer(&mut output_buffer)?; + + let total_size = TapPacketHeader::serialized_size() + size; + + self.opus_udp_connection + .write_all(&output_buffer[0..total_size])?; + + (self.seq_count, _) = self.seq_count.overflowing_add(1); + Ok(()) + } + + fn handle_data_msg(&mut self, data: Vec) -> Result<(), Error> { + if self.circular_buffer.len() + data.len() > CIRC_BUFFER_SIZE { + warn!("Will drop data in circular buffer"); + } + + self.circular_buffer.extend_from_slice(&data); + + if self.circular_buffer.len() >= STEREO_FRAME_SIZE { + self.send_opus_frame()?; + } + + Ok(()) + } + + fn handle_msg(&mut self) -> Result<(), Error> { + let msg = self.receiver.recv()?; + + match msg { + ThreadMessage::Data(data) => self.handle_data_msg(data), + ThreadMessage::Exit => { + self.exit = true; + Ok(()) + } + } + } + + pub fn thread(mut self) { + info!("Starting TX Thread..."); + loop { + if let Err(err) = self.handle_msg() { + error!("Failed to process message: {err:?}") + } + } + } +} diff --git a/src/util.rs b/src/util.rs index 8b13789..a2ae540 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1 +1,15 @@ +use audiopus::SampleRate; +pub const SAMPLE_RATE: SampleRate = SampleRate::Hz48000; +pub const SAMPLE_RATE_RAW: usize = 48_000; +pub const AUDIO_FRAME_RATE: usize = 50; +pub const MONO_FRAME_SIZE: usize = SAMPLE_RATE_RAW / AUDIO_FRAME_RATE; +pub const STEREO_FRAME_SIZE: usize = 2 * MONO_FRAME_SIZE; +pub const CIRC_BUFFER_SIZE: usize = STEREO_FRAME_SIZE * 10; +pub const TAP_PACKET_MAX_DATA_LEN: usize = 32 * 1024; + +#[derive(Debug, Clone)] +pub enum ThreadMessage { + Data(Vec), + Exit, +}