diff --git a/src/audio_sink.rs b/src/audio_sink.rs new file mode 100644 index 0000000..98fd4e5 --- /dev/null +++ b/src/audio_sink.rs @@ -0,0 +1,157 @@ +use crate::input_buffer::InputBuffer; +use crate::udp_connection::OpusUdpConnection; +use crate::{CIRC_BUFFER_SIZE, SAMPLE_RATE, STEREO_FRAME_SIZE}; +use anyhow::Error; +use audiopus::Channels; +use audiopus::coder::Decoder; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use cpal::{Device, FromSample, Host, I24, Sample, SizedSample, SupportedStreamConfig}; +use log::{error, info}; +use std::io::{ErrorKind, Read}; +use std::net::SocketAddr; +use std::sync::{Arc, Condvar, Mutex}; +use std::thread; +use std::time::Duration; + +const INPUT_BUFFER_SIZE: usize = CIRC_BUFFER_SIZE; + +#[derive(Debug, Clone)] +struct Context { + pub addr: SocketAddr, + pub input_buffer: Arc>>, + pub data_rady: Arc, +} + +pub fn listen(device: &str, addr: SocketAddr, host: Host) -> Result<(), Error> { + // Set up the input device and stream with the default input config. + let device = if device == "default" { + host.default_output_device() + } else { + host.output_devices()? + .find(|x| x.name().map(|y| y == device).unwrap_or(false)) + } + .expect("failed to find output device"); + + info!("Output device: {}", device.name()?); + let config = device.default_output_config()?; + println!("Default output config : {config:?}"); + + let decoder = Decoder::new(SAMPLE_RATE, Channels::Stereo)?; + + let input_buffer = InputBuffer::new(STEREO_FRAME_SIZE, decoder); + + let ctx = Context { + addr, + input_buffer: Arc::new(Mutex::new(input_buffer)), + data_rady: Arc::new(Condvar::new()), + }; + + let stream = stream_setup_for(device, config, ctx)?; + + stream.play()?; + + thread::park(); + + Ok(()) +} + +fn stream_setup_for( + device: Device, + config: SupportedStreamConfig, + ctx: Context, +) -> Result +where +{ + match config.sample_format() { + cpal::SampleFormat::I8 => make_stream::(&device, &config.into(), ctx), + cpal::SampleFormat::I16 => make_stream::(&device, &config.into(), ctx), + cpal::SampleFormat::I24 => make_stream::(&device, &config.into(), ctx), + cpal::SampleFormat::I32 => make_stream::(&device, &config.into(), ctx), + cpal::SampleFormat::I64 => make_stream::(&device, &config.into(), ctx), + cpal::SampleFormat::U8 => make_stream::(&device, &config.into(), ctx), + cpal::SampleFormat::U16 => make_stream::(&device, &config.into(), ctx), + cpal::SampleFormat::U32 => make_stream::(&device, &config.into(), ctx), + cpal::SampleFormat::U64 => make_stream::(&device, &config.into(), ctx), + cpal::SampleFormat::F32 => make_stream::(&device, &config.into(), ctx), + cpal::SampleFormat::F64 => make_stream::(&device, &config.into(), ctx), + sample_format => Err(anyhow::Error::msg(format!( + "Unsupported sample format '{sample_format}'" + ))), + } +} + +fn make_stream( + device: &Device, + config: &cpal::StreamConfig, + ctx: Context, +) -> Result +where + T: SizedSample + FromSample, +{ + let err_fn = |err| error!("Error building output sound stream: {err}"); + + let time_at_start = std::time::Instant::now(); + info!("Time at start: {time_at_start:?}"); + + let ctx_recv_thread = ctx.clone(); + thread::spawn(move || rx_thread(ctx_recv_thread)); + + let stream = device.build_output_stream( + config, + move |output: &mut [T], _: &cpal::OutputCallbackInfo| process_frame(output, &ctx), + err_fn, + None, + )?; + + Ok(stream) +} + +fn rx_thread(ctx: Context) { + info!("Starting RX thread..."); + let mut udp_connection = OpusUdpConnection::new(ctx.addr, ctx.addr).unwrap(); + udp_connection + .set_read_timeout(Some(Duration::from_millis(1000))) + .unwrap(); + let mut packet = vec![0; 32 * 1024]; + + loop { + let new_packet = match udp_connection.read(&mut packet) { + Ok(data_len) => Some(&packet[0..data_len]), + Err(err) => { + if err.kind() == ErrorKind::WouldBlock { + None + } else { + error!("Got error trying to recv from UDP: {err}"); + continue; + } + } + }; + + let mut input_buffer = ctx.input_buffer.lock().expect("Mutex poisoned"); + + if let Err(err) = input_buffer.recv_packet(new_packet) { + error!("Adding packet to buffer: {err}") + } else { + ctx.data_rady.notify_all(); + } + } +} + +fn process_frame(output: &mut [SampleType], ctx: &Context) +where + SampleType: Sample + FromSample, +{ + let mut input_buffer = ctx.input_buffer.lock().expect("Mutex poisoned"); + + let mut input_buffer = if input_buffer.samples_in_buffer() < output.len() { + while input_buffer.samples_in_buffer() < output.len() { + input_buffer = ctx.data_rady.wait(input_buffer).expect("Mutex poisoned"); + } + + input_buffer + } else { + input_buffer + }; + + input_buffer.read_frame_from_buffer(output) +} diff --git a/src/audio_source.rs b/src/audio_source.rs new file mode 100644 index 0000000..40b63b3 --- /dev/null +++ b/src/audio_source.rs @@ -0,0 +1,108 @@ +use crate::output_buffer::OutputBuffer; +use crate::udp_connection::OpusUdpConnection; +use crate::{CIRC_BUFFER_SIZE, SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE}; +use anyhow::Error; +use audiopus::coder::Encoder; +use audiopus::{Application, Channels}; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use cpal::{FromSample, Host, Sample, StreamConfig}; +use log::{error, info}; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; + +pub fn tap( + device: &str, + src_addr: SocketAddr, + dest_addr: SocketAddr, + host: Host, +) -> Result<(), Error> { + // Set up the input device and stream with the default input config. + let device = if device == "default" { + host.default_input_device() + } else { + host.input_devices()? + .find(|x| x.name().map(|y| y == device).unwrap_or(false)) + } + .expect("failed to find input device"); + + info!("Input device: {}", device.name()?); + + let config = device + .default_input_config() + .expect("Failed to get default input config"); + + let mut stream_config = StreamConfig::from(config.clone()); + stream_config.sample_rate = cpal::SampleRate(SAMPLE_RATE_RAW as u32); + + info!("Stream config: {stream_config:?}"); + + let udp_connection = OpusUdpConnection::new(src_addr, dest_addr)?; + let encoder = Encoder::new(SAMPLE_RATE, Channels::Stereo, Application::Audio)?; + + let sound_buffer = Arc::new(Mutex::new(OutputBuffer::new( + STEREO_FRAME_SIZE, + encoder, + udp_connection, + ))); + + let err_fn = move |err| { + error!("an error occurred on stream: {err}"); + }; + + let sound_buffer2 = sound_buffer.clone(); + let stream = match config.sample_format() { + cpal::SampleFormat::I8 => device.build_input_stream( + &stream_config, + move |data, _: &_| write_input_data::(data, &sound_buffer2), + err_fn, + None, + )?, + cpal::SampleFormat::I16 => device.build_input_stream( + &stream_config, + move |data, _: &_| write_input_data::(data, &sound_buffer2), + err_fn, + None, + )?, + cpal::SampleFormat::I32 => device.build_input_stream( + &stream_config, + move |data, _: &_| write_input_data::(data, &sound_buffer2), + err_fn, + None, + )?, + cpal::SampleFormat::F32 => device.build_input_stream( + &stream_config, + move |data, _: &_| write_input_data::(data, &sound_buffer2), + err_fn, + None, + )?, + sample_format => { + return Err(anyhow::Error::msg(format!( + "Unsupported sample format '{sample_format}'" + ))); + } + }; + + info!("Running tap..."); + stream.play()?; + + std::thread::park(); + + info!("Exiting..."); + Ok(()) +} + +fn write_input_data( + input: &[T], + ctx: &Arc>>, +) where + T: Sample, + f32: FromSample, +{ + if let Ok(mut ctx) = ctx.try_lock() { + let sample: Vec = input.iter().map(|s| f32::from_sample(*s)).collect(); + + if let Err(err) = ctx.write_data(&sample) { + error!("Got error when writing data to sound buffer: {err}") + } + } +} diff --git a/src/input_buffer.rs b/src/input_buffer.rs new file mode 100644 index 0000000..90b0446 --- /dev/null +++ b/src/input_buffer.rs @@ -0,0 +1,84 @@ +use audiopus::MutSignals; +use audiopus::coder::Decoder; +use audiopus::packet::Packet; +use circular_buffer::CircularBuffer; +use cpal::{FromSample, Sample}; +use thiserror::Error; + +#[derive(Error, Debug)] +#[allow(clippy::enum_variant_names)] +pub enum Error { + #[error("IO error on Output: {0}")] + IoError(#[from] std::io::Error), + #[error("Decode error: {0}")] + DecodeError(#[from] audiopus::Error), +} + +#[derive(Debug)] +pub struct InputBuffer { + circular_buffer: CircularBuffer, + frame_size: usize, + decoder: Decoder, +} + +impl InputBuffer { + pub fn new(frame_size: usize, decoder: Decoder) -> Self { + Self { + circular_buffer: CircularBuffer::new(), + frame_size, + decoder, + } + } + + fn decode(&mut self, input_buffer: &[u8], output_buffer: &mut [f32]) -> Result { + let packet = Packet::try_from(input_buffer)?; + + let signals = MutSignals::try_from(output_buffer)?; + Ok(self.decoder.decode_float(Some(packet), signals, false)?) + } + + fn missing_packet(&mut self, output_buffer: &mut [f32]) -> Result { + let signals = MutSignals::try_from(output_buffer)?; + Ok(self.decoder.decode_float(None, signals, false)?) + } + + fn write_frame_to_buffer(&mut self, frame: &[f32]) { + self.circular_buffer.extend_from_slice(frame) + } + + pub fn recv_packet(&mut self, input_buffer: Option<&[u8]>) -> Result<(), Error> { + let mut output_buffer = vec![0.0; self.frame_size]; + + let size = match input_buffer { + Some(buf) => self.decode(buf, &mut output_buffer)?, + None => self.missing_packet(&mut output_buffer)?, + }; + + self.write_frame_to_buffer(&output_buffer[..size]); + + Ok(()) + } + + pub fn samples_in_buffer(&self) -> usize { + self.circular_buffer.len() + } + + pub fn read_frame_from_buffer>( + &mut self, + output_buffer: &mut [SampleType], + ) { + let drain_count = output_buffer.len().min(self.circular_buffer.len()); + + let frame: Vec = self.circular_buffer.drain(..drain_count).collect(); + + for (ndx, output) in output_buffer.iter_mut().enumerate() { + let value: SampleType = if ndx < frame.len() { + SampleType::from_sample(frame[ndx]) + } else { + SampleType::from_sample(0.0f32) + }; + + *output = value; + } + } +} diff --git a/src/main.rs b/src/main.rs index e1b7420..36d6611 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,34 +1,51 @@ -mod sound_buffer; +mod audio_sink; +mod audio_source; +mod input_buffer; +mod output_buffer; mod udp_connection; +mod util; -use crate::sound_buffer::SoundBuffer; -use crate::udp_connection::OpusUdpConnection; -use audiopus::coder::Encoder; -use audiopus::{Application, Channels, SampleRate}; -use clap::Parser; -use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use cpal::{FromSample, Sample, StreamConfig}; -use log::{LevelFilter, error, info}; +use crate::audio_sink::listen; +use audiopus::SampleRate; +use clap::{Parser, Subcommand}; +use log::LevelFilter; use std::net::SocketAddr; -use std::sync::{Arc, Mutex}; pub const SAMPLE_RATE: SampleRate = SampleRate::Hz48000; pub const SAMPLE_RATE_RAW: usize = 48_000; pub const AUDIO_FRAME_RATE: usize = 50; pub const MONO_FRAME_SIZE: usize = SAMPLE_RATE_RAW / AUDIO_FRAME_RATE; pub const STEREO_FRAME_SIZE: usize = 2 * MONO_FRAME_SIZE; -pub const CIRC_BUFFER_SIZE: usize = STEREO_FRAME_SIZE * 4; +pub const CIRC_BUFFER_SIZE: usize = STEREO_FRAME_SIZE * 50; #[derive(Parser, Debug)] #[command(version, about = "Tap sounds from a audio device and send them over a UDP socket", long_about = None)] struct Opt { - /// The audio device to use - #[arg(short, long, default_value_t = String::from("default"))] - device: String, - /// Src UDP socket addr to send data from - src: SocketAddr, - /// UDP socket addr to connect to - dest: SocketAddr, + #[command(subcommand)] + command: Tasks, +} + +#[derive(Subcommand, Debug, Clone)] +enum Tasks { + /// Tap an Audio source to send over the network + Tap { + /// The audio device to use + #[arg(short, long, default_value_t = String::from("default"))] + device: String, + /// Src UDP socket addr to send data from + src: SocketAddr, + /// UDP socket addr to connect to + dest: SocketAddr, + }, + + /// Listen to data from a tap + Listen { + /// The audio device to use + #[arg(short, long, default_value_t = String::from("default"))] + device: String, + /// UDP addr to receive data on + addr: SocketAddr, + }, } fn main() -> Result<(), anyhow::Error> { @@ -40,92 +57,8 @@ fn main() -> Result<(), anyhow::Error> { let opt = Opt::parse(); let host = cpal::default_host(); - // Set up the input device and stream with the default input config. - let device = if opt.device == "default" { - host.default_input_device() - } else { - host.input_devices()? - .find(|x| x.name().map(|y| y == opt.device).unwrap_or(false)) - } - .expect("failed to find input device"); - - info!("Input device: {}", device.name()?); - - let config = device - .default_input_config() - .expect("Failed to get default input config"); - - let mut stream_config = StreamConfig::from(config.clone()); - - stream_config.sample_rate = cpal::SampleRate(SAMPLE_RATE_RAW as u32); - - let udp_connection = OpusUdpConnection::new(opt.src, opt.dest)?; - let encoder = Encoder::new(SAMPLE_RATE, Channels::Stereo, Application::Audio)?; - - let sound_buffer = Arc::new(Mutex::new(SoundBuffer::new( - STEREO_FRAME_SIZE, - encoder, - udp_connection, - ))); - - let err_fn = move |err| { - error!("an error occurred on stream: {err}"); - }; - - let sound_buffer2 = sound_buffer.clone(); - let stream = match config.sample_format() { - cpal::SampleFormat::I8 => device.build_input_stream( - &stream_config, - move |data, _: &_| write_input_data::(data, &sound_buffer2), - err_fn, - None, - )?, - cpal::SampleFormat::I16 => device.build_input_stream( - &stream_config, - move |data, _: &_| write_input_data::(data, &sound_buffer2), - err_fn, - None, - )?, - cpal::SampleFormat::I32 => device.build_input_stream( - &stream_config, - move |data, _: &_| write_input_data::(data, &sound_buffer2), - err_fn, - None, - )?, - cpal::SampleFormat::F32 => device.build_input_stream( - &stream_config, - move |data, _: &_| write_input_data::(data, &sound_buffer2), - err_fn, - None, - )?, - sample_format => { - return Err(anyhow::Error::msg(format!( - "Unsupported sample format '{sample_format}'" - ))); - } - }; - - info!("Running tap..."); - stream.play()?; - - std::thread::park(); - - info!("Exiting..."); - Ok(()) -} - -fn write_input_data( - input: &[T], - ctx: &Arc>>, -) where - T: Sample, - f32: FromSample, -{ - if let Ok(mut ctx) = ctx.try_lock() { - let sample: Vec = input.iter().map(|s| f32::from_sample(*s)).collect(); - - if let Err(err) = ctx.write_data(&sample) { - error!("Got error when writing data to sound buffer: {err}") - } + match opt.command { + Tasks::Tap { device, src, dest } => audio_source::tap(&device, src, dest, host), + Tasks::Listen { device, addr } => listen(&device, addr, host), } } diff --git a/src/sound_buffer.rs b/src/output_buffer.rs similarity index 86% rename from src/sound_buffer.rs rename to src/output_buffer.rs index fc3fa86..10c7e12 100644 --- a/src/sound_buffer.rs +++ b/src/output_buffer.rs @@ -15,14 +15,14 @@ pub enum Error { } #[derive(Debug)] -pub struct SoundBuffer { +pub struct OutputBuffer { circular_buffer: CircularBuffer, frame_size: usize, encoder: Encoder, offload: T, } -impl SoundBuffer { +impl OutputBuffer { pub fn new(frame_size: usize, encoder: Encoder, offload: T) -> Self { Self { circular_buffer: CircularBuffer::new(), @@ -46,13 +46,13 @@ impl SoundBuffer { } pub fn write_data(&mut self, buf: &[f32]) -> Result { - if buf.len() > N * 4 { + if buf.len() > N { return Err(Error::BufferTooSmall); } self.circular_buffer.extend_from_slice(buf); - if self.circular_buffer.len() > self.frame_size { + if self.circular_buffer.len() > self.frame_size * 4 { self.offload_buffer_data()?; } @@ -60,7 +60,7 @@ impl SoundBuffer { } } -impl Drop for SoundBuffer { +impl Drop for OutputBuffer { fn drop(&mut self) { if self.circular_buffer.len() > N { let _ = self.offload_buffer_data().is_ok(); diff --git a/src/udp_connection.rs b/src/udp_connection.rs index 95428b1..9692ed3 100644 --- a/src/udp_connection.rs +++ b/src/udp_connection.rs @@ -1,5 +1,6 @@ -use std::io::Write; +use std::io::{Read, Write}; use std::net::{SocketAddr, UdpSocket}; +use std::time::Duration; #[derive(Debug)] pub struct OpusUdpConnection { @@ -14,6 +15,10 @@ impl OpusUdpConnection { dest, }) } + + pub fn set_read_timeout(&mut self, timeout: Option) -> std::io::Result<()> { + self.udp_socket.set_read_timeout(timeout) + } } impl Write for OpusUdpConnection { @@ -25,3 +30,9 @@ impl Write for OpusUdpConnection { Ok(()) } } + +impl Read for OpusUdpConnection { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.udp_socket.recv(buf) + } +} diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/util.rs @@ -0,0 +1 @@ +