Kinda working tap/listen

+ Audio quality is bad and pretty choppy
This commit is contained in:
Joey Hines 2025-08-23 18:23:37 -06:00
parent 009110e8e0
commit ccedbe0fb9
Signed by: joeyahines
GPG Key ID: 38BA6F25C94C9382
7 changed files with 405 additions and 111 deletions

157
src/audio_sink.rs Normal file
View File

@ -0,0 +1,157 @@
use crate::input_buffer::InputBuffer;
use crate::udp_connection::OpusUdpConnection;
use crate::{CIRC_BUFFER_SIZE, SAMPLE_RATE, STEREO_FRAME_SIZE};
use anyhow::Error;
use audiopus::Channels;
use audiopus::coder::Decoder;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{Device, FromSample, Host, I24, Sample, SizedSample, SupportedStreamConfig};
use log::{error, info};
use std::io::{ErrorKind, Read};
use std::net::SocketAddr;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
const INPUT_BUFFER_SIZE: usize = CIRC_BUFFER_SIZE;
#[derive(Debug, Clone)]
struct Context {
pub addr: SocketAddr,
pub input_buffer: Arc<Mutex<InputBuffer<INPUT_BUFFER_SIZE>>>,
pub data_rady: Arc<Condvar>,
}
pub fn listen(device: &str, addr: SocketAddr, host: Host) -> Result<(), Error> {
// Set up the input device and stream with the default input config.
let device = if device == "default" {
host.default_output_device()
} else {
host.output_devices()?
.find(|x| x.name().map(|y| y == device).unwrap_or(false))
}
.expect("failed to find output device");
info!("Output device: {}", device.name()?);
let config = device.default_output_config()?;
println!("Default output config : {config:?}");
let decoder = Decoder::new(SAMPLE_RATE, Channels::Stereo)?;
let input_buffer = InputBuffer::new(STEREO_FRAME_SIZE, decoder);
let ctx = Context {
addr,
input_buffer: Arc::new(Mutex::new(input_buffer)),
data_rady: Arc::new(Condvar::new()),
};
let stream = stream_setup_for(device, config, ctx)?;
stream.play()?;
thread::park();
Ok(())
}
fn stream_setup_for(
device: Device,
config: SupportedStreamConfig,
ctx: Context,
) -> Result<cpal::Stream, anyhow::Error>
where
{
match config.sample_format() {
cpal::SampleFormat::I8 => make_stream::<i8>(&device, &config.into(), ctx),
cpal::SampleFormat::I16 => make_stream::<i16>(&device, &config.into(), ctx),
cpal::SampleFormat::I24 => make_stream::<I24>(&device, &config.into(), ctx),
cpal::SampleFormat::I32 => make_stream::<i32>(&device, &config.into(), ctx),
cpal::SampleFormat::I64 => make_stream::<i64>(&device, &config.into(), ctx),
cpal::SampleFormat::U8 => make_stream::<u8>(&device, &config.into(), ctx),
cpal::SampleFormat::U16 => make_stream::<u16>(&device, &config.into(), ctx),
cpal::SampleFormat::U32 => make_stream::<u32>(&device, &config.into(), ctx),
cpal::SampleFormat::U64 => make_stream::<u64>(&device, &config.into(), ctx),
cpal::SampleFormat::F32 => make_stream::<f32>(&device, &config.into(), ctx),
cpal::SampleFormat::F64 => make_stream::<f64>(&device, &config.into(), ctx),
sample_format => Err(anyhow::Error::msg(format!(
"Unsupported sample format '{sample_format}'"
))),
}
}
fn make_stream<T>(
device: &Device,
config: &cpal::StreamConfig,
ctx: Context,
) -> Result<cpal::Stream, anyhow::Error>
where
T: SizedSample + FromSample<f32>,
{
let err_fn = |err| error!("Error building output sound stream: {err}");
let time_at_start = std::time::Instant::now();
info!("Time at start: {time_at_start:?}");
let ctx_recv_thread = ctx.clone();
thread::spawn(move || rx_thread(ctx_recv_thread));
let stream = device.build_output_stream(
config,
move |output: &mut [T], _: &cpal::OutputCallbackInfo| process_frame(output, &ctx),
err_fn,
None,
)?;
Ok(stream)
}
fn rx_thread(ctx: Context) {
info!("Starting RX thread...");
let mut udp_connection = OpusUdpConnection::new(ctx.addr, ctx.addr).unwrap();
udp_connection
.set_read_timeout(Some(Duration::from_millis(1000)))
.unwrap();
let mut packet = vec![0; 32 * 1024];
loop {
let new_packet = match udp_connection.read(&mut packet) {
Ok(data_len) => Some(&packet[0..data_len]),
Err(err) => {
if err.kind() == ErrorKind::WouldBlock {
None
} else {
error!("Got error trying to recv from UDP: {err}");
continue;
}
}
};
let mut input_buffer = ctx.input_buffer.lock().expect("Mutex poisoned");
if let Err(err) = input_buffer.recv_packet(new_packet) {
error!("Adding packet to buffer: {err}")
} else {
ctx.data_rady.notify_all();
}
}
}
fn process_frame<SampleType>(output: &mut [SampleType], ctx: &Context)
where
SampleType: Sample + FromSample<f32>,
{
let mut input_buffer = ctx.input_buffer.lock().expect("Mutex poisoned");
let mut input_buffer = if input_buffer.samples_in_buffer() < output.len() {
while input_buffer.samples_in_buffer() < output.len() {
input_buffer = ctx.data_rady.wait(input_buffer).expect("Mutex poisoned");
}
input_buffer
} else {
input_buffer
};
input_buffer.read_frame_from_buffer(output)
}

