Switch to NATS backbone
This commit is contained in:
parent
e3713f2512
commit
a7f663c905
@ -1,2 +1,2 @@
|
||||
[registries.jojo-dev]
|
||||
index = "https://git.jojodev.com/joeyahines/_cargo-index.git"
|
||||
[registries.ahines]
|
||||
index = "https://git.ahines.net/joeyahines/_cargo-index.git"
|
||||
1842
Cargo.lock
generated
1842
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
17
Cargo.toml
17
Cargo.toml
@ -1,19 +1,22 @@
|
||||
[package]
|
||||
name = "roll_bot"
|
||||
version = "0.2.0"
|
||||
edition = "2021"
|
||||
version = "1.0.0"
|
||||
edition = "2024"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
structopt = "0.3.26"
|
||||
config = "0.14.0"
|
||||
rppal = "0.18.0"
|
||||
raas_types = { version = "0.0.5", registry = "jojo-dev"}
|
||||
config = "0.15.16"
|
||||
rppal = "0.22.1"
|
||||
raas_types = { version = "1.0.0", registry = "ahines"}
|
||||
log = "0.4.21"
|
||||
env_logger = "0.11.3"
|
||||
thiserror = "1.0.61"
|
||||
thiserror = "2.0.16"
|
||||
serde = { version = "1.0.203", features = ["derive"] }
|
||||
prost = "0.12.6"
|
||||
prost = "0.14.1"
|
||||
async-nats = "0.44.1"
|
||||
tokio = { version = "1.47.1", features = ["io-util", "macros", "rt", "rt-multi-thread"] }
|
||||
futures-util = "0.3.31"
|
||||
|
||||
|
||||
|
||||
122
src/client.rs
122
src/client.rs
@ -1,4 +1,4 @@
|
||||
use crate::config::RollBotConfig;
|
||||
use futures_util::StreamExt;
|
||||
use crate::roll_bot::RollBot;
|
||||
use log::{error, info, warn};
|
||||
use prost::Message;
|
||||
@ -6,11 +6,8 @@ use raas_types::raas::bot::roll::{roll_cmd, Roll, RollCmd, RollImage, RollRespon
|
||||
use raas_types::raas::cmd::request::Cmd;
|
||||
use raas_types::raas::cmd::Command;
|
||||
use raas_types::raas::ping::{Ping, Pong};
|
||||
use raas_types::raas::register::{BotTypes, Register, RegisterResponse};
|
||||
use raas_types::raas::resp::response::Resp;
|
||||
use raas_types::raas::resp::Response;
|
||||
use raas_types::raas::{recv_raas_msg, send_raas_msg};
|
||||
use std::net::TcpStream;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use thiserror::Error;
|
||||
|
||||
@ -24,86 +21,31 @@ pub enum Error {
|
||||
ProtoBufEncode(#[from] prost::EncodeError),
|
||||
#[error("Protobuf Decode Error")]
|
||||
ProtoBufDecode(#[from] prost::DecodeError),
|
||||
#[error("Invalid command ID")]
|
||||
InvalidCommandID,
|
||||
#[error("Invalid command")]
|
||||
InvalidCommand,
|
||||
}
|
||||
|
||||
pub struct Client {
|
||||
config: RollBotConfig,
|
||||
roll_bot: RollBot,
|
||||
|
||||
id: Option<u32>,
|
||||
nats_client: async_nats::Client,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(config: RollBotConfig, roll_bot: RollBot) -> Self {
|
||||
pub fn new(roll_bot: RollBot, nats_client: async_nats::Client) -> Self {
|
||||
Self {
|
||||
config,
|
||||
roll_bot,
|
||||
id: None,
|
||||
nats_client,
|
||||
}
|
||||
}
|
||||
|
||||
fn try_connect(&self) -> Result<TcpStream, Error> {
|
||||
for attempt in 1..10 {
|
||||
info!("Attempting to connect to {}", self.config.server_addr);
|
||||
let socket = TcpStream::connect(self.config.server_addr.clone());
|
||||
|
||||
if let Ok(socket) = socket {
|
||||
return Ok(socket);
|
||||
}
|
||||
|
||||
let wait = attempt * 10;
|
||||
warn!("Connection failed, waiting {}s...", wait);
|
||||
std::thread::sleep(std::time::Duration::from_secs(wait))
|
||||
}
|
||||
|
||||
Err(std::io::Error::last_os_error().into())
|
||||
}
|
||||
|
||||
fn register(&mut self, socket: &mut TcpStream) -> Result<(), Error> {
|
||||
let register_msg = Register {
|
||||
name: "Roll Bot".to_string(),
|
||||
bot_type: BotTypes::Roll.into(),
|
||||
};
|
||||
|
||||
let mut message = Vec::new();
|
||||
register_msg.encode(&mut message)?;
|
||||
|
||||
send_raas_msg(socket, message)?;
|
||||
|
||||
let resp = recv_raas_msg(socket)?;
|
||||
|
||||
let register_msg = RegisterResponse::decode(&*resp.msg)?;
|
||||
fn parse_command(&mut self, msg: Vec<u8>) -> Result<RollCmd, Error> {
|
||||
let command = Command::decode(&*msg)?;
|
||||
|
||||
info!(
|
||||
"Registered '{}' as id {}",
|
||||
register_msg.name, register_msg.id
|
||||
);
|
||||
|
||||
self.id = Some(register_msg.id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_next_command(&mut self, socket: &mut TcpStream) -> Result<RollCmd, Error> {
|
||||
let msg = recv_raas_msg(socket)?;
|
||||
|
||||
let command = Command::decode(&*msg.msg)?;
|
||||
|
||||
info!(
|
||||
"Received command for id={} timestamp={}",
|
||||
command.id,
|
||||
"Received command timestamp={}",
|
||||
&command.request.as_ref().unwrap().timestamp
|
||||
);
|
||||
|
||||
if command.id != self.id.unwrap() {
|
||||
warn!("Command for a different ID was received, dropping");
|
||||
return Err(Error::InvalidCommandID);
|
||||
}
|
||||
|
||||
match command.request.unwrap().cmd.unwrap() {
|
||||
Cmd::RollCmd(cmd) => Ok(cmd),
|
||||
#[allow(unreachable_patterns)]
|
||||
@ -159,7 +101,7 @@ impl Client {
|
||||
Ok(resp)
|
||||
}
|
||||
|
||||
fn handle_command(&mut self, socket: &mut TcpStream, roll_cmd: RollCmd) -> Result<(), Error> {
|
||||
fn handle_command(&mut self, roll_cmd: RollCmd) -> Result<Vec<u8>, Error> {
|
||||
let roll_resp = match roll_cmd.cmd.unwrap() {
|
||||
roll_cmd::Cmd::Ping(ping) => self.handle_ping(ping),
|
||||
roll_cmd::Cmd::Roll(roll) => self.handle_roll(roll),
|
||||
@ -171,7 +113,7 @@ impl Client {
|
||||
.as_secs();
|
||||
|
||||
let resp = Response {
|
||||
id: self.id.unwrap(),
|
||||
id: 0,
|
||||
timestamp,
|
||||
resp: Some(Resp::RollResp(roll_resp)),
|
||||
};
|
||||
@ -180,33 +122,37 @@ impl Client {
|
||||
|
||||
resp.encode(&mut message)?;
|
||||
|
||||
send_raas_msg(socket, message)?;
|
||||
|
||||
Ok(())
|
||||
Ok(message)
|
||||
}
|
||||
|
||||
pub fn run_client(&mut self) -> Result<(), Error> {
|
||||
pub async fn run_client(&mut self) -> Result<(), Error> {
|
||||
info!("Running client...");
|
||||
let mut command_subscriber = self.nats_client.subscribe("raas.cmd.random.roll").await.unwrap();
|
||||
|
||||
loop {
|
||||
let mut socket = self.try_connect()?;
|
||||
|
||||
self.register(&mut socket)?;
|
||||
|
||||
loop {
|
||||
let roll_cmd_res = self.get_next_command(&mut socket);
|
||||
let next_command = command_subscriber.next().await;
|
||||
|
||||
if let Ok(roll_cmd) = roll_cmd_res {
|
||||
self.handle_command(&mut socket, roll_cmd)?;
|
||||
} else if let Err(roll_cmd_err) = roll_cmd_res {
|
||||
match roll_cmd_err {
|
||||
Error::IO(io_error) => {
|
||||
warn!("Connection issue, reattempting connection: {}", io_error);
|
||||
self.id = None;
|
||||
break;
|
||||
if let Some(next_command) = next_command {
|
||||
let roll_cmd_res = self.parse_command(next_command.payload.to_vec());
|
||||
|
||||
if let Ok(roll_cmd) = roll_cmd_res {
|
||||
let resp = self.handle_command(roll_cmd)?;
|
||||
|
||||
let reply = next_command.reply.unwrap();
|
||||
|
||||
self.nats_client.publish(reply, resp.into()).await.unwrap();
|
||||
} else if let Err(roll_cmd_err) = roll_cmd_res {
|
||||
match roll_cmd_err {
|
||||
Error::IO(io_error) => {
|
||||
warn!("Connection issue, reattempting connection: {}", io_error);
|
||||
break;
|
||||
}
|
||||
Error::InvalidCommand => {
|
||||
continue;
|
||||
}
|
||||
_ => return Err(roll_cmd_err),
|
||||
}
|
||||
Error::InvalidCommandID | Error::InvalidCommand => {
|
||||
continue;
|
||||
}
|
||||
_ => return Err(roll_cmd_err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -43,7 +43,8 @@ pub struct PWMConfig {
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct RollBotConfig {
|
||||
pub server_addr: String,
|
||||
pub nats_server: String,
|
||||
pub auth_token: Option<String>,
|
||||
|
||||
pub pwm_config: PWMConfig,
|
||||
}
|
||||
|
||||
20
src/main.rs
20
src/main.rs
@ -13,7 +13,8 @@ use crate::config::Args;
|
||||
use crate::roll_bot::RollBot;
|
||||
use rppal::pwm::{Polarity, Pwm};
|
||||
|
||||
fn main() -> Result<(), Box<dyn Error>> {
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
env_logger::init();
|
||||
|
||||
let args = Args::from_args();
|
||||
@ -45,8 +46,21 @@ fn main() -> Result<(), Box<dyn Error>> {
|
||||
Err(err) => error!("Error rolling die: {}", err),
|
||||
}
|
||||
} else {
|
||||
let mut client = Client::new(config, roll_bot);
|
||||
let res = client.run_client();
|
||||
let nats_ops = if let Some(token) = config.auth_token.clone() {
|
||||
async_nats::ConnectOptions::with_token(token)
|
||||
}
|
||||
else {
|
||||
async_nats::ConnectOptions::new()
|
||||
};
|
||||
|
||||
info!("Connecting to NATS server @ {}", config.nats_server);
|
||||
let nats_client = nats_ops
|
||||
.name("Roll Bot")
|
||||
.connect(&config.nats_server).await
|
||||
.unwrap();
|
||||
|
||||
let mut client = Client::new(roll_bot, nats_client);
|
||||
let res = client.run_client().await;
|
||||
|
||||
if let Err(err) = res {
|
||||
error!("Error in client: {}", err);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user