Working with bot!
+ decode returns numbers of samples per channel, we were effectively chomping half the samples + needs a lot of cleanup
This commit is contained in:
parent
93ddba4a4a
commit
1f250f448d
7
Cargo.lock
generated
7
Cargo.lock
generated
@ -363,6 +363,12 @@ version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
|
||||
|
||||
[[package]]
|
||||
name = "hound"
|
||||
version = "3.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "62adaabb884c94955b19907d60019f4e145d091c75345379e70d1ee696f7854f"
|
||||
|
||||
[[package]]
|
||||
name = "iana-time-zone"
|
||||
version = "0.1.63"
|
||||
@ -787,6 +793,7 @@ dependencies = [
|
||||
"clap",
|
||||
"cpal",
|
||||
"env_logger",
|
||||
"hound",
|
||||
"log",
|
||||
"thiserror 2.0.12",
|
||||
]
|
||||
|
@ -13,3 +13,4 @@ audiopus = "0.3.0-rc.0"
|
||||
thiserror = "2.0.12"
|
||||
circular-buffer = "1.1.0"
|
||||
chrono = "0.4.41"
|
||||
hound = "3.5.1"
|
||||
|
88
src/audio_record.rs
Normal file
88
src/audio_record.rs
Normal file
@ -0,0 +1,88 @@
|
||||
use crate::tap_packet::TapPacketHeader;
|
||||
use crate::udp_connection::OpusUdpConnection;
|
||||
use crate::util::{
|
||||
SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE, TAP_PACKET_MAX_DATA_LEN,
|
||||
};
|
||||
use anyhow::Error;
|
||||
use audiopus::coder::Decoder;
|
||||
use audiopus::packet::Packet;
|
||||
use audiopus::{Channels, MutSignals};
|
||||
use chrono::Utc;
|
||||
use log::{error, warn};
|
||||
use std::io::{ErrorKind, Read};
|
||||
use std::net::SocketAddr;
|
||||
use std::path::Path;
|
||||
|
||||
pub fn record(addr: SocketAddr, output_path: &Path, duration_ms: u64) -> Result<(), Error> {
|
||||
let mut decoder = Decoder::new(SAMPLE_RATE, Channels::Stereo)?;
|
||||
|
||||
let mut udp_connection = OpusUdpConnection::new(addr, addr)?;
|
||||
|
||||
let spec = hound::WavSpec {
|
||||
channels: 2,
|
||||
sample_rate: SAMPLE_RATE_RAW as u32,
|
||||
bits_per_sample: 32,
|
||||
sample_format: hound::SampleFormat::Float,
|
||||
};
|
||||
|
||||
let mut seq_count = 0;
|
||||
|
||||
let mut writer = hound::WavWriter::create(output_path, spec)?;
|
||||
|
||||
let deadline = std::time::Instant::now() + std::time::Duration::from_millis(duration_ms);
|
||||
|
||||
while std::time::Instant::now() < deadline {
|
||||
let mut packet = vec![0; TAP_PACKET_MAX_DATA_LEN + TapPacketHeader::serialized_size()];
|
||||
let mut output_buffer = vec![0.0; STEREO_FRAME_SIZE];
|
||||
let signals = MutSignals::try_from(&mut output_buffer)?;
|
||||
|
||||
let new_packet = match udp_connection.read(&mut packet) {
|
||||
Ok(data_len) => Some(&packet[0..data_len]),
|
||||
Err(err) => {
|
||||
if err.kind() == ErrorKind::WouldBlock {
|
||||
None
|
||||
} else {
|
||||
error!("Got error trying to recv from UDP: {err}");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let data_size = if let Some(new_packet) = new_packet {
|
||||
let hdr = TapPacketHeader::from_buffer(new_packet)?;
|
||||
|
||||
if !hdr.check_acceptable_latency_tolerance() {
|
||||
warn!(
|
||||
"Packet outside of tolerance: packet.timestamp={} time={}",
|
||||
hdr.timestamp,
|
||||
Utc::now().timestamp()
|
||||
);
|
||||
}
|
||||
|
||||
if hdr.seq != seq_count {
|
||||
warn!(
|
||||
"Found sequence mismatch, expected seq={}, {hdr:?}",
|
||||
seq_count
|
||||
);
|
||||
}
|
||||
|
||||
(seq_count, _) = hdr.seq.overflowing_add(1);
|
||||
|
||||
let packet = Packet::try_from(&new_packet[TapPacketHeader::serialized_size()..])?;
|
||||
|
||||
decoder.decode_float(Some(packet), signals, false)?
|
||||
} else {
|
||||
decoder.decode_float(None, signals, false)?
|
||||
} * 2;
|
||||
|
||||
for samples in output_buffer[..data_size].chunks(2) {
|
||||
for sample in samples {
|
||||
writer.write_sample(*sample)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
writer.finalize()?;
|
||||
|
||||
Ok(())
|
||||
}
|
@ -1,6 +1,6 @@
|
||||
use crate::input_buffer::InputBuffer;
|
||||
use crate::udp_connection::OpusUdpConnection;
|
||||
use crate::{CIRC_BUFFER_SIZE, SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE};
|
||||
use crate::util::{CIRC_BUFFER_SIZE, SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE};
|
||||
use anyhow::Error;
|
||||
use audiopus::Channels;
|
||||
use audiopus::coder::Decoder;
|
||||
@ -122,7 +122,7 @@ fn rx_thread(ctx: Context) {
|
||||
info!("Starting RX thread...");
|
||||
let mut udp_connection = OpusUdpConnection::new(ctx.addr, ctx.addr).unwrap();
|
||||
udp_connection
|
||||
.set_read_timeout(Some(Duration::from_millis(1000)))
|
||||
.set_read_timeout(Some(Duration::from_millis(20)))
|
||||
.unwrap();
|
||||
let mut packet = vec![0; 32 * 1024];
|
||||
|
||||
|
@ -1,8 +1,7 @@
|
||||
use crate::output_buffer::OutputBuffer;
|
||||
use crate::tx_thread::TxThread;
|
||||
use crate::udp_connection::OpusUdpConnection;
|
||||
use crate::{
|
||||
CIRC_BUFFER_SIZE, SAMPLE_RATE, SAMPLE_RATE_RAW,
|
||||
STEREO_FRAME_SIZE,
|
||||
use crate::util::{
|
||||
SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE, ThreadMessage,
|
||||
};
|
||||
use anyhow::Error;
|
||||
use audiopus::coder::Encoder;
|
||||
@ -11,7 +10,8 @@ use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
|
||||
use cpal::{BufferSize, FromSample, Host, Sample, StreamConfig};
|
||||
use log::{error, info};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::mpsc::{Sender, channel};
|
||||
use std::thread;
|
||||
|
||||
pub fn tap(
|
||||
device: &str,
|
||||
@ -36,7 +36,7 @@ pub fn tap(
|
||||
|
||||
let mut stream_config = StreamConfig::from(config.clone());
|
||||
stream_config.sample_rate = cpal::SampleRate(SAMPLE_RATE_RAW as u32);
|
||||
stream_config.buffer_size = BufferSize::Fixed(STEREO_FRAME_SIZE as u32);
|
||||
stream_config.buffer_size = BufferSize::Fixed(STEREO_FRAME_SIZE as u32 * 2);
|
||||
|
||||
for cfg in device.supported_input_configs()? {
|
||||
info!("{:?}", cfg)
|
||||
@ -50,39 +50,38 @@ pub fn tap(
|
||||
encoder.set_complexity(10)?;
|
||||
encoder.set_signal(Signal::Music)?;
|
||||
|
||||
let sound_buffer = Arc::new(Mutex::new(OutputBuffer::new(
|
||||
STEREO_FRAME_SIZE,
|
||||
encoder,
|
||||
udp_connection,
|
||||
)));
|
||||
let (sender, receiver) = channel();
|
||||
|
||||
let tx_thread = TxThread::new(receiver, encoder, udp_connection);
|
||||
|
||||
thread::spawn(move || tx_thread.thread());
|
||||
|
||||
let err_fn = move |err| {
|
||||
error!("an error occurred on stream: {err}");
|
||||
};
|
||||
|
||||
let sound_buffer2 = sound_buffer.clone();
|
||||
let stream = match config.sample_format() {
|
||||
cpal::SampleFormat::I8 => device.build_input_stream(
|
||||
&stream_config,
|
||||
move |data, _: &_| write_input_data::<CIRC_BUFFER_SIZE, i8>(data, &sound_buffer2),
|
||||
move |data, _: &_| write_input_data::<i8>(data, &sender),
|
||||
err_fn,
|
||||
None,
|
||||
)?,
|
||||
cpal::SampleFormat::I16 => device.build_input_stream(
|
||||
&stream_config,
|
||||
move |data, _: &_| write_input_data::<CIRC_BUFFER_SIZE, i16>(data, &sound_buffer2),
|
||||
move |data, _: &_| write_input_data::<i16>(data, &sender),
|
||||
err_fn,
|
||||
None,
|
||||
)?,
|
||||
cpal::SampleFormat::I32 => device.build_input_stream(
|
||||
&stream_config,
|
||||
move |data, _: &_| write_input_data::<CIRC_BUFFER_SIZE, i32>(data, &sound_buffer2),
|
||||
move |data, _: &_| write_input_data::<i32>(data, &sender),
|
||||
err_fn,
|
||||
None,
|
||||
)?,
|
||||
cpal::SampleFormat::F32 => device.build_input_stream(
|
||||
&stream_config,
|
||||
move |data, _: &_| write_input_data::<CIRC_BUFFER_SIZE, f32>(data, &sound_buffer2),
|
||||
move |data, _: &_| write_input_data::<f32>(data, &sender),
|
||||
err_fn,
|
||||
None,
|
||||
)?,
|
||||
@ -96,23 +95,20 @@ pub fn tap(
|
||||
info!("Running tap...");
|
||||
stream.play()?;
|
||||
|
||||
std::thread::park();
|
||||
thread::park();
|
||||
|
||||
info!("Exiting...");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_input_data<const N: usize, T>(
|
||||
input: &[T],
|
||||
ctx: &Arc<Mutex<OutputBuffer<N, OpusUdpConnection>>>,
|
||||
) where
|
||||
fn write_input_data<T>(input: &[T], ctx: &Sender<ThreadMessage>)
|
||||
where
|
||||
T: Sample,
|
||||
f32: FromSample<T>,
|
||||
{
|
||||
let mut ctx = ctx.lock().expect("Mutex poisoned");
|
||||
let sample: Vec<f32> = input.iter().map(|s| f32::from_sample(*s)).collect();
|
||||
|
||||
if let Err(err) = ctx.write_data(&sample) {
|
||||
error!("Got error when writing data to sound buffer: {err}")
|
||||
if let Err(err) = ctx.send(ThreadMessage::Data(sample)) {
|
||||
error!("Got error when sending data: {err}")
|
||||
}
|
||||
}
|
||||
|
@ -78,7 +78,7 @@ impl<const N: usize> InputBuffer<N> {
|
||||
let size = match input_buffer {
|
||||
Some(buf) => self.decode(buf, &mut output_buffer)?,
|
||||
None => self.missing_packet(&mut output_buffer)?,
|
||||
};
|
||||
} * 2;
|
||||
|
||||
self.write_frame_to_buffer(&output_buffer[..size]);
|
||||
|
||||
|
28
src/main.rs
28
src/main.rs
@ -1,23 +1,19 @@
|
||||
mod audio_record;
|
||||
mod audio_sink;
|
||||
mod audio_source;
|
||||
mod input_buffer;
|
||||
mod output_buffer;
|
||||
mod rx_thread;
|
||||
mod tap_packet;
|
||||
mod tx_thread;
|
||||
mod udp_connection;
|
||||
mod util;
|
||||
|
||||
use crate::audio_record::record;
|
||||
use crate::audio_sink::listen;
|
||||
use audiopus::SampleRate;
|
||||
use clap::{Parser, Subcommand};
|
||||
use log::LevelFilter;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
pub const SAMPLE_RATE: SampleRate = SampleRate::Hz48000;
|
||||
pub const SAMPLE_RATE_RAW: usize = 48_000;
|
||||
pub const AUDIO_FRAME_RATE: usize = 50;
|
||||
pub const MONO_FRAME_SIZE: usize = SAMPLE_RATE_RAW / AUDIO_FRAME_RATE;
|
||||
pub const STEREO_FRAME_SIZE: usize = 2 * MONO_FRAME_SIZE;
|
||||
pub const CIRC_BUFFER_SIZE: usize = STEREO_FRAME_SIZE * 50;
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(version, about = "Tap sounds from a audio device and send them over a UDP socket", long_about = None)]
|
||||
@ -47,6 +43,15 @@ enum Tasks {
|
||||
/// UDP addr to receive data on
|
||||
addr: SocketAddr,
|
||||
},
|
||||
|
||||
Record {
|
||||
/// UDP addr to receive data on
|
||||
addr: SocketAddr,
|
||||
/// Number of MS to record for
|
||||
duration_ms: u64,
|
||||
/// Output file
|
||||
output_file: PathBuf,
|
||||
},
|
||||
}
|
||||
|
||||
fn main() -> Result<(), anyhow::Error> {
|
||||
@ -61,5 +66,10 @@ fn main() -> Result<(), anyhow::Error> {
|
||||
match opt.command {
|
||||
Tasks::Tap { device, src, dest } => audio_source::tap(&device, src, dest, host),
|
||||
Tasks::Listen { device, addr } => listen(&device, addr, host),
|
||||
Tasks::Record {
|
||||
addr,
|
||||
duration_ms,
|
||||
output_file,
|
||||
} => record(addr, &output_file, duration_ms),
|
||||
}
|
||||
}
|
||||
|
@ -1,85 +0,0 @@
|
||||
use crate::tap_packet::{TapPacketError, TapPacketHeader};
|
||||
use audiopus::coder::Encoder;
|
||||
use circular_buffer::CircularBuffer;
|
||||
use std::io::Write;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
pub enum Error {
|
||||
#[error("Buffer too small to handle error")]
|
||||
BufferTooSmall,
|
||||
#[error("IO error on offload: {0}")]
|
||||
IoError(#[from] std::io::Error),
|
||||
#[error("Encode error: {0}")]
|
||||
EncodeError(#[from] audiopus::Error),
|
||||
#[error("Tap packet error: {0}")]
|
||||
TapPacketError(#[from] TapPacketError),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct OutputBuffer<const N: usize, T: Write> {
|
||||
circular_buffer: CircularBuffer<N, f32>,
|
||||
frame_size: usize,
|
||||
encoder: Encoder,
|
||||
offload: T,
|
||||
seq_count: u32,
|
||||
}
|
||||
|
||||
impl<const N: usize, T: Write> OutputBuffer<N, T> {
|
||||
pub fn new(frame_size: usize, encoder: Encoder, offload: T) -> Self {
|
||||
Self {
|
||||
circular_buffer: CircularBuffer::new(),
|
||||
frame_size,
|
||||
encoder,
|
||||
offload,
|
||||
seq_count: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn encode(&mut self, output_buffer: &mut [u8]) -> Result<usize, Error> {
|
||||
let encode_buffer: Vec<f32> = self.circular_buffer.drain(0..self.frame_size).collect();
|
||||
Ok(self.encoder.encode_float(&encode_buffer, output_buffer)?)
|
||||
}
|
||||
|
||||
fn offload_buffer_data(&mut self) -> Result<(), Error> {
|
||||
let mut output_buffer = vec![0; 32 * 1024 + TapPacketHeader::serialized_size()];
|
||||
let size = self.encode(&mut output_buffer[TapPacketHeader::serialized_size()..])?;
|
||||
|
||||
let hdr = TapPacketHeader {
|
||||
seq: self.seq_count,
|
||||
timestamp: chrono::Utc::now().timestamp(),
|
||||
};
|
||||
|
||||
hdr.write_to_buffer(&mut output_buffer)?;
|
||||
|
||||
let total_size = TapPacketHeader::serialized_size() + size;
|
||||
|
||||
self.offload.write_all(&output_buffer[0..total_size])?;
|
||||
|
||||
(self.seq_count, _) = self.seq_count.overflowing_add(1);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn write_data(&mut self, buf: &[f32]) -> Result<usize, Error> {
|
||||
if buf.len() > N {
|
||||
return Err(Error::BufferTooSmall);
|
||||
}
|
||||
|
||||
self.circular_buffer.extend_from_slice(buf);
|
||||
|
||||
if self.circular_buffer.len() > self.frame_size * 4 {
|
||||
self.offload_buffer_data()?;
|
||||
}
|
||||
|
||||
Ok(buf.len())
|
||||
}
|
||||
}
|
||||
|
||||
impl<const N: usize, T: Write> Drop for OutputBuffer<N, T> {
|
||||
fn drop(&mut self) {
|
||||
if self.circular_buffer.len() > N {
|
||||
let _ = self.offload_buffer_data().is_ok();
|
||||
}
|
||||
}
|
||||
}
|
127
src/rx_thread.rs
Normal file
127
src/rx_thread.rs
Normal file
@ -0,0 +1,127 @@
|
||||
use crate::tap_packet::TapPacketError;
|
||||
use crate::udp_connection::OpusUdpConnection;
|
||||
use crate::util::{CIRC_BUFFER_SIZE, ThreadMessage};
|
||||
use audiopus::coder::Decoder;
|
||||
use circular_buffer::CircularBuffer;
|
||||
use std::sync::mpsc::Sender;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
pub enum Error {
|
||||
#[error("IO error on Output: {0}")]
|
||||
IoError(#[from] std::io::Error),
|
||||
#[error("Decode error: {0}")]
|
||||
DecodeError(#[from] audiopus::Error),
|
||||
#[error("Tap packet error: {0}")]
|
||||
TapPacketError(#[from] TapPacketError),
|
||||
}
|
||||
|
||||
pub struct RxThread {
|
||||
sender: Sender<ThreadMessage>,
|
||||
decoder: Decoder,
|
||||
opus_udp_connection: OpusUdpConnection,
|
||||
circular_buffer: CircularBuffer<CIRC_BUFFER_SIZE, f32>,
|
||||
expected_seq: u32,
|
||||
exit: bool,
|
||||
}
|
||||
|
||||
//impl RxThread {
|
||||
// pub fn new(sender: Sender<ThreadMessage>, decoder: Decoder, opus_udp_connection: OpusUdpConnection) -> Self {
|
||||
// Self {
|
||||
// sender,
|
||||
// decoder,
|
||||
// opus_udp_connection,
|
||||
// circular_buffer: Default::default(),
|
||||
// expected_seq: 0,
|
||||
// exit: false,
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// fn decode(&mut self, input_buffer: &[u8], output_buffer: &mut [f32]) -> Result<usize, Error> {
|
||||
// let hdr = TapPacketHeader::from_buffer(input_buffer)?;
|
||||
//
|
||||
// if !hdr.check_acceptable_latency_tolerance() {
|
||||
// warn!(
|
||||
// "Packet outside of tolerance: packet.timestamp={} time={}",
|
||||
// hdr.timestamp,
|
||||
// Utc::now().timestamp()
|
||||
// );
|
||||
// }
|
||||
//
|
||||
// if hdr.seq != self.expected_seq {
|
||||
// warn!(
|
||||
// "Found sequence mismatch, expected seq={}, {hdr:?}",
|
||||
// self.expected_seq
|
||||
// );
|
||||
// }
|
||||
//
|
||||
// (self.expected_seq, _) = hdr.seq.overflowing_add(1);
|
||||
//
|
||||
// let packet = Packet::try_from(&input_buffer[TapPacketHeader::serialized_size()..])?;
|
||||
//
|
||||
// let signals = MutSignals::try_from(output_buffer)?;
|
||||
// Ok(self.decoder.decode_float(Some(packet), signals, false)?)
|
||||
// }
|
||||
//
|
||||
// fn missing_packet(&mut self, output_buffer: &mut [f32]) -> Result<usize, Error> {
|
||||
// let signals = MutSignals::try_from(output_buffer)?;
|
||||
// Ok(self.decoder.decode_float(None, signals, false)?)
|
||||
// }
|
||||
//
|
||||
// fn write_frame_to_buffer(&mut self, frame: &[f32]) {
|
||||
// self.circular_buffer.extend_from_slice(frame)
|
||||
// }
|
||||
//
|
||||
//
|
||||
// pub fn recv_packet(&mut self, input_buffer: Option<&[u8]>) -> Result<(), Error> {
|
||||
// let mut output_buffer = vec![0.0; STEREO_FRAME_SIZE];
|
||||
//
|
||||
// let size = match input_buffer {
|
||||
// Some(buf) => self.decode(buf, &mut output_buffer)?,
|
||||
// None => self.missing_packet(&mut output_buffer)?,
|
||||
// };
|
||||
//
|
||||
// self.write_frame_to_buffer(&output_buffer[..size]);
|
||||
//
|
||||
// Ok(())
|
||||
// }
|
||||
//
|
||||
// pub fn send_next_frame(
|
||||
// &mut self,
|
||||
// ) {
|
||||
// let drain_count = output_buffer.len().min(self.circular_buffer.len());
|
||||
//
|
||||
// let frame: Vec<f32> = self.circular_buffer.drain(..drain_count).collect();
|
||||
//
|
||||
// for (ndx, output) in output_buffer.iter_mut().enumerate() {
|
||||
// let value: SampleType = if ndx < frame.len() {
|
||||
// SampleType::from_sample(frame[ndx])
|
||||
// } else {
|
||||
// SampleType::from_sample(0.0f32)
|
||||
// };
|
||||
//
|
||||
// *output = value;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// fn handle_udp_msg(&mut self) -> Result<(), Error> {
|
||||
// let mut udp_packet = vec![0; TAP_PACKET_MAX_DATA_LEN + TapPacketHeader::serialized_size()];
|
||||
//
|
||||
// let size = self.opus_udp_connection.read(&mut udp_packet)?;
|
||||
//
|
||||
// let udp_packet = if size == 0 {
|
||||
// None
|
||||
// }
|
||||
// else {
|
||||
// Some(udp_packet.as_slice())
|
||||
// };
|
||||
//
|
||||
// self.recv_packet(udp_packet)?;
|
||||
//
|
||||
//
|
||||
// Ok(())
|
||||
// }
|
||||
//
|
||||
// pub fn thread(self)
|
||||
//}
|
111
src/tx_thread.rs
Normal file
111
src/tx_thread.rs
Normal file
@ -0,0 +1,111 @@
|
||||
use crate::tap_packet::{TapPacketError, TapPacketHeader};
|
||||
use crate::udp_connection::OpusUdpConnection;
|
||||
use crate::util::STEREO_FRAME_SIZE;
|
||||
use crate::util::{CIRC_BUFFER_SIZE, TAP_PACKET_MAX_DATA_LEN, ThreadMessage};
|
||||
use audiopus::coder::Encoder;
|
||||
use circular_buffer::CircularBuffer;
|
||||
use log::{error, info, warn};
|
||||
use std::io::Write;
|
||||
use std::sync::mpsc::{Receiver, RecvError};
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("Failed to receive message: {0}")]
|
||||
RecvFailed(#[from] RecvError),
|
||||
#[error("Buffer too small to handle error")]
|
||||
BufferTooSmall,
|
||||
#[error("IO error on offload: {0}")]
|
||||
IoError(#[from] std::io::Error),
|
||||
#[error("Encode error: {0}")]
|
||||
EncodeError(#[from] audiopus::Error),
|
||||
#[error("Tap packet error: {0}")]
|
||||
TapPacketError(#[from] TapPacketError),
|
||||
}
|
||||
|
||||
pub struct TxThread {
|
||||
receiver: Receiver<ThreadMessage>,
|
||||
encoder: Encoder,
|
||||
opus_udp_connection: OpusUdpConnection,
|
||||
circular_buffer: CircularBuffer<CIRC_BUFFER_SIZE, f32>,
|
||||
seq_count: u32,
|
||||
exit: bool,
|
||||
}
|
||||
|
||||
impl TxThread {
|
||||
pub fn new(
|
||||
receiver: Receiver<ThreadMessage>,
|
||||
encoder: Encoder,
|
||||
opus_udp_connection: OpusUdpConnection,
|
||||
) -> Self {
|
||||
Self {
|
||||
receiver,
|
||||
encoder,
|
||||
opus_udp_connection,
|
||||
circular_buffer: Default::default(),
|
||||
seq_count: 0,
|
||||
exit: false,
|
||||
}
|
||||
}
|
||||
|
||||
fn encode(&mut self, output_buffer: &mut [u8]) -> Result<usize, Error> {
|
||||
let encode_buffer: Vec<f32> = self.circular_buffer.drain(0..STEREO_FRAME_SIZE).collect();
|
||||
Ok(self.encoder.encode_float(&encode_buffer, output_buffer)?)
|
||||
}
|
||||
|
||||
fn send_opus_frame(&mut self) -> Result<(), Error> {
|
||||
let mut output_buffer =
|
||||
vec![0; TAP_PACKET_MAX_DATA_LEN + TapPacketHeader::serialized_size()];
|
||||
let size = self.encode(&mut output_buffer[TapPacketHeader::serialized_size()..])?;
|
||||
|
||||
let hdr = TapPacketHeader {
|
||||
seq: self.seq_count,
|
||||
timestamp: chrono::Utc::now().timestamp(),
|
||||
};
|
||||
|
||||
hdr.write_to_buffer(&mut output_buffer)?;
|
||||
|
||||
let total_size = TapPacketHeader::serialized_size() + size;
|
||||
|
||||
self.opus_udp_connection
|
||||
.write_all(&output_buffer[0..total_size])?;
|
||||
|
||||
(self.seq_count, _) = self.seq_count.overflowing_add(1);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_data_msg(&mut self, data: Vec<f32>) -> Result<(), Error> {
|
||||
if self.circular_buffer.len() + data.len() > CIRC_BUFFER_SIZE {
|
||||
warn!("Will drop data in circular buffer");
|
||||
}
|
||||
|
||||
self.circular_buffer.extend_from_slice(&data);
|
||||
|
||||
if self.circular_buffer.len() >= STEREO_FRAME_SIZE {
|
||||
self.send_opus_frame()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_msg(&mut self) -> Result<(), Error> {
|
||||
let msg = self.receiver.recv()?;
|
||||
|
||||
match msg {
|
||||
ThreadMessage::Data(data) => self.handle_data_msg(data),
|
||||
ThreadMessage::Exit => {
|
||||
self.exit = true;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn thread(mut self) {
|
||||
info!("Starting TX Thread...");
|
||||
loop {
|
||||
if let Err(err) = self.handle_msg() {
|
||||
error!("Failed to process message: {err:?}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
14
src/util.rs
14
src/util.rs
@ -1 +1,15 @@
|
||||
use audiopus::SampleRate;
|
||||
|
||||
pub const SAMPLE_RATE: SampleRate = SampleRate::Hz48000;
|
||||
pub const SAMPLE_RATE_RAW: usize = 48_000;
|
||||
pub const AUDIO_FRAME_RATE: usize = 50;
|
||||
pub const MONO_FRAME_SIZE: usize = SAMPLE_RATE_RAW / AUDIO_FRAME_RATE;
|
||||
pub const STEREO_FRAME_SIZE: usize = 2 * MONO_FRAME_SIZE;
|
||||
pub const CIRC_BUFFER_SIZE: usize = STEREO_FRAME_SIZE * 10;
|
||||
pub const TAP_PACKET_MAX_DATA_LEN: usize = 32 * 1024;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ThreadMessage {
|
||||
Data(Vec<f32>),
|
||||
Exit,
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user