108
src/audio_source.rs Normal file
View File

@ -0,0 +1,108 @@
use crate::output_buffer::OutputBuffer;
use crate::udp_connection::OpusUdpConnection;
use crate::{CIRC_BUFFER_SIZE, SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE};
use anyhow::Error;
use audiopus::coder::Encoder;
use audiopus::{Application, Channels};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{FromSample, Host, Sample, StreamConfig};
use log::{error, info};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
pub fn tap(
device: &str,
src_addr: SocketAddr,
dest_addr: SocketAddr,
host: Host,
) -> Result<(), Error> {
// Set up the input device and stream with the default input config.
let device = if device == "default" {
host.default_input_device()
} else {
host.input_devices()?
.find(|x| x.name().map(|y| y == device).unwrap_or(false))
}
.expect("failed to find input device");
info!("Input device: {}", device.name()?);
let config = device
.default_input_config()
.expect("Failed to get default input config");
let mut stream_config = StreamConfig::from(config.clone());
stream_config.sample_rate = cpal::SampleRate(SAMPLE_RATE_RAW as u32);
info!("Stream config: {stream_config:?}");
let udp_connection = OpusUdpConnection::new(src_addr, dest_addr)?;
let encoder = Encoder::new(SAMPLE_RATE, Channels::Stereo, Application::Audio)?;
let sound_buffer = Arc::new(Mutex::new(OutputBuffer::new(
STEREO_FRAME_SIZE,
encoder,
udp_connection,
)));
let err_fn = move |err| {
error!("an error occurred on stream: {err}");
};
let sound_buffer2 = sound_buffer.clone();
let stream = match config.sample_format() {
cpal::SampleFormat::I8 => device.build_input_stream(
&stream_config,
move |data, _: &_| write_input_data::<CIRC_BUFFER_SIZE, i8>(data, &sound_buffer2),
err_fn,
None,
)?,
cpal::SampleFormat::I16 => device.build_input_stream(
&stream_config,
move |data, _: &_| write_input_data::<CIRC_BUFFER_SIZE, i16>(data, &sound_buffer2),
err_fn,
None,
)?,
cpal::SampleFormat::I32 => device.build_input_stream(
&stream_config,
move |data, _: &_| write_input_data::<CIRC_BUFFER_SIZE, i32>(data, &sound_buffer2),
err_fn,
None,
)?,
cpal::SampleFormat::F32 => device.build_input_stream(
&stream_config,
move |data, _: &_| write_input_data::<CIRC_BUFFER_SIZE, f32>(data, &sound_buffer2),
err_fn,
None,
)?,
sample_format => {
return Err(anyhow::Error::msg(format!(
"Unsupported sample format '{sample_format}'"
)));
}
};
info!("Running tap...");
stream.play()?;
std::thread::park();
info!("Exiting...");
Ok(())
}
fn write_input_data<const N: usize, T>(
input: &[T],
ctx: &Arc<Mutex<OutputBuffer<N, OpusUdpConnection>>>,
) where
T: Sample,
f32: FromSample<T>,
{
if let Ok(mut ctx) = ctx.try_lock() {
let sample: Vec<f32> = input.iter().map(|s| f32::from_sample(*s)).collect();
if let Err(err) = ctx.write_data(&sample) {
error!("Got error when writing data to sound buffer: {err}")
}
}
}

