tap/src/audio_sink.rs

170 lines
5.3 KiB
Rust

use crate::input_buffer::InputBuffer;
use crate::udp_connection::OpusUdpConnection;
use crate::{CIRC_BUFFER_SIZE, SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE};
use anyhow::Error;
use audiopus::Channels;
use audiopus::coder::Decoder;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{
BufferSize, Device, FromSample, Host, I24, Sample, SampleRate, SizedSample, StreamConfig,
SupportedStreamConfig,
};
use log::{error, info};
use std::io::{ErrorKind, Read};
use std::net::SocketAddr;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
/// Const generic argument supplied to `InputBuffer` in [`Context`]; an alias
/// for the crate-level `CIRC_BUFFER_SIZE`.
const INPUT_BUFFER_SIZE: usize = CIRC_BUFFER_SIZE;
/// State shared between the UDP receive thread (`rx_thread`) and the audio
/// output callback (`process_frame`). Cloning only clones the `Arc` handles,
/// so every clone refers to the same buffer and condition variable.
#[derive(Debug, Clone)]
struct Context {
/// Socket address passed (as both arguments) to `OpusUdpConnection::new` by
/// the receive thread.
pub addr: SocketAddr,
/// Buffer the receive thread pushes packets into via `recv_packet` and the
/// output callback drains via `read_frame_from_buffer`.
pub input_buffer: Arc<Mutex<InputBuffer<INPUT_BUFFER_SIZE>>>,
/// Notified by the receive thread after each successful `recv_packet` call;
/// the output callback waits on it until enough samples are buffered.
/// NOTE(review): the name looks like a typo of `data_ready`; it is left
/// unchanged here because other functions in this file refer to it.
pub data_rady: Arc<Condvar>,
}
pub fn listen(device: &str, addr: SocketAddr, host: Host) -> Result<(), Error> {
// Set up the input device and stream with the default input config.
let device = if device == "default" {
host.default_output_device()
} else {
host.output_devices()?
.find(|x| x.name().map(|y| y == device).unwrap_or(false))
}
.expect("failed to find output device");
info!("Output device: {}", device.name()?);
let config = device.default_output_config()?;
println!("Default output config : {config:?}");
for cfg in device.supported_output_configs()? {
info!("{:?}", cfg);
}
let decoder = Decoder::new(SAMPLE_RATE, Channels::Stereo)?;
let input_buffer = InputBuffer::new(STEREO_FRAME_SIZE, decoder);
let ctx = Context {
addr,
input_buffer: Arc::new(Mutex::new(input_buffer)),
data_rady: Arc::new(Condvar::new()),
};
let stream = stream_setup_for(device, config, ctx)?;
stream.play()?;
thread::park();
Ok(())
}
fn stream_setup_for(
device: Device,
default_config: SupportedStreamConfig,
ctx: Context,
) -> Result<cpal::Stream, anyhow::Error>
where
{
let mut config: StreamConfig = default_config.clone().into();
config.sample_rate = SampleRate(SAMPLE_RATE_RAW as u32);
config.buffer_size = BufferSize::Fixed(STEREO_FRAME_SIZE as u32);
match default_config.sample_format() {
cpal::SampleFormat::I8 => make_stream::<i8>(&device, &config, ctx),
cpal::SampleFormat::I16 => make_stream::<i16>(&device, &config, ctx),
cpal::SampleFormat::I24 => make_stream::<I24>(&device, &config, ctx),
cpal::SampleFormat::I32 => make_stream::<i32>(&device, &config, ctx),
cpal::SampleFormat::I64 => make_stream::<i64>(&device, &config, ctx),
cpal::SampleFormat::U8 => make_stream::<u8>(&device, &config, ctx),
cpal::SampleFormat::U16 => make_stream::<u16>(&device, &config, ctx),
cpal::SampleFormat::U32 => make_stream::<u32>(&device, &config, ctx),
cpal::SampleFormat::U64 => make_stream::<u64>(&device, &config, ctx),
cpal::SampleFormat::F32 => make_stream::<f32>(&device, &config, ctx),
cpal::SampleFormat::F64 => make_stream::<f64>(&device, &config, ctx),
sample_format => Err(anyhow::Error::msg(format!(
"Unsupported sample format '{sample_format}'"
))),
}
}
/// Builds the output stream for sample type `T` and starts the UDP receive
/// thread that feeds it.
///
/// The stream's data callback forwards every output buffer to
/// `process_frame`, which reads from the buffer shared through `ctx`. The
/// receive thread (`rx_thread`) gets its own clone of `ctx` and is detached.
///
/// # Errors
///
/// Returns the error from `build_output_stream` if the stream cannot be
/// created. In that case no receive thread is started.
fn make_stream<T>(
    device: &Device,
    config: &cpal::StreamConfig,
    ctx: Context,
) -> Result<cpal::Stream, anyhow::Error>
where
    T: SizedSample + FromSample<f32>,
{
    // Passed to cpal as the stream's error callback.
    // NOTE(review): the message says "building", yet the callback is handed to
    // the stream itself — confirm the wording against when cpal invokes it.
    let err_fn = |err| error!("Error building output sound stream: {err}");

    let time_at_start = std::time::Instant::now();
    info!("Time at start: {time_at_start:?}");

    // Clone before `ctx` is moved into the data callback.
    let ctx_recv_thread = ctx.clone();

    let stream = device.build_output_stream(
        config,
        move |output: &mut [T], _: &cpal::OutputCallbackInfo| process_frame(output, &ctx),
        err_fn,
        None,
    )?;

    // Spawn the receive thread only once the stream exists. Spawning earlier
    // would leave a detached thread running if the build above failed.
    thread::spawn(move || rx_thread(ctx_recv_thread));

    Ok(stream)
}
/// Receive loop: reads packets from the UDP connection and pushes them into
/// the shared input buffer, waking the output callback after each successful
/// push.
///
/// Runs forever. A read that times out is forwarded to `recv_packet` as
/// `None`; any other read error is logged and the read is retried.
///
/// # Panics
///
/// Panics if the UDP connection cannot be created or configured, or if the
/// input buffer mutex is poisoned.
fn rx_thread(ctx: Context) {
    info!("Starting RX thread...");
    let mut udp_connection = OpusUdpConnection::new(ctx.addr, ctx.addr)
        .expect("failed to create the UDP connection for the RX thread");
    udp_connection
        .set_read_timeout(Some(Duration::from_millis(1000)))
        .expect("failed to set the UDP read timeout");

    let mut packet = vec![0; 32 * 1024];
    loop {
        let new_packet = match udp_connection.read(&mut packet) {
            Ok(data_len) => Some(&packet[..data_len]),
            // A read timeout surfaces as `WouldBlock` on Unix but as
            // `TimedOut` on Windows; treat both as "no packet this round".
            // NOTE(review): assumes `OpusUdpConnection` passes through the
            // socket's timeout error kind unchanged — confirm.
            Err(err) if matches!(err.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) => None,
            Err(err) => {
                error!("Got error trying to recv from UDP: {err}");
                continue;
            }
        };

        let mut input_buffer = ctx.input_buffer.lock().expect("Mutex poisoned");
        if let Err(err) = input_buffer.recv_packet(new_packet) {
            error!("Adding packet to buffer: {err}")
        } else {
            // Wake the output callback, which may be waiting for samples.
            ctx.data_rady.notify_all();
        }
    }
}
/// Fills one output buffer from the shared input buffer.
///
/// Locks the input buffer and blocks on the `data_rady` condition variable
/// until the buffer reports at least `output.len()` samples, then passes
/// `output` to `read_frame_from_buffer`.
///
/// # Panics
///
/// Panics with "Mutex poisoned" if the input buffer mutex is poisoned.
fn process_frame<SampleType>(output: &mut [SampleType], ctx: &Context)
where
    SampleType: Sample + FromSample<f32>,
{
    let wanted = output.len();
    let guard = ctx.input_buffer.lock().expect("Mutex poisoned");

    // `wait_while` checks the predicate before sleeping, so a buffer that is
    // already full enough is read without waiting.
    let mut filled = ctx
        .data_rady
        .wait_while(guard, |buffer| buffer.samples_in_buffer() < wanted)
        .expect("Mutex poisoned");

    filled.read_frame_from_buffer(output)
}