Switch to using protobuf and remove audio sink

This commit is contained in:
Joey Hines 2025-08-31 15:01:18 -06:00
parent 1f250f448d
commit 3e6552d012
Signed by: joeyahines
GPG Key ID: 38BA6F25C94C9382
13 changed files with 224 additions and 505 deletions

2
.cargo/config.toml Normal file
View File

@ -0,0 +1,2 @@
[registries.ahines]
index = "https://git.ahines.net/joeyahines/_cargo-index.git"

193
Cargo.lock generated
View File

@ -322,6 +322,12 @@ dependencies = [
"objc2", "objc2",
] ]
[[package]]
name = "either"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
[[package]] [[package]]
name = "env_filter" name = "env_filter"
version = "0.1.3" version = "0.1.3"
@ -351,6 +357,40 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
[[package]]
name = "errno"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad"
dependencies = [
"libc",
"windows-sys 0.60.2",
]
[[package]]
name = "fastrand"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "fixedbitset"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
[[package]]
name = "getrandom"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4"
dependencies = [
"cfg-if",
"libc",
"r-efi",
"wasi",
]
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.15.5" version = "0.15.5"
@ -409,6 +449,15 @@ version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]]
name = "itertools"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
dependencies = [
"either",
]
[[package]] [[package]]
name = "jiff" name = "jiff"
version = "0.2.15" version = "0.2.15"
@ -471,6 +520,12 @@ version = "0.2.174"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776" checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776"
[[package]]
name = "linux-raw-sys"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12"
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.27" version = "0.4.27"
@ -492,6 +547,12 @@ version = "2.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0"
[[package]]
name = "multimap"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084"
[[package]] [[package]]
name = "ndk" name = "ndk"
version = "0.9.0" version = "0.9.0"
@ -647,6 +708,16 @@ version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad"
[[package]]
name = "petgraph"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772"
dependencies = [
"fixedbitset",
"indexmap",
]
[[package]] [[package]]
name = "pkg-config" name = "pkg-config"
version = "0.3.32" version = "0.3.32"
@ -668,6 +739,16 @@ dependencies = [
"portable-atomic", "portable-atomic",
] ]
[[package]]
name = "prettyplease"
version = "0.2.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff24dfcda44452b9816fff4cd4227e1bb73ff5a2f1bc1105aa92fb8565ce44d2"
dependencies = [
"proc-macro2",
"syn",
]
[[package]] [[package]]
name = "proc-macro-crate" name = "proc-macro-crate"
version = "3.3.0" version = "3.3.0"
@ -686,6 +767,58 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "prost"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-build"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1"
dependencies = [
"heck",
"itertools",
"log",
"multimap",
"once_cell",
"petgraph",
"prettyplease",
"prost",
"prost-types",
"regex",
"syn",
"tempfile",
]
[[package]]
name = "prost-derive"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-types"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9b4db3d6da204ed77bb26ba83b6122a73aeb2e87e25fbf7ad2e84c4ccbf8f72"
dependencies = [
"prost",
]
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.40" version = "1.0.40"
@ -695,6 +828,12 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "r-efi"
version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.11.1" version = "1.11.1"
@ -724,6 +863,19 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "rustix"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8"
dependencies = [
"bitflags 2.9.1",
"errno",
"libc",
"linux-raw-sys",
"windows-sys 0.60.2",
]
[[package]] [[package]]
name = "rustversion" name = "rustversion"
version = "1.0.21" version = "1.0.21"
@ -795,9 +947,35 @@ dependencies = [
"env_logger", "env_logger",
"hound", "hound",
"log", "log",
"prost",
"tap-interface",
"thiserror 2.0.12", "thiserror 2.0.12",
] ]
[[package]]
name = "tap-interface"
version = "0.1.0"
source = "registry+https://git.ahines.net/joeyahines/_cargo-index.git"
checksum = "4450520c17b84421ce3addd0cb91d99f3a358e969f55440f9946b79ef8b48e67"
dependencies = [
"prost",
"prost-build",
"prost-types",
]
[[package]]
name = "tempfile"
version = "3.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15b61f8f20e3a6f7e0649d825294eaf317edce30f82cf6026e7e4cb9222a7d1e"
dependencies = [
"fastrand",
"getrandom",
"once_cell",
"rustix",
"windows-sys 0.60.2",
]
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "1.0.69" version = "1.0.69"
@ -877,6 +1055,15 @@ dependencies = [
"winapi-util", "winapi-util",
] ]
[[package]]
name = "wasi"
version = "0.14.3+wasi-0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51ae83037bdd272a9e28ce236db8c07016dd0d50c27038b3f407533c030c95"
dependencies = [
"wit-bindgen",
]
[[package]] [[package]]
name = "wasm-bindgen" name = "wasm-bindgen"
version = "0.2.100" version = "0.2.100"
@ -1276,3 +1463,9 @@ checksum = "f3edebf492c8125044983378ecb5766203ad3b4c2f7a922bd7dd207f6d443e95"
dependencies = [ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "wit-bindgen"
version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "052283831dbae3d879dc7f51f3d92703a316ca49f91540417d38591826127814"

View File

@ -14,3 +14,5 @@ thiserror = "2.0.12"
circular-buffer = "1.1.0" circular-buffer = "1.1.0"
chrono = "0.4.41" chrono = "0.4.41"
hound = "3.5.1" hound = "3.5.1"
tap-interface = {version = "0.1.0", registry = "ahines"}
prost = "0.14.1"

View File

@ -1,17 +1,16 @@
use crate::tap_packet::TapPacketHeader;
use crate::udp_connection::OpusUdpConnection; use crate::udp_connection::OpusUdpConnection;
use crate::util::{ use crate::util::{SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE, TAP_PACKET_MAX_DATA_LEN};
SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE, TAP_PACKET_MAX_DATA_LEN,
};
use anyhow::Error; use anyhow::Error;
use audiopus::coder::Decoder; use audiopus::coder::Decoder;
use audiopus::packet::Packet; use audiopus::packet::Packet;
use audiopus::{Channels, MutSignals}; use audiopus::{Channels, MutSignals};
use chrono::Utc; use chrono::Utc;
use log::{error, warn}; use log::{error, warn};
use prost::Message;
use std::io::{ErrorKind, Read}; use std::io::{ErrorKind, Read};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::Path; use std::path::Path;
use tap_interface::tap::message::TapMessage;
pub fn record(addr: SocketAddr, output_path: &Path, duration_ms: u64) -> Result<(), Error> { pub fn record(addr: SocketAddr, output_path: &Path, duration_ms: u64) -> Result<(), Error> {
let mut decoder = Decoder::new(SAMPLE_RATE, Channels::Stereo)?; let mut decoder = Decoder::new(SAMPLE_RATE, Channels::Stereo)?;
@ -32,7 +31,7 @@ pub fn record(addr: SocketAddr, output_path: &Path, duration_ms: u64) -> Result<
let deadline = std::time::Instant::now() + std::time::Duration::from_millis(duration_ms); let deadline = std::time::Instant::now() + std::time::Duration::from_millis(duration_ms);
while std::time::Instant::now() < deadline { while std::time::Instant::now() < deadline {
let mut packet = vec![0; TAP_PACKET_MAX_DATA_LEN + TapPacketHeader::serialized_size()]; let mut packet = vec![0; TAP_PACKET_MAX_DATA_LEN];
let mut output_buffer = vec![0.0; STEREO_FRAME_SIZE]; let mut output_buffer = vec![0.0; STEREO_FRAME_SIZE];
let signals = MutSignals::try_from(&mut output_buffer)?; let signals = MutSignals::try_from(&mut output_buffer)?;
@ -49,26 +48,26 @@ pub fn record(addr: SocketAddr, output_path: &Path, duration_ms: u64) -> Result<
}; };
let data_size = if let Some(new_packet) = new_packet { let data_size = if let Some(new_packet) = new_packet {
let hdr = TapPacketHeader::from_buffer(new_packet)?; let tap_message = TapMessage::decode(new_packet)?;
if !hdr.check_acceptable_latency_tolerance() { if tap_message.timestamp - Utc::now().timestamp() as u64 > 1 {
warn!( warn!(
"Packet outside of tolerance: packet.timestamp={} time={}", "Packet outside of tolerance: packet.timestamp={} time={}",
hdr.timestamp, tap_message.timestamp,
Utc::now().timestamp() Utc::now().timestamp()
); );
} }
if hdr.seq != seq_count { if tap_message.seq != seq_count {
warn!( warn!(
"Found sequence mismatch, expected seq={}, {hdr:?}", "Found sequence mismatch, expected seq={}, {tap_message:?}",
seq_count seq_count
); );
} }
(seq_count, _) = hdr.seq.overflowing_add(1); (seq_count, _) = tap_message.seq.overflowing_add(1);
let packet = Packet::try_from(&new_packet[TapPacketHeader::serialized_size()..])?; let packet = Packet::try_from(&tap_message.audio_data)?;
decoder.decode_float(Some(packet), signals, false)? decoder.decode_float(Some(packet), signals, false)?
} else { } else {

View File

@ -1,169 +0,0 @@
use crate::input_buffer::InputBuffer;
use crate::udp_connection::OpusUdpConnection;
use crate::util::{CIRC_BUFFER_SIZE, SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE};
use anyhow::Error;
use audiopus::Channels;
use audiopus::coder::Decoder;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{
BufferSize, Device, FromSample, Host, I24, Sample, SampleRate, SizedSample, StreamConfig,
SupportedStreamConfig,
};
use log::{error, info};
use std::io::{ErrorKind, Read};
use std::net::SocketAddr;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
const INPUT_BUFFER_SIZE: usize = CIRC_BUFFER_SIZE;
#[derive(Debug, Clone)]
struct Context {
pub addr: SocketAddr,
pub input_buffer: Arc<Mutex<InputBuffer<INPUT_BUFFER_SIZE>>>,
pub data_rady: Arc<Condvar>,
}
pub fn listen(device: &str, addr: SocketAddr, host: Host) -> Result<(), Error> {
// Set up the input device and stream with the default input config.
let device = if device == "default" {
host.default_output_device()
} else {
host.output_devices()?
.find(|x| x.name().map(|y| y == device).unwrap_or(false))
}
.expect("failed to find output device");
info!("Output device: {}", device.name()?);
let config = device.default_output_config()?;
println!("Default output config : {config:?}");
for cfg in device.supported_output_configs()? {
info!("{:?}", cfg);
}
let decoder = Decoder::new(SAMPLE_RATE, Channels::Stereo)?;
let input_buffer = InputBuffer::new(STEREO_FRAME_SIZE, decoder);
let ctx = Context {
addr,
input_buffer: Arc::new(Mutex::new(input_buffer)),
data_rady: Arc::new(Condvar::new()),
};
let stream = stream_setup_for(device, config, ctx)?;
stream.play()?;
thread::park();
Ok(())
}
fn stream_setup_for(
device: Device,
default_config: SupportedStreamConfig,
ctx: Context,
) -> Result<cpal::Stream, anyhow::Error>
where
{
let mut config: StreamConfig = default_config.clone().into();
config.sample_rate = SampleRate(SAMPLE_RATE_RAW as u32);
config.buffer_size = BufferSize::Fixed(STEREO_FRAME_SIZE as u32);
match default_config.sample_format() {
cpal::SampleFormat::I8 => make_stream::<i8>(&device, &config, ctx),
cpal::SampleFormat::I16 => make_stream::<i16>(&device, &config, ctx),
cpal::SampleFormat::I24 => make_stream::<I24>(&device, &config, ctx),
cpal::SampleFormat::I32 => make_stream::<i32>(&device, &config, ctx),
cpal::SampleFormat::I64 => make_stream::<i64>(&device, &config, ctx),
cpal::SampleFormat::U8 => make_stream::<u8>(&device, &config, ctx),
cpal::SampleFormat::U16 => make_stream::<u16>(&device, &config, ctx),
cpal::SampleFormat::U32 => make_stream::<u32>(&device, &config, ctx),
cpal::SampleFormat::U64 => make_stream::<u64>(&device, &config, ctx),
cpal::SampleFormat::F32 => make_stream::<f32>(&device, &config, ctx),
cpal::SampleFormat::F64 => make_stream::<f64>(&device, &config, ctx),
sample_format => Err(anyhow::Error::msg(format!(
"Unsupported sample format '{sample_format}'"
))),
}
}
fn make_stream<T>(
device: &Device,
config: &cpal::StreamConfig,
ctx: Context,
) -> Result<cpal::Stream, anyhow::Error>
where
T: SizedSample + FromSample<f32>,
{
let err_fn = |err| error!("Error building output sound stream: {err}");
let time_at_start = std::time::Instant::now();
info!("Time at start: {time_at_start:?}");
let ctx_recv_thread = ctx.clone();
thread::spawn(move || rx_thread(ctx_recv_thread));
let stream = device.build_output_stream(
config,
move |output: &mut [T], _: &cpal::OutputCallbackInfo| process_frame(output, &ctx),
err_fn,
None,
)?;
Ok(stream)
}
fn rx_thread(ctx: Context) {
info!("Starting RX thread...");
let mut udp_connection = OpusUdpConnection::new(ctx.addr, ctx.addr).unwrap();
udp_connection
.set_read_timeout(Some(Duration::from_millis(20)))
.unwrap();
let mut packet = vec![0; 32 * 1024];
loop {
let new_packet = match udp_connection.read(&mut packet) {
Ok(data_len) => Some(&packet[0..data_len]),
Err(err) => {
if err.kind() == ErrorKind::WouldBlock {
None
} else {
error!("Got error trying to recv from UDP: {err}");
continue;
}
}
};
let mut input_buffer = ctx.input_buffer.lock().expect("Mutex poisoned");
if let Err(err) = input_buffer.recv_packet(new_packet) {
error!("Adding packet to buffer: {err}")
} else {
ctx.data_rady.notify_all();
}
}
}
fn process_frame<SampleType>(output: &mut [SampleType], ctx: &Context)
where
SampleType: Sample + FromSample<f32>,
{
let mut input_buffer = ctx.input_buffer.lock().expect("Mutex poisoned");
let mut input_buffer = if input_buffer.samples_in_buffer() < output.len() {
while input_buffer.samples_in_buffer() < output.len() {
input_buffer = ctx.data_rady.wait(input_buffer).expect("Mutex poisoned");
}
input_buffer
} else {
input_buffer
};
input_buffer.read_frame_from_buffer(output)
}

View File

@ -1,8 +1,6 @@
use crate::tx_thread::TxThread; use crate::tx_thread::TxThread;
use crate::udp_connection::OpusUdpConnection; use crate::udp_connection::OpusUdpConnection;
use crate::util::{ use crate::util::{SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE, ThreadMessage};
SAMPLE_RATE, SAMPLE_RATE_RAW, STEREO_FRAME_SIZE, ThreadMessage,
};
use anyhow::Error; use anyhow::Error;
use audiopus::coder::Encoder; use audiopus::coder::Encoder;
use audiopus::{Application, Bitrate, Channels, Signal}; use audiopus::{Application, Bitrate, Channels, Signal};

View File

@ -1,110 +0,0 @@
use crate::tap_packet::{TapPacketError, TapPacketHeader};
use audiopus::MutSignals;
use audiopus::coder::Decoder;
use audiopus::packet::Packet;
use chrono::Utc;
use circular_buffer::CircularBuffer;
use cpal::{FromSample, Sample};
use log::warn;
use thiserror::Error;
#[derive(Error, Debug)]
#[allow(clippy::enum_variant_names)]
pub enum Error {
#[error("IO error on Output: {0}")]
IoError(#[from] std::io::Error),
#[error("Decode error: {0}")]
DecodeError(#[from] audiopus::Error),
#[error("Tap packet error: {0}")]
TapPacketError(#[from] TapPacketError),
}
#[derive(Debug)]
pub struct InputBuffer<const N: usize> {
circular_buffer: CircularBuffer<N, f32>,
frame_size: usize,
decoder: Decoder,
expected_seq: u32,
}
impl<const N: usize> InputBuffer<N> {
pub fn new(frame_size: usize, decoder: Decoder) -> Self {
Self {
circular_buffer: CircularBuffer::new(),
frame_size,
decoder,
expected_seq: 0,
}
}
fn decode(&mut self, input_buffer: &[u8], output_buffer: &mut [f32]) -> Result<usize, Error> {
let hdr = TapPacketHeader::from_buffer(input_buffer)?;
if !hdr.check_acceptable_latency_tolerance() {
warn!(
"Packet outside of tolerance: packet.timestamp={} time={}",
hdr.timestamp,
Utc::now().timestamp()
);
}
if hdr.seq != self.expected_seq {
warn!(
"Found sequence mismatch, expected seq={}, {hdr:?}",
self.expected_seq
);
}
(self.expected_seq, _) = hdr.seq.overflowing_add(1);
let packet = Packet::try_from(&input_buffer[TapPacketHeader::serialized_size()..])?;
let signals = MutSignals::try_from(output_buffer)?;
Ok(self.decoder.decode_float(Some(packet), signals, false)?)
}
fn missing_packet(&mut self, output_buffer: &mut [f32]) -> Result<usize, Error> {
let signals = MutSignals::try_from(output_buffer)?;
Ok(self.decoder.decode_float(None, signals, false)?)
}
fn write_frame_to_buffer(&mut self, frame: &[f32]) {
self.circular_buffer.extend_from_slice(frame)
}
pub fn recv_packet(&mut self, input_buffer: Option<&[u8]>) -> Result<(), Error> {
let mut output_buffer = vec![0.0; self.frame_size];
let size = match input_buffer {
Some(buf) => self.decode(buf, &mut output_buffer)?,
None => self.missing_packet(&mut output_buffer)?,
} * 2;
self.write_frame_to_buffer(&output_buffer[..size]);
Ok(())
}
pub fn samples_in_buffer(&self) -> usize {
self.circular_buffer.len()
}
pub fn read_frame_from_buffer<SampleType: Sample + FromSample<f32>>(
&mut self,
output_buffer: &mut [SampleType],
) {
let drain_count = output_buffer.len().min(self.circular_buffer.len());
let frame: Vec<f32> = self.circular_buffer.drain(..drain_count).collect();
for (ndx, output) in output_buffer.iter_mut().enumerate() {
let value: SampleType = if ndx < frame.len() {
SampleType::from_sample(frame[ndx])
} else {
SampleType::from_sample(0.0f32)
};
*output = value;
}
}
}

View File

@ -1,15 +1,10 @@
mod audio_record; mod audio_record;
mod audio_sink;
mod audio_source; mod audio_source;
mod input_buffer;
mod rx_thread;
mod tap_packet;
mod tx_thread; mod tx_thread;
mod udp_connection; mod udp_connection;
mod util; mod util;
use crate::audio_record::record; use crate::audio_record::record;
use crate::audio_sink::listen;
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use log::LevelFilter; use log::LevelFilter;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -35,15 +30,6 @@ enum Tasks {
dest: SocketAddr, dest: SocketAddr,
}, },
/// Listen to data from a tap
Listen {
/// The audio device to use
#[arg(short, long, default_value_t = String::from("default"))]
device: String,
/// UDP addr to receive data on
addr: SocketAddr,
},
Record { Record {
/// UDP addr to receive data on /// UDP addr to receive data on
addr: SocketAddr, addr: SocketAddr,
@ -65,7 +51,6 @@ fn main() -> Result<(), anyhow::Error> {
match opt.command { match opt.command {
Tasks::Tap { device, src, dest } => audio_source::tap(&device, src, dest, host), Tasks::Tap { device, src, dest } => audio_source::tap(&device, src, dest, host),
Tasks::Listen { device, addr } => listen(&device, addr, host),
Tasks::Record { Tasks::Record {
addr, addr,
duration_ms, duration_ms,

View File

@ -1,127 +0,0 @@
use crate::tap_packet::TapPacketError;
use crate::udp_connection::OpusUdpConnection;
use crate::util::{CIRC_BUFFER_SIZE, ThreadMessage};
use audiopus::coder::Decoder;
use circular_buffer::CircularBuffer;
use std::sync::mpsc::Sender;
use thiserror::Error;
#[derive(Error, Debug)]
#[allow(clippy::enum_variant_names)]
pub enum Error {
#[error("IO error on Output: {0}")]
IoError(#[from] std::io::Error),
#[error("Decode error: {0}")]
DecodeError(#[from] audiopus::Error),
#[error("Tap packet error: {0}")]
TapPacketError(#[from] TapPacketError),
}
pub struct RxThread {
sender: Sender<ThreadMessage>,
decoder: Decoder,
opus_udp_connection: OpusUdpConnection,
circular_buffer: CircularBuffer<CIRC_BUFFER_SIZE, f32>,
expected_seq: u32,
exit: bool,
}
//impl RxThread {
// pub fn new(sender: Sender<ThreadMessage>, decoder: Decoder, opus_udp_connection: OpusUdpConnection) -> Self {
// Self {
// sender,
// decoder,
// opus_udp_connection,
// circular_buffer: Default::default(),
// expected_seq: 0,
// exit: false,
// }
// }
//
// fn decode(&mut self, input_buffer: &[u8], output_buffer: &mut [f32]) -> Result<usize, Error> {
// let hdr = TapPacketHeader::from_buffer(input_buffer)?;
//
// if !hdr.check_acceptable_latency_tolerance() {
// warn!(
// "Packet outside of tolerance: packet.timestamp={} time={}",
// hdr.timestamp,
// Utc::now().timestamp()
// );
// }
//
// if hdr.seq != self.expected_seq {
// warn!(
// "Found sequence mismatch, expected seq={}, {hdr:?}",
// self.expected_seq
// );
// }
//
// (self.expected_seq, _) = hdr.seq.overflowing_add(1);
//
// let packet = Packet::try_from(&input_buffer[TapPacketHeader::serialized_size()..])?;
//
// let signals = MutSignals::try_from(output_buffer)?;
// Ok(self.decoder.decode_float(Some(packet), signals, false)?)
// }
//
// fn missing_packet(&mut self, output_buffer: &mut [f32]) -> Result<usize, Error> {
// let signals = MutSignals::try_from(output_buffer)?;
// Ok(self.decoder.decode_float(None, signals, false)?)
// }
//
// fn write_frame_to_buffer(&mut self, frame: &[f32]) {
// self.circular_buffer.extend_from_slice(frame)
// }
//
//
// pub fn recv_packet(&mut self, input_buffer: Option<&[u8]>) -> Result<(), Error> {
// let mut output_buffer = vec![0.0; STEREO_FRAME_SIZE];
//
// let size = match input_buffer {
// Some(buf) => self.decode(buf, &mut output_buffer)?,
// None => self.missing_packet(&mut output_buffer)?,
// };
//
// self.write_frame_to_buffer(&output_buffer[..size]);
//
// Ok(())
// }
//
// pub fn send_next_frame(
// &mut self,
// ) {
// let drain_count = output_buffer.len().min(self.circular_buffer.len());
//
// let frame: Vec<f32> = self.circular_buffer.drain(..drain_count).collect();
//
// for (ndx, output) in output_buffer.iter_mut().enumerate() {
// let value: SampleType = if ndx < frame.len() {
// SampleType::from_sample(frame[ndx])
// } else {
// SampleType::from_sample(0.0f32)
// };
//
// *output = value;
// }
// }
//
// fn handle_udp_msg(&mut self) -> Result<(), Error> {
// let mut udp_packet = vec![0; TAP_PACKET_MAX_DATA_LEN + TapPacketHeader::serialized_size()];
//
// let size = self.opus_udp_connection.read(&mut udp_packet)?;
//
// let udp_packet = if size == 0 {
// None
// }
// else {
// Some(udp_packet.as_slice())
// };
//
// self.recv_packet(udp_packet)?;
//
//
// Ok(())
// }
//
// pub fn thread(self)
//}

View File

@ -1,52 +0,0 @@
use chrono::Utc;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum TapPacketError {
#[error("Buffer is too small to serialize data into")]
BufTooSmall,
}
#[derive(Debug, Clone)]
pub struct TapPacketHeader {
pub seq: u32,
pub timestamp: i64,
}
impl TapPacketHeader {
pub fn write_to_buffer(&self, buf: &mut [u8]) -> Result<(), TapPacketError> {
if buf.len() < Self::serialized_size() {
return Err(TapPacketError::BufTooSmall);
}
buf[0..size_of::<u32>()].copy_from_slice(self.seq.to_be_bytes().as_slice());
buf[size_of::<u32>()..Self::serialized_size()]
.copy_from_slice(self.timestamp.to_be_bytes().as_slice());
Ok(())
}
pub fn check_acceptable_latency_tolerance(&self) -> bool {
Utc::now().timestamp().abs_diff(self.timestamp) < 1
}
pub fn from_buffer(buf: &[u8]) -> Result<Self, TapPacketError> {
if buf.len() < Self::serialized_size() {
return Err(TapPacketError::BufTooSmall);
}
let mut seq_bytes = [0u8; size_of::<u32>()];
seq_bytes.copy_from_slice(&buf[0..size_of::<u32>()]);
let seq = u32::from_be_bytes(seq_bytes);
let mut timestamp_bytes = [0u8; size_of::<i64>()];
timestamp_bytes.copy_from_slice(&buf[size_of::<u32>()..Self::serialized_size()]);
let timestamp = i64::from_be_bytes(timestamp_bytes);
Ok(Self { seq, timestamp })
}
pub fn serialized_size() -> usize {
size_of::<u32>() + size_of::<i64>()
}
}

View File

@ -1,26 +1,24 @@
use crate::tap_packet::{TapPacketError, TapPacketHeader};
use crate::udp_connection::OpusUdpConnection; use crate::udp_connection::OpusUdpConnection;
use crate::util::STEREO_FRAME_SIZE; use crate::util::STEREO_FRAME_SIZE;
use crate::util::{CIRC_BUFFER_SIZE, TAP_PACKET_MAX_DATA_LEN, ThreadMessage}; use crate::util::{CIRC_BUFFER_SIZE, TAP_PACKET_MAX_DATA_LEN, ThreadMessage};
use audiopus::coder::Encoder; use audiopus::coder::Encoder;
use chrono::Utc;
use circular_buffer::CircularBuffer; use circular_buffer::CircularBuffer;
use log::{error, info, warn}; use log::{error, info, warn};
use prost::Message;
use std::io::Write; use std::io::Write;
use std::sync::mpsc::{Receiver, RecvError}; use std::sync::mpsc::{Receiver, RecvError};
use tap_interface::tap::message::TapMessage;
use thiserror::Error; use thiserror::Error;
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum Error { pub enum Error {
#[error("Failed to receive message: {0}")] #[error("Failed to receive message: {0}")]
RecvFailed(#[from] RecvError), RecvFailed(#[from] RecvError),
#[error("Buffer too small to handle error")]
BufferTooSmall,
#[error("IO error on offload: {0}")] #[error("IO error on offload: {0}")]
IoError(#[from] std::io::Error), IoFailure(#[from] std::io::Error),
#[error("Encode error: {0}")] #[error("Encode error: {0}")]
EncodeError(#[from] audiopus::Error), EncodeFailure(#[from] audiopus::Error),
#[error("Tap packet error: {0}")]
TapPacketError(#[from] TapPacketError),
} }
pub struct TxThread { pub struct TxThread {
@ -54,21 +52,19 @@ impl TxThread {
} }
fn send_opus_frame(&mut self) -> Result<(), Error> { fn send_opus_frame(&mut self) -> Result<(), Error> {
let mut output_buffer = let mut output_buffer = vec![0; TAP_PACKET_MAX_DATA_LEN];
vec![0; TAP_PACKET_MAX_DATA_LEN + TapPacketHeader::serialized_size()]; let size = self.encode(&mut output_buffer)?;
let size = self.encode(&mut output_buffer[TapPacketHeader::serialized_size()..])?;
let hdr = TapPacketHeader { output_buffer.truncate(size);
let tap_message = TapMessage {
seq: self.seq_count, seq: self.seq_count,
timestamp: chrono::Utc::now().timestamp(), timestamp: Utc::now().timestamp() as u64,
audio_data: output_buffer,
}; };
hdr.write_to_buffer(&mut output_buffer)?;
let total_size = TapPacketHeader::serialized_size() + size;
self.opus_udp_connection self.opus_udp_connection
.write_all(&output_buffer[0..total_size])?; .write_all(tap_message.encode_to_vec().as_slice())?;
(self.seq_count, _) = self.seq_count.overflowing_add(1); (self.seq_count, _) = self.seq_count.overflowing_add(1);
Ok(()) Ok(())

View File

@ -16,6 +16,7 @@ impl OpusUdpConnection {
}) })
} }
#[allow(dead_code)]
pub fn set_read_timeout(&mut self, timeout: Option<Duration>) -> std::io::Result<()> { pub fn set_read_timeout(&mut self, timeout: Option<Duration>) -> std::io::Result<()> {
self.udp_socket.set_read_timeout(timeout) self.udp_socket.set_read_timeout(timeout)
} }

View File

@ -9,6 +9,7 @@ pub const CIRC_BUFFER_SIZE: usize = STEREO_FRAME_SIZE * 10;
pub const TAP_PACKET_MAX_DATA_LEN: usize = 32 * 1024; pub const TAP_PACKET_MAX_DATA_LEN: usize = 32 * 1024;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum ThreadMessage { pub enum ThreadMessage {
Data(Vec<f32>), Data(Vec<f32>),
Exit, Exit,