84
src/input_buffer.rs Normal file
View File

@ -0,0 +1,84 @@
use audiopus::MutSignals;
use audiopus::coder::Decoder;
use audiopus::packet::Packet;
use circular_buffer::CircularBuffer;
use cpal::{FromSample, Sample};
use thiserror::Error;
#[derive(Error, Debug)]
#[allow(clippy::enum_variant_names)]
pub enum Error {
#[error("IO error on Output: {0}")]
IoError(#[from] std::io::Error),
#[error("Decode error: {0}")]
DecodeError(#[from] audiopus::Error),
}
#[derive(Debug)]
pub struct InputBuffer<const N: usize> {
circular_buffer: CircularBuffer<N, f32>,
frame_size: usize,
decoder: Decoder,
}
impl<const N: usize> InputBuffer<N> {
pub fn new(frame_size: usize, decoder: Decoder) -> Self {
Self {
circular_buffer: CircularBuffer::new(),
frame_size,
decoder,
}
}
fn decode(&mut self, input_buffer: &[u8], output_buffer: &mut [f32]) -> Result<usize, Error> {
let packet = Packet::try_from(input_buffer)?;
let signals = MutSignals::try_from(output_buffer)?;
Ok(self.decoder.decode_float(Some(packet), signals, false)?)
}
fn missing_packet(&mut self, output_buffer: &mut [f32]) -> Result<usize, Error> {
let signals = MutSignals::try_from(output_buffer)?;
Ok(self.decoder.decode_float(None, signals, false)?)
}
fn write_frame_to_buffer(&mut self, frame: &[f32]) {
self.circular_buffer.extend_from_slice(frame)
}
pub fn recv_packet(&mut self, input_buffer: Option<&[u8]>) -> Result<(), Error> {
let mut output_buffer = vec![0.0; self.frame_size];
let size = match input_buffer {
Some(buf) => self.decode(buf, &mut output_buffer)?,
None => self.missing_packet(&mut output_buffer)?,
};
self.write_frame_to_buffer(&output_buffer[..size]);
Ok(())
}
pub fn samples_in_buffer(&self) -> usize {
self.circular_buffer.len()
}
pub fn read_frame_from_buffer<SampleType: Sample + FromSample<f32>>(
&mut self,
output_buffer: &mut [SampleType],
) {
let drain_count = output_buffer.len().min(self.circular_buffer.len());
let frame: Vec<f32> = self.circular_buffer.drain(..drain_count).collect();
for (ndx, output) in output_buffer.iter_mut().enumerate() {
let value: SampleType = if ndx < frame.len() {
SampleType::from_sample(frame[ndx])
} else {
SampleType::from_sample(0.0f32)
};
*output = value;
}
}
}

View File

@ -1,34 +1,51 @@
mod sound_buffer; mod audio_sink;
mod audio_source;
mod input_buffer;
mod output_buffer;
mod udp_connection; mod udp_connection;
mod util;
use crate::sound_buffer::SoundBuffer; use crate::audio_sink::listen;
use crate::udp_connection::OpusUdpConnection; use audiopus::SampleRate;
use audiopus::coder::Encoder; use clap::{Parser, Subcommand};
use audiopus::{Application, Channels, SampleRate}; use log::LevelFilter;
use clap::Parser;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{FromSample, Sample, StreamConfig};
use log::{LevelFilter, error, info};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
pub const SAMPLE_RATE: SampleRate = SampleRate::Hz48000; pub const SAMPLE_RATE: SampleRate = SampleRate::Hz48000;
pub const SAMPLE_RATE_RAW: usize = 48_000; pub const SAMPLE_RATE_RAW: usize = 48_000;
pub const AUDIO_FRAME_RATE: usize = 50; pub const AUDIO_FRAME_RATE: usize = 50;
pub const MONO_FRAME_SIZE: usize = SAMPLE_RATE_RAW / AUDIO_FRAME_RATE; pub const MONO_FRAME_SIZE: usize = SAMPLE_RATE_RAW / AUDIO_FRAME_RATE;
pub const STEREO_FRAME_SIZE: usize = 2 * MONO_FRAME_SIZE; pub const STEREO_FRAME_SIZE: usize = 2 * MONO_FRAME_SIZE;
pub const CIRC_BUFFER_SIZE: usize = STEREO_FRAME_SIZE * 4; pub const CIRC_BUFFER_SIZE: usize = STEREO_FRAME_SIZE * 50;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(version, about = "Tap sounds from a audio device and send them over a UDP socket", long_about = None)] #[command(version, about = "Tap sounds from a audio device and send them over a UDP socket", long_about = None)]
struct Opt { struct Opt {
/// The audio device to use #[command(subcommand)]
#[arg(short, long, default_value_t = String::from("default"))] command: Tasks,
device: String, }
/// Src UDP socket addr to send data from
src: SocketAddr, #[derive(Subcommand, Debug, Clone)]
/// UDP socket addr to connect to enum Tasks {
dest: SocketAddr, /// Tap an Audio source to send over the network
Tap {
/// The audio device to use
#[arg(short, long, default_value_t = String::from("default"))]
device: String,
/// Src UDP socket addr to send data from
src: SocketAddr,
/// UDP socket addr to connect to
dest: SocketAddr,
},
/// Listen to data from a tap
Listen {
/// The audio device to use
#[arg(short, long, default_value_t = String::from("default"))]
device: String,
/// UDP addr to receive data on
addr: SocketAddr,
},
} }
fn main() -> Result<(), anyhow::Error> { fn main() -> Result<(), anyhow::Error> {
@ -40,92 +57,8 @@ fn main() -> Result<(), anyhow::Error> {
let opt = Opt::parse(); let opt = Opt::parse();
let host = cpal::default_host(); let host = cpal::default_host();
// Set up the input device and stream with the default input config. match opt.command {
let device = if opt.device == "default" { Tasks::Tap { device, src, dest } => audio_source::tap(&device, src, dest, host),
host.default_input_device() Tasks::Listen { device, addr } => listen(&device, addr, host),
} else {
host.input_devices()?
.find(|x| x.name().map(|y| y == opt.device).unwrap_or(false))
}
.expect("failed to find input device");
info!("Input device: {}", device.name()?);
let config = device
.default_input_config()
.expect("Failed to get default input config");
let mut stream_config = StreamConfig::from(config.clone());
stream_config.sample_rate = cpal::SampleRate(SAMPLE_RATE_RAW as u32);
let udp_connection = OpusUdpConnection::new(opt.src, opt.dest)?;
let encoder = Encoder::new(SAMPLE_RATE, Channels::Stereo, Application::Audio)?;
let sound_buffer = Arc::new(Mutex::new(SoundBuffer::new(
STEREO_FRAME_SIZE,
encoder,
udp_connection,
)));
let err_fn = move |err| {
error!("an error occurred on stream: {err}");
};
let sound_buffer2 = sound_buffer.clone();
let stream = match config.sample_format() {
cpal::SampleFormat::I8 => device.build_input_stream(
&stream_config,
move |data, _: &_| write_input_data::<CIRC_BUFFER_SIZE, i8>(data, &sound_buffer2),
err_fn,
None,
)?,
cpal::SampleFormat::I16 => device.build_input_stream(
&stream_config,
move |data, _: &_| write_input_data::<CIRC_BUFFER_SIZE, i16>(data, &sound_buffer2),
err_fn,
None,
)?,
cpal::SampleFormat::I32 => device.build_input_stream(
&stream_config,
move |data, _: &_| write_input_data::<CIRC_BUFFER_SIZE, i32>(data, &sound_buffer2),
err_fn,
None,
)?,
cpal::SampleFormat::F32 => device.build_input_stream(
&stream_config,
move |data, _: &_| write_input_data::<CIRC_BUFFER_SIZE, f32>(data, &sound_buffer2),
err_fn,
None,
)?,
sample_format => {
return Err(anyhow::Error::msg(format!(
"Unsupported sample format '{sample_format}'"
)));
}
};
info!("Running tap...");
stream.play()?;
std::thread::park();
info!("Exiting...");
Ok(())
}
fn write_input_data<const N: usize, T>(
input: &[T],
ctx: &Arc<Mutex<SoundBuffer<N, OpusUdpConnection>>>,
) where
T: Sample,
f32: FromSample<T>,
{
if let Ok(mut ctx) = ctx.try_lock() {
let sample: Vec<f32> = input.iter().map(|s| f32::from_sample(*s)).collect();
if let Err(err) = ctx.write_data(&sample) {
error!("Got error when writing data to sound buffer: {err}")
}
} }
} }

