Added more commands to control stream, added protobuf message support
This commit is contained in:
parent
2d621929e3
commit
0c0fe9c87d
119
Cargo.lock
generated
119
Cargo.lock
generated
@ -126,6 +126,12 @@ dependencies = [
|
|||||||
"windows-sys 0.60.2",
|
"windows-sys 0.60.2",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "anyhow"
|
||||||
|
version = "1.0.99"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b0674a1ddeecb70197781e945de4b3b8ffb61fa939a5597bcf48503737663100"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "arraydeque"
|
name = "arraydeque"
|
||||||
version = "0.5.1"
|
version = "0.5.1"
|
||||||
@ -359,6 +365,12 @@ dependencies = [
|
|||||||
"zeroize",
|
"zeroize",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "circular-buffer"
|
||||||
|
version = "1.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "23bdce1da528cadbac4654b5632bfcd8c6c63e25b1d42cea919a95958790b51d"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "clap"
|
name = "clap"
|
||||||
version = "4.5.45"
|
version = "4.5.45"
|
||||||
@ -663,14 +675,17 @@ name = "denny-jack"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"audiopus",
|
"audiopus",
|
||||||
|
"circular-buffer",
|
||||||
"clap",
|
"clap",
|
||||||
"config",
|
"config",
|
||||||
"error",
|
"error",
|
||||||
"log",
|
"log",
|
||||||
"poise",
|
"poise",
|
||||||
|
"prost",
|
||||||
"serde",
|
"serde",
|
||||||
"songbird",
|
"songbird",
|
||||||
"symphonia",
|
"symphonia",
|
||||||
|
"tap-interface",
|
||||||
"thiserror 2.0.15",
|
"thiserror 2.0.15",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
@ -847,6 +862,12 @@ version = "2.3.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
|
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "fixedbitset"
|
||||||
|
version = "0.5.7"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "flate2"
|
name = "flate2"
|
||||||
version = "1.1.2"
|
version = "1.1.2"
|
||||||
@ -1520,6 +1541,15 @@ version = "1.70.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
|
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "itertools"
|
||||||
|
version = "0.14.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
|
||||||
|
dependencies = [
|
||||||
|
"either",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "itoa"
|
name = "itoa"
|
||||||
version = "1.0.15"
|
version = "1.0.15"
|
||||||
@ -1674,6 +1704,12 @@ dependencies = [
|
|||||||
"windows-sys 0.59.0",
|
"windows-sys 0.59.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "multimap"
|
||||||
|
version = "0.10.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "nanorand"
|
name = "nanorand"
|
||||||
version = "0.7.0"
|
version = "0.7.0"
|
||||||
@ -1900,6 +1936,16 @@ dependencies = [
|
|||||||
"sha2",
|
"sha2",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "petgraph"
|
||||||
|
version = "0.7.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772"
|
||||||
|
dependencies = [
|
||||||
|
"fixedbitset",
|
||||||
|
"indexmap",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pin-project"
|
name = "pin-project"
|
||||||
version = "1.1.10"
|
version = "1.1.10"
|
||||||
@ -2059,6 +2105,16 @@ dependencies = [
|
|||||||
"zerocopy",
|
"zerocopy",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "prettyplease"
|
||||||
|
version = "0.2.37"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"syn 2.0.106",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "primal-check"
|
name = "primal-check"
|
||||||
version = "0.3.4"
|
version = "0.3.4"
|
||||||
@ -2077,6 +2133,58 @@ dependencies = [
|
|||||||
"unicode-ident",
|
"unicode-ident",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "prost"
|
||||||
|
version = "0.14.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d"
|
||||||
|
dependencies = [
|
||||||
|
"bytes",
|
||||||
|
"prost-derive",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "prost-build"
|
||||||
|
version = "0.14.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1"
|
||||||
|
dependencies = [
|
||||||
|
"heck 0.5.0",
|
||||||
|
"itertools",
|
||||||
|
"log",
|
||||||
|
"multimap",
|
||||||
|
"once_cell",
|
||||||
|
"petgraph",
|
||||||
|
"prettyplease",
|
||||||
|
"prost",
|
||||||
|
"prost-types",
|
||||||
|
"regex",
|
||||||
|
"syn 2.0.106",
|
||||||
|
"tempfile",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "prost-derive"
|
||||||
|
version = "0.14.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425"
|
||||||
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
|
"itertools",
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn 2.0.106",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "prost-types"
|
||||||
|
version = "0.14.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b9b4db3d6da204ed77bb26ba83b6122a73aeb2e87e25fbf7ad2e84c4ccbf8f72"
|
||||||
|
dependencies = [
|
||||||
|
"prost",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pulldown-cmark"
|
name = "pulldown-cmark"
|
||||||
version = "0.9.6"
|
version = "0.9.6"
|
||||||
@ -3356,6 +3464,17 @@ version = "0.2.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
|
checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tap-interface"
|
||||||
|
version = "0.1.0"
|
||||||
|
source = "registry+https://git.ahines.net/joeyahines/_cargo-index.git"
|
||||||
|
checksum = "4450520c17b84421ce3addd0cb91d99f3a358e969f55440f9946b79ef8b48e67"
|
||||||
|
dependencies = [
|
||||||
|
"prost",
|
||||||
|
"prost-build",
|
||||||
|
"prost-types",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tempfile"
|
name = "tempfile"
|
||||||
version = "3.20.0"
|
version = "3.20.0"
|
||||||
|
@ -15,6 +15,9 @@ tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
|
|||||||
error = "0.1.9"
|
error = "0.1.9"
|
||||||
log = "0.4.27"
|
log = "0.4.27"
|
||||||
audiopus = "0.3.0-rc.0"
|
audiopus = "0.3.0-rc.0"
|
||||||
|
circular-buffer = "1.1.0"
|
||||||
|
tap-interface = {version = "0.1.0", registry = "ahines"}
|
||||||
|
prost = "0.14.1"
|
||||||
|
|
||||||
[dependencies.symphonia]
|
[dependencies.symphonia]
|
||||||
version = "0.5"
|
version = "0.5"
|
||||||
|
6
README.md
Normal file
6
README.md
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
# Denny Jack
|
||||||
|
|
||||||
|
> DJ Denny Jack in da house!!!
|
||||||
|
|
||||||
|
A bot for streaming audio from local sources to discord.
|
||||||
|
|
72
src/circular_buffer_source.rs
Normal file
72
src/circular_buffer_source.rs
Normal file
@ -0,0 +1,72 @@
|
|||||||
|
use circular_buffer::CircularBuffer;
|
||||||
|
use songbird::input::core::io::MediaSource;
|
||||||
|
use std::io::SeekFrom;
|
||||||
|
use std::{
|
||||||
|
io::{Read, Seek, Write},
|
||||||
|
sync::{Arc, Condvar, Mutex},
|
||||||
|
};
|
||||||
|
|
||||||
|
const BUFFER_SIZE: usize = 64 * 1024;
|
||||||
|
|
||||||
|
#[derive(Clone, Default)]
|
||||||
|
pub struct CircularBufferSource {
|
||||||
|
condvar: Arc<Condvar>,
|
||||||
|
circular_buffer: Arc<Mutex<CircularBuffer<BUFFER_SIZE, u8>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Read for CircularBufferSource {
|
||||||
|
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||||
|
let mut circ_buffer = self.circular_buffer.lock().expect("Mutex was poisoned");
|
||||||
|
|
||||||
|
if circ_buffer.is_empty() {
|
||||||
|
buf.fill(0);
|
||||||
|
self.condvar.notify_all();
|
||||||
|
return Ok(buf.len());
|
||||||
|
}
|
||||||
|
|
||||||
|
let bytes_to_read = usize::min(buf.len(), circ_buffer.len());
|
||||||
|
|
||||||
|
for (ndx, value) in circ_buffer.drain(0..bytes_to_read).enumerate() {
|
||||||
|
buf[ndx] = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.condvar.notify_all();
|
||||||
|
|
||||||
|
Ok(bytes_to_read)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Write for CircularBufferSource {
|
||||||
|
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||||
|
let mut circ_buffer = self.circular_buffer.lock().expect("Mutex was poisoned");
|
||||||
|
|
||||||
|
while circ_buffer.len() + buf.len() > BUFFER_SIZE {
|
||||||
|
circ_buffer = self.condvar.wait(circ_buffer).expect("Mutex was poisoned");
|
||||||
|
}
|
||||||
|
|
||||||
|
circ_buffer.extend_from_slice(buf);
|
||||||
|
self.condvar.notify_all();
|
||||||
|
|
||||||
|
Ok(buf.len())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn flush(&mut self) -> std::io::Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Seek for CircularBufferSource {
|
||||||
|
fn seek(&mut self, _pos: SeekFrom) -> std::io::Result<u64> {
|
||||||
|
Err(std::io::ErrorKind::Unsupported.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MediaSource for CircularBufferSource {
|
||||||
|
fn is_seekable(&self) -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
fn byte_len(&self) -> Option<u64> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
118
src/main.rs
118
src/main.rs
@ -1,36 +1,53 @@
|
|||||||
|
mod circular_buffer_source;
|
||||||
mod dj_config;
|
mod dj_config;
|
||||||
mod stream;
|
mod stream_task;
|
||||||
mod udp_source;
|
mod udp_source;
|
||||||
|
|
||||||
|
use crate::circular_buffer_source::CircularBufferSource;
|
||||||
use crate::dj_config::{Args, DJConfig};
|
use crate::dj_config::{Args, DJConfig};
|
||||||
use crate::stream::Stream;
|
use crate::stream_task::{StreamControl, stream_task};
|
||||||
use crate::udp_source::UdpSource;
|
use crate::udp_source::UdpSource;
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use log::error;
|
use log::{error, info};
|
||||||
use poise::{PrefixFrameworkOptions, serenity_prelude as serenity};
|
use poise::{PrefixFrameworkOptions, serenity_prelude as serenity};
|
||||||
use songbird::SerenityInit;
|
use songbird::SerenityInit;
|
||||||
use songbird::input::RawAdapter;
|
|
||||||
use std::time::Duration;
|
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tokio::net::UdpSocket;
|
use tokio::net::UdpSocket;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
use tokio::sync::mpsc::{Sender, channel};
|
||||||
use tracing_subscriber::EnvFilter;
|
use tracing_subscriber::EnvFilter;
|
||||||
use tracing_subscriber::filter::LevelFilter;
|
use tracing_subscriber::filter::LevelFilter;
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
#[error("Discord error: {0}")]
|
#[error("Discord error: {0}")]
|
||||||
DiscordError(#[from] poise::serenity_prelude::Error),
|
ErrorWithDiscord(#[from] poise::serenity_prelude::Error),
|
||||||
#[error("Songbird error: {0}")]
|
#[error("Songbird error: {0}")]
|
||||||
JoinError(#[from] songbird::error::JoinError),
|
FailedToJoin(#[from] songbird::error::JoinError),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
struct Data {
|
struct Data {
|
||||||
config: DJConfig,
|
pub config: DJConfig,
|
||||||
|
pub udp_task: tokio::task::JoinHandle<()>,
|
||||||
|
pub circular_buffer_source: CircularBufferSource,
|
||||||
|
pub stream_control_tx: Mutex<Option<Sender<StreamControl>>>,
|
||||||
}
|
}
|
||||||
type Context<'a> = poise::Context<'a, Data, Error>;
|
type Context<'a> = poise::Context<'a, Data, Error>;
|
||||||
|
|
||||||
|
const COMMAND_QUEUE_SIZE: usize = 10;
|
||||||
|
|
||||||
#[poise::command(slash_command, prefix_command, guild_only)]
|
#[poise::command(slash_command, prefix_command, guild_only)]
|
||||||
async fn join(ctx: Context<'_>) -> Result<(), Error> {
|
async fn join(ctx: Context<'_>) -> Result<(), Error> {
|
||||||
|
let mut stream_control_tx = match ctx.data().stream_control_tx.try_lock() {
|
||||||
|
Ok(stream_control_tx) => stream_control_tx,
|
||||||
|
Err(_err) => {
|
||||||
|
ctx.reply("Yo yo yo, looks like someone else is streaming on the net.")
|
||||||
|
.await?;
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let (guild_id, channel_id) = {
|
let (guild_id, channel_id) = {
|
||||||
let guild = ctx.guild().unwrap();
|
let guild = ctx.guild().unwrap();
|
||||||
let channel_id = guild
|
let channel_id = guild
|
||||||
@ -44,7 +61,7 @@ async fn join(ctx: Context<'_>) -> Result<(), Error> {
|
|||||||
let connect_to = match channel_id {
|
let connect_to = match channel_id {
|
||||||
Some(channel) => channel,
|
Some(channel) => channel,
|
||||||
None => {
|
None => {
|
||||||
ctx.reply("Not in a voice channel").await?;
|
ctx.reply("I don't know where you're hanging bud!").await?;
|
||||||
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
@ -57,29 +74,63 @@ async fn join(ctx: Context<'_>) -> Result<(), Error> {
|
|||||||
|
|
||||||
manager.join(guild_id, connect_to).await?;
|
manager.join(guild_id, connect_to).await?;
|
||||||
|
|
||||||
if let Some(handler_lock) = manager.get(guild_id) {
|
let (tx, rx) = channel(COMMAND_QUEUE_SIZE);
|
||||||
let mut handler = handler_lock.lock().await;
|
*stream_control_tx = Some(tx);
|
||||||
|
|
||||||
let udp_input = UdpSocket::bind(ctx.data().config.server_addr)
|
let circular_buffer_source = ctx.data().circular_buffer_source.clone();
|
||||||
.await
|
tokio::spawn(async move { stream_task(circular_buffer_source, manager, guild_id, rx).await });
|
||||||
.unwrap();
|
|
||||||
let stream = Stream::new();
|
|
||||||
|
|
||||||
let mut udp_source = UdpSource::new(udp_input, stream.clone());
|
Ok(())
|
||||||
|
|
||||||
tokio::spawn(async move { udp_source.worker().await });
|
|
||||||
|
|
||||||
let adapter = RawAdapter::new(stream.clone(), 48000, 2);
|
|
||||||
|
|
||||||
let _track = handler.play_only_input(adapter.into());
|
|
||||||
|
|
||||||
loop {
|
|
||||||
tokio::time::sleep(Duration::from_secs(1000)).await;
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
ctx.reply("Can't play you sounds out of a voice channel pal")
|
async fn send_command(ctx: Context<'_>, command: StreamControl) -> Result<bool, Error> {
|
||||||
|
let stream_control_tx = ctx.data().stream_control_tx.lock().await;
|
||||||
|
|
||||||
|
Ok(match &*stream_control_tx {
|
||||||
|
None => {
|
||||||
|
ctx.reply("I can't control what doesn't exist, deep man.")
|
||||||
.await?;
|
.await?;
|
||||||
|
false
|
||||||
}
|
}
|
||||||
|
Some(tx) => {
|
||||||
|
tx.send(command).await.unwrap();
|
||||||
|
true
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[poise::command(slash_command, prefix_command, guild_only)]
|
||||||
|
async fn play(ctx: Context<'_>) -> Result<(), Error> {
|
||||||
|
if send_command(ctx, StreamControl::Play).await? {
|
||||||
|
ctx.reply("Let's get that beat going!").await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[poise::command(slash_command, prefix_command, guild_only)]
|
||||||
|
async fn pause(ctx: Context<'_>) -> Result<(), Error> {
|
||||||
|
if send_command(ctx, StreamControl::Pause).await? {
|
||||||
|
ctx.reply("Alright, I'll chill here").await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[poise::command(slash_command, prefix_command, guild_only)]
|
||||||
|
async fn disconnect(ctx: Context<'_>) -> Result<(), Error> {
|
||||||
|
let mut stream_control_tx = ctx.data().stream_control_tx.lock().await;
|
||||||
|
*stream_control_tx = None;
|
||||||
|
|
||||||
|
let manager = songbird::get(ctx.serenity_context())
|
||||||
|
.await
|
||||||
|
.expect("Songbird Voice client placed in at initialisation.")
|
||||||
|
.clone();
|
||||||
|
|
||||||
|
manager.leave(ctx.guild_id().unwrap()).await?;
|
||||||
|
|
||||||
|
ctx.reply("Until next time, friend DJ Denny Jack OUT!")
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -107,12 +158,22 @@ async fn main() {
|
|||||||
let intents =
|
let intents =
|
||||||
serenity::GatewayIntents::non_privileged() | serenity::GatewayIntents::MESSAGE_CONTENT;
|
serenity::GatewayIntents::non_privileged() | serenity::GatewayIntents::MESSAGE_CONTENT;
|
||||||
|
|
||||||
|
let udp_input = UdpSocket::bind(cfg.server_addr).await.unwrap();
|
||||||
|
let source = CircularBufferSource::default();
|
||||||
|
|
||||||
|
let mut udp_source = UdpSource::new(udp_input, source.clone());
|
||||||
|
let udp_task = tokio::spawn(async move { udp_source.worker().await });
|
||||||
|
|
||||||
let data = Data {
|
let data = Data {
|
||||||
config: cfg.clone(),
|
config: cfg.clone(),
|
||||||
|
udp_task,
|
||||||
|
circular_buffer_source: source,
|
||||||
|
stream_control_tx: Mutex::new(None),
|
||||||
};
|
};
|
||||||
|
|
||||||
let framework = poise::Framework::builder()
|
let framework = poise::Framework::builder()
|
||||||
.options(poise::FrameworkOptions {
|
.options(poise::FrameworkOptions {
|
||||||
commands: vec![join()],
|
commands: vec![join(), play(), pause(), disconnect()],
|
||||||
prefix_options: PrefixFrameworkOptions {
|
prefix_options: PrefixFrameworkOptions {
|
||||||
prefix: Some("!".to_string()),
|
prefix: Some("!".to_string()),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
@ -127,6 +188,7 @@ async fn main() {
|
|||||||
})
|
})
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
info!("Starting bot...");
|
||||||
let client = serenity::ClientBuilder::new(cfg.bot_token, intents)
|
let client = serenity::ClientBuilder::new(cfg.bot_token, intents)
|
||||||
.framework(framework)
|
.framework(framework)
|
||||||
.register_songbird()
|
.register_songbird()
|
||||||
|
@ -1,82 +0,0 @@
|
|||||||
use songbird::input::core::io::MediaSource;
|
|
||||||
use std::io::SeekFrom;
|
|
||||||
use std::{
|
|
||||||
io::{Read, Seek, Write},
|
|
||||||
sync::{Arc, Condvar, Mutex},
|
|
||||||
};
|
|
||||||
|
|
||||||
/// The lower the value, the less latency
|
|
||||||
///
|
|
||||||
/// Too low of a value results in jittery audio
|
|
||||||
const BUFFER_SIZE: usize = 64 * 1024;
|
|
||||||
|
|
||||||
#[derive(Clone, Default)]
|
|
||||||
pub struct Stream {
|
|
||||||
inner: Arc<(Mutex<Vec<u8>>, Condvar)>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Stream {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self::default()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Read for Stream {
|
|
||||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
|
||||||
let (mutex, condvar) = &*self.inner;
|
|
||||||
let mut buffer = mutex.lock().expect("Mutex was poisoned");
|
|
||||||
|
|
||||||
// Prevent Discord jitter by filling buffer with zeroes if we don't have any audio
|
|
||||||
// (i.e. when you skip too far ahead in a song which hasn't been downloaded yet)
|
|
||||||
if buffer.is_empty() {
|
|
||||||
buf.fill(0);
|
|
||||||
condvar.notify_all();
|
|
||||||
|
|
||||||
return Ok(buf.len());
|
|
||||||
}
|
|
||||||
|
|
||||||
let max_read = usize::min(buf.len(), buffer.len());
|
|
||||||
|
|
||||||
buf[0..max_read].copy_from_slice(&buffer[0..max_read]);
|
|
||||||
buffer.drain(0..max_read);
|
|
||||||
condvar.notify_all();
|
|
||||||
|
|
||||||
Ok(max_read)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Write for Stream {
|
|
||||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
|
||||||
let (mutex, condvar) = &*self.inner;
|
|
||||||
let mut buffer = mutex.lock().expect("Mutex was poisoned");
|
|
||||||
|
|
||||||
while buffer.len() + buf.len() > BUFFER_SIZE {
|
|
||||||
buffer = condvar.wait(buffer).expect("Mutex was poisoned");
|
|
||||||
}
|
|
||||||
|
|
||||||
buffer.extend_from_slice(buf);
|
|
||||||
condvar.notify_all();
|
|
||||||
|
|
||||||
Ok(buf.len())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn flush(&mut self) -> std::io::Result<()> {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Seek for Stream {
|
|
||||||
fn seek(&mut self, _pos: SeekFrom) -> std::io::Result<u64> {
|
|
||||||
Err(std::io::ErrorKind::Unsupported.into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MediaSource for Stream {
|
|
||||||
fn is_seekable(&self) -> bool {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
|
|
||||||
fn byte_len(&self) -> Option<u64> {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
47
src/stream_task.rs
Normal file
47
src/stream_task.rs
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
use crate::circular_buffer_source::CircularBufferSource;
|
||||||
|
use log::{error, info};
|
||||||
|
use poise::serenity_prelude::GuildId;
|
||||||
|
use songbird::Songbird;
|
||||||
|
use songbird::input::RawAdapter;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::mpsc::Receiver;
|
||||||
|
|
||||||
|
pub enum StreamControl {
|
||||||
|
Pause,
|
||||||
|
Play,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn stream_task(
|
||||||
|
circular_buffer_source: CircularBufferSource,
|
||||||
|
manager: Arc<Songbird>,
|
||||||
|
guild: GuildId,
|
||||||
|
mut receiver: Receiver<StreamControl>,
|
||||||
|
) {
|
||||||
|
let track = {
|
||||||
|
let handler_lock = manager.get(guild).unwrap();
|
||||||
|
let mut handler = handler_lock.lock().await;
|
||||||
|
|
||||||
|
let adapter = RawAdapter::new(circular_buffer_source, 48000, 2);
|
||||||
|
|
||||||
|
handler.play_only_input(adapter.into())
|
||||||
|
};
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let msg = match receiver.recv().await {
|
||||||
|
None => {
|
||||||
|
info!("Stopping Stram Task...");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Some(msg) => msg,
|
||||||
|
};
|
||||||
|
|
||||||
|
let res = match msg {
|
||||||
|
StreamControl::Pause => track.pause(),
|
||||||
|
StreamControl::Play => track.play(),
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(err) = res {
|
||||||
|
error!("Error controlling track: {err:?}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,54 +1,116 @@
|
|||||||
use crate::stream::Stream;
|
use crate::circular_buffer_source::CircularBufferSource;
|
||||||
use audiopus::coder::Decoder;
|
use audiopus::coder::Decoder;
|
||||||
use audiopus::{Channels, MutSignals, SampleRate};
|
use audiopus::{Channels, MutSignals, SampleRate};
|
||||||
|
use log::{debug, error, info};
|
||||||
|
use prost::{DecodeError, Message};
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
|
use tap_interface::tap::message::TapMessage;
|
||||||
|
use thiserror::Error;
|
||||||
use tokio::net::UdpSocket;
|
use tokio::net::UdpSocket;
|
||||||
|
|
||||||
pub const SAMPLE_RATE: SampleRate = SampleRate::Hz48000;
|
pub const SAMPLE_RATE: SampleRate = SampleRate::Hz48000;
|
||||||
pub const SAMPLE_RATE_RAW: usize = 48_000;
|
|
||||||
pub const AUDIO_FRAME_RATE: usize = 50;
|
#[derive(Debug, Error)]
|
||||||
pub const MONO_FRAME_SIZE: usize = SAMPLE_RATE_RAW / AUDIO_FRAME_RATE;
|
#[allow(clippy::enum_variant_names)]
|
||||||
pub const STEREO_FRAME_SIZE: usize = 2 * MONO_FRAME_SIZE;
|
pub enum Error {
|
||||||
|
#[error("IO Error {0}")]
|
||||||
|
IOError(#[from] std::io::Error),
|
||||||
|
#[error("Failed to decode audio in packet: {0}")]
|
||||||
|
OpusError(#[from] audiopus::Error),
|
||||||
|
#[error("Failed to decode packet: {0}")]
|
||||||
|
DecodeError(#[from] DecodeError),
|
||||||
|
}
|
||||||
|
|
||||||
pub struct UdpSource {
|
pub struct UdpSource {
|
||||||
udp: UdpSocket,
|
udp: UdpSocket,
|
||||||
stream: Stream,
|
source: CircularBufferSource,
|
||||||
decoder: Decoder,
|
decoder: Decoder,
|
||||||
|
expected_seq: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl UdpSource {
|
impl UdpSource {
|
||||||
pub fn new(udp_socket: UdpSocket, stream: Stream) -> Self {
|
pub fn new(udp_socket: UdpSocket, source: CircularBufferSource) -> Self {
|
||||||
Self {
|
Self {
|
||||||
udp: udp_socket,
|
udp: udp_socket,
|
||||||
stream,
|
source,
|
||||||
decoder: Decoder::new(SAMPLE_RATE, Channels::Stereo).unwrap(),
|
decoder: Decoder::new(SAMPLE_RATE, Channels::Stereo).unwrap(),
|
||||||
|
expected_seq: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn worker(&mut self) {
|
fn decode_data(
|
||||||
let mut buffer = vec![0; 1024 * 32];
|
&mut self,
|
||||||
loop {
|
input_buffer: &[u8],
|
||||||
let len = self.udp.recv(&mut buffer).await.unwrap();
|
output_buffer: &mut [f32],
|
||||||
|
) -> Result<usize, Error> {
|
||||||
|
let packet = audiopus::packet::Packet::try_from(input_buffer)?;
|
||||||
|
|
||||||
let packet = audiopus::packet::Packet::try_from(&buffer[12..len]).unwrap();
|
let signals = MutSignals::try_from(output_buffer)?;
|
||||||
|
|
||||||
let mut samples = vec![0.0; STEREO_FRAME_SIZE];
|
let sample_size = self.decoder.decode_float(Some(packet), signals, false)?;
|
||||||
let signals = MutSignals::try_from(&mut samples).unwrap();
|
|
||||||
let sample_size = self
|
|
||||||
.decoder
|
|
||||||
.decode_float(Some(packet), signals, false)
|
|
||||||
.unwrap()
|
|
||||||
* 2;
|
|
||||||
|
|
||||||
let mut sample_bytes = Vec::with_capacity(sample_size * std::mem::size_of::<f32>());
|
Ok(sample_size * 2)
|
||||||
|
}
|
||||||
|
|
||||||
for sample in &samples[0..sample_size] {
|
fn write_audio_data_to_source(&mut self, samples: &[f32]) -> Result<(), Error> {
|
||||||
|
let mut sample_bytes = Vec::with_capacity(std::mem::size_of_val(samples));
|
||||||
|
|
||||||
|
for sample in samples {
|
||||||
let bytes = sample.to_le_bytes();
|
let bytes = sample.to_le_bytes();
|
||||||
|
|
||||||
sample_bytes.extend_from_slice(bytes.as_slice());
|
sample_bytes.extend_from_slice(bytes.as_slice());
|
||||||
}
|
}
|
||||||
|
|
||||||
self.stream.write_all(&sample_bytes).unwrap();
|
self.source.write_all(&sample_bytes)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn recv_message(&mut self, buffer: &mut [u8]) -> Result<usize, std::io::Error> {
|
||||||
|
self.udp.recv(buffer).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn worker(&mut self) {
|
||||||
|
info!("Starting TAP UDP Endpoint");
|
||||||
|
let mut buffer = vec![0; 1024 * 32];
|
||||||
|
let mut decode_data = vec![0.0; tap_interface::STEREO_FRAME_SIZE];
|
||||||
|
loop {
|
||||||
|
let msg_size = match self.recv_message(&mut buffer).await {
|
||||||
|
Ok(msg_size) => msg_size,
|
||||||
|
Err(err) => {
|
||||||
|
error!("Failed to recv udp packet: {err:?}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let tap_message = match TapMessage::decode(&buffer[..msg_size]) {
|
||||||
|
Ok(tap_msg) => tap_msg,
|
||||||
|
Err(err) => {
|
||||||
|
error!("Failed decode proto message: {err:?}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if tap_message.seq != self.expected_seq {
|
||||||
|
debug!(
|
||||||
|
"Mismatch sequence count found expected '{}': {tap_message:?}",
|
||||||
|
tap_message.seq
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
(self.expected_seq, _) = tap_message.seq.overflowing_add(1);
|
||||||
|
|
||||||
|
let decoded_samples = match self.decode_data(&tap_message.audio_data, &mut decode_data)
|
||||||
|
{
|
||||||
|
Ok(decoded_samples) => decoded_samples,
|
||||||
|
Err(err) => {
|
||||||
|
error!("Failed to decode audio data: {err:?}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(err) = self.write_audio_data_to_source(&decode_data[..decoded_samples]) {
|
||||||
|
error!("Failed to write data to source: {err:?}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user