From 0c0fe9c87db36b2fae349d384f4783073cf1848e Mon Sep 17 00:00:00 2001 From: Joey Hines Date: Sun, 31 Aug 2025 18:35:32 -0600 Subject: [PATCH] Added more commands to control stream, added protobuf message support --- Cargo.lock | 119 ++++++++++++++++++++++++++++++++++ Cargo.toml | 5 +- README.md | 6 ++ src/circular_buffer_source.rs | 72 ++++++++++++++++++++ src/main.rs | 116 +++++++++++++++++++++++++-------- src/stream.rs | 82 ----------------------- src/stream_task.rs | 47 ++++++++++++++ src/udp_source.rs | 112 +++++++++++++++++++++++++------- 8 files changed, 424 insertions(+), 135 deletions(-) create mode 100644 README.md create mode 100644 src/circular_buffer_source.rs delete mode 100644 src/stream.rs create mode 100644 src/stream_task.rs diff --git a/Cargo.lock b/Cargo.lock index 3b4d7cd..f115969 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -126,6 +126,12 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "anyhow" +version = "1.0.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0674a1ddeecb70197781e945de4b3b8ffb61fa939a5597bcf48503737663100" + [[package]] name = "arraydeque" version = "0.5.1" @@ -359,6 +365,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "circular-buffer" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23bdce1da528cadbac4654b5632bfcd8c6c63e25b1d42cea919a95958790b51d" + [[package]] name = "clap" version = "4.5.45" @@ -663,14 +675,17 @@ name = "denny-jack" version = "0.1.0" dependencies = [ "audiopus", + "circular-buffer", "clap", "config", "error", "log", "poise", + "prost", "serde", "songbird", "symphonia", + "tap-interface", "thiserror 2.0.15", "tokio", "tracing-subscriber", @@ -847,6 +862,12 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + [[package]] name = "flate2" version = "1.1.2" @@ -1520,6 +1541,15 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.15" @@ -1674,6 +1704,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" + [[package]] name = "nanorand" version = "0.7.0" @@ -1900,6 +1936,16 @@ dependencies = [ "sha2", ] +[[package]] +name = "petgraph" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" +dependencies = [ + "fixedbitset", + "indexmap", +] + [[package]] name = "pin-project" version = "1.1.10" @@ -2059,6 +2105,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn 2.0.106", +] + [[package]] name = "primal-check" version = "0.3.4" @@ -2077,6 +2133,58 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" +dependencies = [ + "heck 0.5.0", + "itertools", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.106", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.106", +] + +[[package]] +name = "prost-types" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9b4db3d6da204ed77bb26ba83b6122a73aeb2e87e25fbf7ad2e84c4ccbf8f72" +dependencies = [ + "prost", +] + [[package]] name = "pulldown-cmark" version = "0.9.6" @@ -3356,6 +3464,17 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" +[[package]] +name = "tap-interface" +version = "0.1.0" +source = "registry+https://git.ahines.net/joeyahines/_cargo-index.git" +checksum = "4450520c17b84421ce3addd0cb91d99f3a358e969f55440f9946b79ef8b48e67" +dependencies = [ + "prost", + "prost-build", + "prost-types", +] + [[package]] name = "tempfile" version = "3.20.0" diff --git a/Cargo.toml b/Cargo.toml index 7dcc2f7..9785b26 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,10 @@ tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } error = "0.1.9" log = "0.4.27" audiopus = "0.3.0-rc.0" +circular-buffer = "1.1.0" +tap-interface = {version = "0.1.0", registry = "ahines"} +prost = "0.14.1" [dependencies.symphonia] version = "0.5" -features = ["aac", "mp3", "isomp4", "alac"] # ...as well as any extras you need! \ No newline at end of file +features = ["aac", "mp3", "isomp4", "alac"] # ...as well as any extras you need! diff --git a/README.md b/README.md new file mode 100644 index 0000000..446850a --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +# Denny Jack + +> DJ Denny Jack in da house!!! + +A bot for streaming audio from local sources to discord. + diff --git a/src/circular_buffer_source.rs b/src/circular_buffer_source.rs new file mode 100644 index 0000000..6ef8ef1 --- /dev/null +++ b/src/circular_buffer_source.rs @@ -0,0 +1,72 @@ +use circular_buffer::CircularBuffer; +use songbird::input::core::io::MediaSource; +use std::io::SeekFrom; +use std::{ + io::{Read, Seek, Write}, + sync::{Arc, Condvar, Mutex}, +}; + +const BUFFER_SIZE: usize = 64 * 1024; + +#[derive(Clone, Default)] +pub struct CircularBufferSource { + condvar: Arc, + circular_buffer: Arc>>, +} + +impl Read for CircularBufferSource { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let mut circ_buffer = self.circular_buffer.lock().expect("Mutex was poisoned"); + + if circ_buffer.is_empty() { + buf.fill(0); + self.condvar.notify_all(); + return Ok(buf.len()); + } + + let bytes_to_read = usize::min(buf.len(), circ_buffer.len()); + + for (ndx, value) in circ_buffer.drain(0..bytes_to_read).enumerate() { + buf[ndx] = value; + } + + self.condvar.notify_all(); + + Ok(bytes_to_read) + } +} + +impl Write for CircularBufferSource { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let mut circ_buffer = self.circular_buffer.lock().expect("Mutex was poisoned"); + + while circ_buffer.len() + buf.len() > BUFFER_SIZE { + circ_buffer = self.condvar.wait(circ_buffer).expect("Mutex was poisoned"); + } + + circ_buffer.extend_from_slice(buf); + self.condvar.notify_all(); + + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +impl Seek for CircularBufferSource { + fn seek(&mut self, _pos: SeekFrom) -> std::io::Result { + Err(std::io::ErrorKind::Unsupported.into()) + } +} + +impl MediaSource for CircularBufferSource { + fn is_seekable(&self) -> bool { + false + } + + fn byte_len(&self) -> Option { + None + } +} diff --git a/src/main.rs b/src/main.rs index 2304a8d..27b64c0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,36 +1,53 @@ +mod circular_buffer_source; mod dj_config; -mod stream; +mod stream_task; mod udp_source; +use crate::circular_buffer_source::CircularBufferSource; use crate::dj_config::{Args, DJConfig}; -use crate::stream::Stream; +use crate::stream_task::{StreamControl, stream_task}; use crate::udp_source::UdpSource; use clap::Parser; -use log::error; +use log::{error, info}; use poise::{PrefixFrameworkOptions, serenity_prelude as serenity}; use songbird::SerenityInit; -use songbird::input::RawAdapter; -use std::time::Duration; use thiserror::Error; use tokio::net::UdpSocket; +use tokio::sync::Mutex; +use tokio::sync::mpsc::{Sender, channel}; use tracing_subscriber::EnvFilter; use tracing_subscriber::filter::LevelFilter; #[derive(Error, Debug)] pub enum Error { #[error("Discord error: {0}")] - DiscordError(#[from] poise::serenity_prelude::Error), + ErrorWithDiscord(#[from] poise::serenity_prelude::Error), #[error("Songbird error: {0}")] - JoinError(#[from] songbird::error::JoinError), + FailedToJoin(#[from] songbird::error::JoinError), } +#[allow(dead_code)] struct Data { - config: DJConfig, + pub config: DJConfig, + pub udp_task: tokio::task::JoinHandle<()>, + pub circular_buffer_source: CircularBufferSource, + pub stream_control_tx: Mutex>>, } type Context<'a> = poise::Context<'a, Data, Error>; +const COMMAND_QUEUE_SIZE: usize = 10; + #[poise::command(slash_command, prefix_command, guild_only)] async fn join(ctx: Context<'_>) -> Result<(), Error> { + let mut stream_control_tx = match ctx.data().stream_control_tx.try_lock() { + Ok(stream_control_tx) => stream_control_tx, + Err(_err) => { + ctx.reply("Yo yo yo, looks like someone else is streaming on the net.") + .await?; + return Ok(()); + } + }; + let (guild_id, channel_id) = { let guild = ctx.guild().unwrap(); let channel_id = guild @@ -44,7 +61,7 @@ async fn join(ctx: Context<'_>) -> Result<(), Error> { let connect_to = match channel_id { Some(channel) => channel, None => { - ctx.reply("Not in a voice channel").await?; + ctx.reply("I don't know where you're hanging bud!").await?; return Ok(()); } @@ -57,33 +74,67 @@ async fn join(ctx: Context<'_>) -> Result<(), Error> { manager.join(guild_id, connect_to).await?; - if let Some(handler_lock) = manager.get(guild_id) { - let mut handler = handler_lock.lock().await; + let (tx, rx) = channel(COMMAND_QUEUE_SIZE); + *stream_control_tx = Some(tx); - let udp_input = UdpSocket::bind(ctx.data().config.server_addr) - .await - .unwrap(); - let stream = Stream::new(); + let circular_buffer_source = ctx.data().circular_buffer_source.clone(); + tokio::spawn(async move { stream_task(circular_buffer_source, manager, guild_id, rx).await }); - let mut udp_source = UdpSource::new(udp_input, stream.clone()); + Ok(()) +} - tokio::spawn(async move { udp_source.worker().await }); +async fn send_command(ctx: Context<'_>, command: StreamControl) -> Result { + let stream_control_tx = ctx.data().stream_control_tx.lock().await; - let adapter = RawAdapter::new(stream.clone(), 48000, 2); - - let _track = handler.play_only_input(adapter.into()); - - loop { - tokio::time::sleep(Duration::from_secs(1000)).await; + Ok(match &*stream_control_tx { + None => { + ctx.reply("I can't control what doesn't exist, deep man.") + .await?; + false } - } else { - ctx.reply("Can't play you sounds out of a voice channel pal") - .await?; + Some(tx) => { + tx.send(command).await.unwrap(); + true + } + }) +} + +#[poise::command(slash_command, prefix_command, guild_only)] +async fn play(ctx: Context<'_>) -> Result<(), Error> { + if send_command(ctx, StreamControl::Play).await? { + ctx.reply("Let's get that beat going!").await?; } Ok(()) } +#[poise::command(slash_command, prefix_command, guild_only)] +async fn pause(ctx: Context<'_>) -> Result<(), Error> { + if send_command(ctx, StreamControl::Pause).await? { + ctx.reply("Alright, I'll chill here").await?; + } + + Ok(()) +} + +#[poise::command(slash_command, prefix_command, guild_only)] +async fn disconnect(ctx: Context<'_>) -> Result<(), Error> { + let mut stream_control_tx = ctx.data().stream_control_tx.lock().await; + *stream_control_tx = None; + + let manager = songbird::get(ctx.serenity_context()) + .await + .expect("Songbird Voice client placed in at initialisation.") + .clone(); + + manager.leave(ctx.guild_id().unwrap()).await?; + + ctx.reply("Until next time, friend DJ Denny Jack OUT!") + .await?; + + Ok(()) +} + #[tokio::main] async fn main() { tracing_subscriber::fmt() @@ -107,12 +158,22 @@ async fn main() { let intents = serenity::GatewayIntents::non_privileged() | serenity::GatewayIntents::MESSAGE_CONTENT; + let udp_input = UdpSocket::bind(cfg.server_addr).await.unwrap(); + let source = CircularBufferSource::default(); + + let mut udp_source = UdpSource::new(udp_input, source.clone()); + let udp_task = tokio::spawn(async move { udp_source.worker().await }); + let data = Data { config: cfg.clone(), + udp_task, + circular_buffer_source: source, + stream_control_tx: Mutex::new(None), }; + let framework = poise::Framework::builder() .options(poise::FrameworkOptions { - commands: vec![join()], + commands: vec![join(), play(), pause(), disconnect()], prefix_options: PrefixFrameworkOptions { prefix: Some("!".to_string()), ..Default::default() @@ -127,6 +188,7 @@ async fn main() { }) .build(); + info!("Starting bot..."); let client = serenity::ClientBuilder::new(cfg.bot_token, intents) .framework(framework) .register_songbird() diff --git a/src/stream.rs b/src/stream.rs deleted file mode 100644 index 700103f..0000000 --- a/src/stream.rs +++ /dev/null @@ -1,82 +0,0 @@ -use songbird::input::core::io::MediaSource; -use std::io::SeekFrom; -use std::{ - io::{Read, Seek, Write}, - sync::{Arc, Condvar, Mutex}, -}; - -/// The lower the value, the less latency -/// -/// Too low of a value results in jittery audio -const BUFFER_SIZE: usize = 64 * 1024; - -#[derive(Clone, Default)] -pub struct Stream { - inner: Arc<(Mutex>, Condvar)>, -} - -impl Stream { - pub fn new() -> Self { - Self::default() - } -} - -impl Read for Stream { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - let (mutex, condvar) = &*self.inner; - let mut buffer = mutex.lock().expect("Mutex was poisoned"); - - // Prevent Discord jitter by filling buffer with zeroes if we don't have any audio - // (i.e. when you skip too far ahead in a song which hasn't been downloaded yet) - if buffer.is_empty() { - buf.fill(0); - condvar.notify_all(); - - return Ok(buf.len()); - } - - let max_read = usize::min(buf.len(), buffer.len()); - - buf[0..max_read].copy_from_slice(&buffer[0..max_read]); - buffer.drain(0..max_read); - condvar.notify_all(); - - Ok(max_read) - } -} - -impl Write for Stream { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - let (mutex, condvar) = &*self.inner; - let mut buffer = mutex.lock().expect("Mutex was poisoned"); - - while buffer.len() + buf.len() > BUFFER_SIZE { - buffer = condvar.wait(buffer).expect("Mutex was poisoned"); - } - - buffer.extend_from_slice(buf); - condvar.notify_all(); - - Ok(buf.len()) - } - - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} - -impl Seek for Stream { - fn seek(&mut self, _pos: SeekFrom) -> std::io::Result { - Err(std::io::ErrorKind::Unsupported.into()) - } -} - -impl MediaSource for Stream { - fn is_seekable(&self) -> bool { - false - } - - fn byte_len(&self) -> Option { - None - } -} diff --git a/src/stream_task.rs b/src/stream_task.rs new file mode 100644 index 0000000..45273ae --- /dev/null +++ b/src/stream_task.rs @@ -0,0 +1,47 @@ +use crate::circular_buffer_source::CircularBufferSource; +use log::{error, info}; +use poise::serenity_prelude::GuildId; +use songbird::Songbird; +use songbird::input::RawAdapter; +use std::sync::Arc; +use tokio::sync::mpsc::Receiver; + +pub enum StreamControl { + Pause, + Play, +} + +pub async fn stream_task( + circular_buffer_source: CircularBufferSource, + manager: Arc, + guild: GuildId, + mut receiver: Receiver, +) { + let track = { + let handler_lock = manager.get(guild).unwrap(); + let mut handler = handler_lock.lock().await; + + let adapter = RawAdapter::new(circular_buffer_source, 48000, 2); + + handler.play_only_input(adapter.into()) + }; + + loop { + let msg = match receiver.recv().await { + None => { + info!("Stopping Stram Task..."); + return; + } + Some(msg) => msg, + }; + + let res = match msg { + StreamControl::Pause => track.pause(), + StreamControl::Play => track.play(), + }; + + if let Err(err) = res { + error!("Error controlling track: {err:?}") + } + } +} diff --git a/src/udp_source.rs b/src/udp_source.rs index c1418c6..d41817b 100644 --- a/src/udp_source.rs +++ b/src/udp_source.rs @@ -1,54 +1,116 @@ -use crate::stream::Stream; +use crate::circular_buffer_source::CircularBufferSource; use audiopus::coder::Decoder; use audiopus::{Channels, MutSignals, SampleRate}; +use log::{debug, error, info}; +use prost::{DecodeError, Message}; use std::io::Write; +use tap_interface::tap::message::TapMessage; +use thiserror::Error; use tokio::net::UdpSocket; pub const SAMPLE_RATE: SampleRate = SampleRate::Hz48000; -pub const SAMPLE_RATE_RAW: usize = 48_000; -pub const AUDIO_FRAME_RATE: usize = 50; -pub const MONO_FRAME_SIZE: usize = SAMPLE_RATE_RAW / AUDIO_FRAME_RATE; -pub const STEREO_FRAME_SIZE: usize = 2 * MONO_FRAME_SIZE; + +#[derive(Debug, Error)] +#[allow(clippy::enum_variant_names)] +pub enum Error { + #[error("IO Error {0}")] + IOError(#[from] std::io::Error), + #[error("Failed to decode audio in packet: {0}")] + OpusError(#[from] audiopus::Error), + #[error("Failed to decode packet: {0}")] + DecodeError(#[from] DecodeError), +} pub struct UdpSource { udp: UdpSocket, - stream: Stream, + source: CircularBufferSource, decoder: Decoder, + expected_seq: u32, } impl UdpSource { - pub fn new(udp_socket: UdpSocket, stream: Stream) -> Self { + pub fn new(udp_socket: UdpSocket, source: CircularBufferSource) -> Self { Self { udp: udp_socket, - stream, + source, decoder: Decoder::new(SAMPLE_RATE, Channels::Stereo).unwrap(), + expected_seq: 0, } } + fn decode_data( + &mut self, + input_buffer: &[u8], + output_buffer: &mut [f32], + ) -> Result { + let packet = audiopus::packet::Packet::try_from(input_buffer)?; + + let signals = MutSignals::try_from(output_buffer)?; + + let sample_size = self.decoder.decode_float(Some(packet), signals, false)?; + + Ok(sample_size * 2) + } + + fn write_audio_data_to_source(&mut self, samples: &[f32]) -> Result<(), Error> { + let mut sample_bytes = Vec::with_capacity(std::mem::size_of_val(samples)); + + for sample in samples { + let bytes = sample.to_le_bytes(); + + sample_bytes.extend_from_slice(bytes.as_slice()); + } + + self.source.write_all(&sample_bytes)?; + Ok(()) + } + + async fn recv_message(&mut self, buffer: &mut [u8]) -> Result { + self.udp.recv(buffer).await + } + pub async fn worker(&mut self) { + info!("Starting TAP UDP Endpoint"); let mut buffer = vec![0; 1024 * 32]; + let mut decode_data = vec![0.0; tap_interface::STEREO_FRAME_SIZE]; loop { - let len = self.udp.recv(&mut buffer).await.unwrap(); + let msg_size = match self.recv_message(&mut buffer).await { + Ok(msg_size) => msg_size, + Err(err) => { + error!("Failed to recv udp packet: {err:?}"); + continue; + } + }; - let packet = audiopus::packet::Packet::try_from(&buffer[12..len]).unwrap(); + let tap_message = match TapMessage::decode(&buffer[..msg_size]) { + Ok(tap_msg) => tap_msg, + Err(err) => { + error!("Failed decode proto message: {err:?}"); + continue; + } + }; - let mut samples = vec![0.0; STEREO_FRAME_SIZE]; - let signals = MutSignals::try_from(&mut samples).unwrap(); - let sample_size = self - .decoder - .decode_float(Some(packet), signals, false) - .unwrap() - * 2; - - let mut sample_bytes = Vec::with_capacity(sample_size * std::mem::size_of::()); - - for sample in &samples[0..sample_size] { - let bytes = sample.to_le_bytes(); - - sample_bytes.extend_from_slice(bytes.as_slice()); + if tap_message.seq != self.expected_seq { + debug!( + "Mismatch sequence count found expected '{}': {tap_message:?}", + tap_message.seq + ) } - self.stream.write_all(&sample_bytes).unwrap(); + (self.expected_seq, _) = tap_message.seq.overflowing_add(1); + + let decoded_samples = match self.decode_data(&tap_message.audio_data, &mut decode_data) + { + Ok(decoded_samples) => decoded_samples, + Err(err) => { + error!("Failed to decode audio data: {err:?}"); + continue; + } + }; + + if let Err(err) = self.write_audio_data_to_source(&decode_data[..decoded_samples]) { + error!("Failed to write data to source: {err:?}"); + } } } }