View File

@ -15,14 +15,14 @@ pub enum Error {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct SoundBuffer<const N: usize, T: Write> { pub struct OutputBuffer<const N: usize, T: Write> {
circular_buffer: CircularBuffer<N, f32>, circular_buffer: CircularBuffer<N, f32>,
frame_size: usize, frame_size: usize,
encoder: Encoder, encoder: Encoder,
offload: T, offload: T,
} }
impl<const N: usize, T: Write> SoundBuffer<N, T> { impl<const N: usize, T: Write> OutputBuffer<N, T> {
pub fn new(frame_size: usize, encoder: Encoder, offload: T) -> Self { pub fn new(frame_size: usize, encoder: Encoder, offload: T) -> Self {
Self { Self {
circular_buffer: CircularBuffer::new(), circular_buffer: CircularBuffer::new(),
@ -46,13 +46,13 @@ impl<const N: usize, T: Write> SoundBuffer<N, T> {
} }
pub fn write_data(&mut self, buf: &[f32]) -> Result<usize, Error> { pub fn write_data(&mut self, buf: &[f32]) -> Result<usize, Error> {
if buf.len() > N * 4 { if buf.len() > N {
return Err(Error::BufferTooSmall); return Err(Error::BufferTooSmall);
} }
self.circular_buffer.extend_from_slice(buf); self.circular_buffer.extend_from_slice(buf);
if self.circular_buffer.len() > self.frame_size { if self.circular_buffer.len() > self.frame_size * 4 {
self.offload_buffer_data()?; self.offload_buffer_data()?;
} }
@ -60,7 +60,7 @@ impl<const N: usize, T: Write> SoundBuffer<N, T> {
} }
} }
impl<const N: usize, T: Write> Drop for SoundBuffer<N, T> { impl<const N: usize, T: Write> Drop for OutputBuffer<N, T> {
fn drop(&mut self) { fn drop(&mut self) {
if self.circular_buffer.len() > N { if self.circular_buffer.len() > N {
let _ = self.offload_buffer_data().is_ok(); let _ = self.offload_buffer_data().is_ok();

View File

@ -1,5 +1,6 @@
use std::io::Write; use std::io::{Read, Write};
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
use std::time::Duration;
#[derive(Debug)] #[derive(Debug)]
pub struct OpusUdpConnection { pub struct OpusUdpConnection {
@ -14,6 +15,10 @@ impl OpusUdpConnection {
dest, dest,
}) })
} }
pub fn set_read_timeout(&mut self, timeout: Option<Duration>) -> std::io::Result<()> {
self.udp_socket.set_read_timeout(timeout)
}
} }
impl Write for OpusUdpConnection { impl Write for OpusUdpConnection {
@ -25,3 +30,9 @@ impl Write for OpusUdpConnection {
Ok(()) Ok(())
} }
} }
impl Read for OpusUdpConnection {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.udp_socket.recv(buf)
}
}

1
src/util.rs Normal file
View File

@ -0,0 +1 @@