Switch to NATS for the backbone

This commit is contained in:
Joey Hines 2025-10-02 21:02:24 -06:00
parent 5b1251d089
commit 0586a5f00b
Signed by: joeyahines
GPG Key ID: 38BA6F25C94C9382
10 changed files with 1259 additions and 1128 deletions

View File

@ -1,2 +1,2 @@
[registries.jojo-dev]
index = "https://git.jojodev.com/joeyahines/_cargo-index.git"
[registries.ahines]
index = "https://git.ahines.net/joeyahines/_cargo-index.git"

1722
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,23 +1,19 @@
[package]
name = "raas"
version = "0.1.0"
edition = "2021"
version = "1.0.0"
edition = "2024"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
raas_types = { version = "0.0.9", features = ["async"], registry = "jojo-dev"}
prost = "0.12.6"
tokio = {version = "1.38.0", features = ["io-util", "net", "rt-multi-thread"]}
axum = "0.7.5"
raas_types = { version = "1.0.0", features = ["async"], registry = "ahines"}
prost = "0.14.1"
tokio = {version = "1.47.1", features = ["io-util", "net", "rt-multi-thread"]}
env_logger = "0.11.3"
log = "0.4.21"
thiserror = "1.0.61"
tonic = "0.11.0"
config = "0.14.0"
tonic = "0.14.2"
config = "0.15.18"
serde = "1.0.203"
structopt = "0.3.26"
chrono = "0.4.38"
[build-dependencies]
tonic-build = "0.11.0"
async-nats = "0.44.1"
thiserror = "2.0.17"

View File

@ -11,7 +11,8 @@ pub struct Args {
#[derive(Debug, Clone, Deserialize)]
pub struct RaasConfig {
pub grpc_server: String,
pub robot_server: String,
pub nats_server: String,
pub auth_token: Option<String>,
}
impl RaasConfig {

View File

@ -1,19 +1,11 @@
use crate::config::{Args, RaasConfig};
use crate::robot::connection_handler::ConnectionHandler;
use crate::robot::robot_manager::RobotManager;
use crate::robot::{ConnectorManager, ROBOT_MESSAGE_QUEUE_SIZE};
use log::{error, info};
use raas_types::raas::service::raas_server::RaasServer;
use server::grpc::RaaSServer;
use std::sync::Arc;
use structopt::StructOpt;
use tokio::io::duplex;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tonic::transport::Server;
mod config;
mod robot;
mod server;
#[tokio::main]
@ -33,29 +25,35 @@ async fn main() {
}
};
let connector_manager = ConnectorManager {
next_id: 0,
connectors: Default::default(),
let nats_ops = if let Some(token) = config.auth_token.clone() {
async_nats::ConnectOptions::with_token(token)
} else {
async_nats::ConnectOptions::new()
};
let connector_manager = Arc::new(Mutex::new(connector_manager));
info!("Connecting to NATS server @ {}", config.nats_server);
let nats_client_res = nats_ops
.name("RaaS Server")
.connect(&config.nats_server)
.await;
let tcp_listener = TcpListener::bind(&config.robot_server).await.unwrap();
let nats_client = match nats_client_res {
Ok(nats_client) => nats_client,
Err(err) => {
error!("Failed to connect to NATS server: {err:?}");
return;
}
};
let (main_stream, manager_stream) = duplex(ROBOT_MESSAGE_QUEUE_SIZE);
info!("Serving GRPC at {}", &config.grpc_server);
let raas_server = RaaSServer::new(nats_client);
let robot_manager = RobotManager::new(manager_stream);
let connector_handler = ConnectionHandler::new(&config.robot_server, tcp_listener);
let robot_manager_connector_manager = connector_manager.clone();
tokio::task::spawn(robot_manager.worker(robot_manager_connector_manager));
tokio::task::spawn(connector_handler.worker(connector_manager));
info!("Serving grpc at {}", &config.grpc_server);
let raas_server = RaaSServer::new(main_stream);
Server::builder()
let res = Server::builder()
.add_service(RaasServer::new(raas_server))
.serve(config.grpc_server.parse().unwrap())
.await
.unwrap()
.await;
if let Err(err) = res {
error!("Error while serving GRPC: {err:?}")
}
}

View File

@ -1,94 +0,0 @@
use crate::robot::robot_connector::RobotConnector;
use crate::robot::ROBOT_MESSAGE_QUEUE_SIZE;
use crate::robot::{ConnectorManager, Error};
use log::{error, info};
use prost::Message;
use raas_types::raas::register::{Register, RegisterResponse};
use raas_types::raas::{recv_raas_msg, send_raas_msg};
use std::sync::Arc;
use tokio::io::duplex;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
/// Accepts robot TCP connections and registers each one with the shared
/// `ConnectorManager`.
pub struct ConnectionHandler {
/// Listener that `handle_register` accepts robot connections on.
tcp_listener: TcpListener,
/// Listen address as text; only read when logging.
addr: String,
}
impl ConnectionHandler {
pub fn new(addr: &str, tcp_listener: TcpListener) -> Self {
Self {
tcp_listener,
addr: addr.to_string(),
}
}
async fn handle_register(
&mut self,
connector_manager: Arc<Mutex<ConnectorManager>>,
) -> Result<(), Error> {
info!("Listening for connection at {}", self.addr);
let (mut socket, addr) = self.tcp_listener.accept().await?;
info!("Got connection from {}", addr);
let register_msg = recv_raas_msg(&mut socket).await?;
let register = Register::decode(&*register_msg.msg)?;
let mut connector_manager = connector_manager.lock().await;
let id = connector_manager.next_id;
connector_manager.next_id += 1;
let register_resp = RegisterResponse {
name: register.name.to_string(),
r#type: register.bot_type,
id,
};
let mut msg = Vec::new();
register_resp.encode(&mut msg)?;
send_raas_msg(&mut socket, msg).await?;
let (connector_duplex_manager, connector_duplex_connector) =
duplex(ROBOT_MESSAGE_QUEUE_SIZE);
let mut connector = RobotConnector::new(
id,
vec![register.bot_type()],
socket,
connector_duplex_connector,
);
let connector_handler = crate::robot::ConnectorHandle {
stream: connector_duplex_manager,
tags: connector.bot_tags.clone(),
};
connector_manager.connectors.insert(id, connector_handler);
tokio::spawn(async move { connector.worker().await });
Ok(())
}
pub async fn worker(
mut self,
connector_manager: Arc<Mutex<ConnectorManager>>,
) -> Result<(), Error> {
loop {
let res = self.handle_register(connector_manager.clone()).await;
if let Err(err) = res {
error!("Got error handling new register: {}", err);
if let Error::Io(_io_err) = &err {
error!("IO Error, exiting...");
return Err(err);
}
}
}
}
}

View File

@ -1,39 +0,0 @@
use raas_types::raas;
use std::collections::HashMap;
use thiserror::Error;
use tokio::io::DuplexStream;
pub mod connection_handler;
pub mod robot_connector;
pub mod robot_manager;
/// Size argument passed to `tokio::io::duplex` for the in-process streams.
// NOTE(review): `duplex` takes a maximum buffer size in bytes, not a message
// count — confirm that 10 is the intended value.
pub const ROBOT_MESSAGE_QUEUE_SIZE: usize = 10;
/// How long to wait for a robot (or its connector) to answer a request.
const ROBOT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
/// The manager's view of one registered robot connector.
#[derive(Debug)]
pub struct ConnectorHandle {
/// Manager-side half of the duplex stream shared with the connector.
pub stream: DuplexStream,
/// Bot types this connector serves; used to pick a connector for a request.
pub tags: Vec<raas::register::BotTypes>,
}
/// Registry of the currently known robot connectors.
#[derive(Debug)]
pub struct ConnectorManager {
/// Id handed to the next robot that registers; incremented on each registration.
pub next_id: u32,
/// Registered connectors keyed by their id.
pub connectors: HashMap<u32, ConnectorHandle>,
}
/// Errors produced by the robot connection handling code.
#[derive(Error, Debug)]
pub enum Error {
/// Underlying I/O failure. `ConnectionHandler::worker` stops when it sees this variant.
#[error("IO Error")]
Io(#[from] std::io::Error),
/// A protobuf message could not be encoded.
#[error("Protobuf Encode Error")]
ProtobufEncode(#[from] prost::EncodeError),
/// Received bytes could not be decoded as the expected protobuf message.
#[error("Protobuf Decode Error")]
ProtobufDecode(#[from] prost::DecodeError),
/// Talking to a connector failed. Carries the connector id so the manager can
/// drop it from its map; the id is not part of the display text.
#[error("Connection to robot has been closed")]
ConnectionClosed(u32),
/// No registered connector has a tag matching the request's bot type.
#[error("No robots to handle requests")]
NoRobotsToHandleRequest,
/// A `tokio::time::timeout` elapsed while waiting for a response.
#[error("Timed out waiting for response")]
Timeout(#[from] tokio::time::error::Elapsed),
}

View File

@ -1,179 +0,0 @@
use crate::robot::{Error, ROBOT_TIMEOUT};
use log::{debug, error, info};
use prost::Message;
use raas_types::raas::cmd::{Command, Request};
use raas_types::raas::register::BotTypes;
use raas_types::raas::resp::Response;
use raas_types::raas::{recv_raas_msg, send_raas_msg};
use std::time::Duration;
use tokio::io::DuplexStream;
use tokio::net::TcpStream;
/// Bridges one robot's TCP socket and the in-process stream shared with the
/// robot manager.
#[derive(Debug)]
pub struct RobotConnector {
/// Id assigned at registration; placed in every `Command` sent to the robot.
pub bot_id: u32,
/// Bot types this connector was registered with.
pub bot_tags: Vec<BotTypes>,
/// Socket connected to the robot.
tcp_stream: TcpStream,
/// In-process stream: requests are read from it and responses written back to it.
msg_stream: DuplexStream,
/// Set to `true` when `worker` starts and back to `false` when it exits.
is_running: bool,
}
impl RobotConnector {
    /// Creates a connector for a registered robot.
    ///
    /// `tcp_stream` is the socket to the robot and `msg_stream` is the
    /// in-process stream requests arrive on. The connector starts out not
    /// running; call [`RobotConnector::worker`] to drive it.
    pub fn new(
        bot_id: u32,
        bot_tags: Vec<BotTypes>,
        tcp_stream: TcpStream,
        msg_stream: DuplexStream,
    ) -> Self {
        Self {
            bot_id,
            bot_tags,
            tcp_stream,
            msg_stream,
            is_running: false,
        }
    }

    /// Reads and decodes the next `Request` from the in-process stream.
    async fn wait_for_request(&mut self) -> Result<Request, Error> {
        let request = recv_raas_msg(&mut self.msg_stream).await?;
        let request = Request::decode(request.msg.as_slice())?;

        Ok(request)
    }

    /// Wraps `request` in a `Command` addressed to this bot and writes it to
    /// the robot socket.
    ///
    /// # Errors
    /// Returns an encode error instead of panicking if the command cannot be
    /// serialized, and an I/O error if the socket write fails.
    async fn send_command(&mut self, request: Request) -> Result<(), Error> {
        let cmd = Command {
            id: self.bot_id,
            request: Some(request),
        };

        let mut msg = Vec::new();
        cmd.encode(&mut msg)?;

        send_raas_msg(&mut self.tcp_stream, msg).await?;

        Ok(())
    }

    /// Waits up to `ROBOT_TIMEOUT` for a response from the robot socket and
    /// decodes it.
    ///
    /// # Errors
    /// The bytes come from the network, so a malformed payload is reported as
    /// a decode error rather than panicking the worker task. Timeouts and I/O
    /// failures are propagated as well.
    async fn get_response(&mut self) -> Result<Response, Error> {
        let recv_timeout = tokio::time::timeout(ROBOT_TIMEOUT, recv_raas_msg(&mut self.tcp_stream));
        let recv = recv_timeout.await??;

        let resp = Response::decode(recv.msg.as_slice())?;

        debug!("Worker (bot_id={}) got resp", self.bot_id);

        Ok(resp)
    }

    /// Encodes `resp` and writes it back on the in-process stream.
    async fn respond_to_request(&mut self, resp: Response) -> Result<(), Error> {
        let mut msg = Vec::new();
        resp.encode(&mut msg)?;

        send_raas_msg(&mut self.msg_stream, msg).await?;

        Ok(())
    }

    /// Main loop of the connector.
    ///
    /// Handles requests one at a time. If no request completes within 120
    /// seconds the robot is pinged instead; a failed ping is only logged.
    /// Any error while handling a request ends the loop.
    pub async fn worker(&mut self) {
        self.is_running = true;
        loop {
            let timeout_res =
                tokio::time::timeout(Duration::from_secs(120), self.handle_request()).await;

            match timeout_res {
                Ok(Ok(())) => {}
                Ok(Err(err)) => {
                    error!(
                        "Worker (bot_id = {}) Closing robot connection: {}",
                        self.bot_id, err
                    );
                    break;
                }
                // Idle for the whole window: check the robot with a ping.
                Err(_elapsed) => {
                    if let Err(err) = self.handle_ping().await {
                        error!(
                            "Worker (bot_id = {}) Got error response from ping: {}",
                            self.bot_id, err
                        )
                    }
                }
            }
        }
        error!("Exiting Worker (bot_id={})", self.bot_id);
        self.is_running = false;
    }

    /// Sends a ping command to the robot and waits up to 5 seconds for any
    /// response.
    async fn handle_ping(&mut self) -> Result<(), Error> {
        self.send_command(Request {
            timestamp: chrono::Utc::now().timestamp() as u64,
            cmd: Some(raas_types::raas::cmd::request::Cmd::RollCmd(
                raas_types::raas::bot::roll::RollCmd {
                    cmd: Some(raas_types::raas::bot::roll::roll_cmd::Cmd::Ping(
                        raas_types::raas::ping::Ping {
                            data: format!("id: {}", self.bot_id),
                        },
                    )),
                },
            )),
        })
        .await?;

        let resp = tokio::time::timeout(Duration::from_secs(5), self.get_response()).await??;

        debug!(
            "Worker (bot_id = {}) Got ping resp: {:?}",
            self.bot_id, resp
        );

        Ok(())
    }

    /// Runs one request/response round trip: read a request from the
    /// in-process stream, forward it to the robot, read the robot's response
    /// and send it back. Each failing step is logged before the error is
    /// returned.
    async fn handle_request(&mut self) -> Result<(), Error> {
        info!("Worker (bot_id={}) is waiting for requests", self.bot_id);
        let request = match self.wait_for_request().await {
            Ok(r) => r,
            Err(err) => {
                error!(
                    "Worker (bot_id={}) failed to get request: {:?}",
                    self.bot_id, err
                );
                return Err(err);
            }
        };

        if let Err(err) = self.send_command(request).await {
            error!(
                "Worker (bot_id={}) failed to send command to bot: {:?}",
                self.bot_id, err
            );
            return Err(err);
        }

        let resp = match self.get_response().await {
            Ok(r) => r,
            Err(err) => {
                error!(
                    "Worker (bot_id={}) failed to get response from bot: {:?}",
                    self.bot_id, err
                );
                return Err(err);
            }
        };

        match self.respond_to_request(resp).await {
            Ok(_) => Ok(()),
            Err(err) => {
                error!(
                    "Worker (bot_id={}) failed to send response: {:?}",
                    self.bot_id, err
                );
                Err(err)
            }
        }
    }
}

View File

@ -1,173 +0,0 @@
use crate::robot::{ConnectorManager, Error, ROBOT_TIMEOUT};
use log::{error, info};
use prost::Message;
use raas_types::raas;
use raas_types::raas::cmd::request::Cmd;
use raas_types::raas::cmd::Request;
use raas_types::raas::{recv_raas_msg, send_raas_msg, RaasMessage};
use std::sync::Arc;
use tokio::io::{AsyncWriteExt, DuplexStream};
use tokio::sync::Mutex;
/// Routes requests read from the command stream to a matching robot connector.
#[derive(Debug)]
pub struct RobotManager {
/// In-process stream that commands are read from and responses/errors are written to.
command_stream: DuplexStream,
}
impl RobotManager {
pub fn new(stream: DuplexStream) -> Self {
Self {
command_stream: stream,
}
}
async fn recv_command(&mut self) -> Result<RaasMessage, Error> {
let resp = recv_raas_msg(&mut self.command_stream).await?;
Ok(resp)
}
async fn send_resp(&mut self, msg: RaasMessage) -> Result<(), Error> {
self.command_stream.write_all(&msg.into_bytes()).await?;
Ok(())
}
async fn send_error(
&mut self,
id: u32,
error: raas::error::ErrorType,
err_msg: &str,
) -> Result<(), Error> {
let resp = raas::resp::Response {
id,
timestamp: 0,
resp: Some(raas::resp::response::Resp::Error(raas::error::Error {
err: error.into(),
msg: err_msg.to_string(),
})),
};
let mut msg = Vec::new();
resp.encode(&mut msg)?;
self.send_resp(RaasMessage::new(msg)).await?;
Ok(())
}
async fn handle_command(
&mut self,
connector_manager: Arc<Mutex<ConnectorManager>>,
) -> Result<(), Error> {
info!("Waiting for command...");
let msg = self.recv_command().await?;
let request = Request::decode(&*msg.msg)?;
let dest_type = match request.cmd.clone().unwrap() {
Cmd::RollCmd(_) => raas::register::BotTypes::Roll,
};
let mut connector_manager = connector_manager.lock().await;
let connector = connector_manager
.connectors
.iter_mut()
.find(|(_, handler)| handler.tags.contains(&dest_type));
if let Some((id, connector)) = connector {
info!("Sending message to connector with id={}", id);
let mut msg = Vec::new();
request.encode(&mut msg)?;
let ret = send_raas_msg(&mut connector.stream, msg).await;
if let Err(err) = ret {
error!(
"Got '{}' sending message to Robot Connector id={}, closing connection",
err, id
);
return Err(Error::ConnectionClosed(*id));
}
let timeout = tokio::time::timeout(ROBOT_TIMEOUT, recv_raas_msg(&mut connector.stream));
let ret = timeout.await?;
let resp = match ret {
Ok(r) => r,
Err(err) => {
error!(
"Got '{}' receiving message to Robot Connector id={}, closing connection",
err, id
);
return Err(Error::ConnectionClosed(*id));
}
};
info!("Got response from id={}", id);
self.send_resp(resp).await?;
} else {
error!("No connectors available to handle request");
return Err(Error::NoRobotsToHandleRequest);
}
Ok(())
}
pub async fn worker(
mut self,
connector_manager: Arc<Mutex<ConnectorManager>>,
) -> Result<(), Error> {
loop {
let res = self.handle_command(connector_manager.clone()).await;
if let Err(err) = res {
error!("Got error in robot manager: {}", err);
let (id, error, error_msg) = match err {
Error::ConnectionClosed(id) => {
let mut connection_manager = connector_manager.lock().await;
connection_manager.connectors.remove(&id);
(
id,
raas::error::ErrorType::RobotOffline,
"Robot offline.".to_string(),
)
}
Error::Io(err) => (
0,
raas::error::ErrorType::RobotError,
format!("IO Error: {}", err),
),
Error::ProtobufEncode(err) => (
0,
raas::error::ErrorType::RobotError,
format!("Encode Error: {}", err),
),
Error::ProtobufDecode(err) => (
0,
raas::error::ErrorType::RobotError,
format!("Decode Error: {}", err),
),
Error::NoRobotsToHandleRequest => (
0,
raas::error::ErrorType::NoRobotsToHandleRequest,
"No robots to handle request".to_string(),
),
Error::Timeout(_) => (
0,
raas::error::ErrorType::RobotError,
"Timed out waiting for response from Robot".to_string(),
),
};
self.send_error(id, error, error_msg.as_str()).await?;
}
}
}
}

View File

@ -1,19 +1,69 @@
use log::info;
use async_nats::Client;
use log::{error, info};
use prost::Message;
use raas_types::raas::cmd::Command;
use raas_types::raas::cmd::request::Cmd;
use raas_types::raas::service::raas_server::Raas;
use raas_types::raas::{recv_raas_msg, send_raas_msg};
use std::ops::DerefMut;
use tokio::io::DuplexStream;
use tonic::{Request, Response, Status};
use thiserror::Error;
use tonic::codegen::tokio_stream::StreamExt;
use tonic::{Code, Request, Response, Status};
/// Failures that can occur while sending a command over NATS and waiting for
/// the robot's reply.
#[derive(Error, Debug)]
pub enum RequestError {
/// Subscribing to the reply inbox failed.
#[error("Failed to subscribe to reply: {0}")]
FailedToSubscribeReply(#[from] async_nats::SubscribeError),
/// Unsubscribing from the reply inbox failed.
// NOTE(review): the variant name is missing "sub" ("Unscribe"); renaming it
// would touch every user of this enum, so it is left as is.
#[error("Failed to unsubscribe to reply: {0}")]
FailedToUnscribe(#[from] async_nats::UnsubscribeError),
/// Publishing the command message failed.
#[error("Failed to publish msg: {0}")]
FailedToPublish(#[from] async_nats::PublishError),
/// The reply payload could not be decoded as a protobuf `Response`.
#[error("Failed to decode response: {0}")]
DecodeError(#[from] prost::DecodeError),
/// The reply subscription yielded no message.
#[error("No response received from Robot")]
NoResponseReceived,
}
pub struct RaaSServer {
command_stream: tokio::sync::Mutex<DuplexStream>,
nats_client: Client,
}
impl RaaSServer {
pub fn new(command_stream: DuplexStream) -> Self {
Self {
command_stream: tokio::sync::Mutex::new(command_stream),
pub fn new(nats_client: Client) -> Self {
Self { nats_client }
}
}
impl RaaSServer {
fn get_command_topic(cmd: &Option<Cmd>) -> Option<String> {
if let Some(cmd) = cmd {
match cmd {
Cmd::RollCmd(_) => Some("random.roll".to_string()),
}
} else {
None
}
}
async fn send_cmd(
&self,
subject: &str,
command: Vec<u8>,
) -> Result<raas_types::raas::resp::Response, RequestError> {
let publish_subject = format!("raas.cmd.{}", subject);
let reply = self.nats_client.new_inbox();
let mut reply_sub = self.nats_client.subscribe(reply.clone()).await?;
self.nats_client
.publish_with_reply(publish_subject, reply, command.into())
.await?;
let resp = reply_sub.next().await;
reply_sub.unsubscribe().await?;
if let Some(resp) = resp {
Ok(raas_types::raas::resp::Response::decode(&*resp.payload)?)
} else {
Err(RequestError::NoResponseReceived)
}
}
}
@ -24,24 +74,37 @@ impl Raas for RaaSServer {
&self,
request: Request<raas_types::raas::cmd::Request>,
) -> Result<Response<raas_types::raas::resp::Response>, Status> {
let mut command_stream = self.command_stream.lock().await;
let remote_addr = request.remote_addr().unwrap();
let request = request.into_inner();
info!("Got request from {}: {:?}", remote_addr, request);
let mut command_msg = Vec::new();
let mut msg = Vec::new();
let subject = Self::get_command_topic(&request.cmd);
request.encode(&mut msg).unwrap();
let subject = if let Some(subject) = subject {
subject
} else {
error!("Nothing implemented for '{:?}'", request.cmd);
return Err(Status::new(
Code::Unimplemented,
"Command not implemented".to_string(),
));
};
send_raas_msg(command_stream.deref_mut(), msg)
.await
.unwrap();
let command = Command {
request: Some(request),
};
let resp = recv_raas_msg(command_stream.deref_mut()).await.unwrap();
command.encode(&mut command_msg).unwrap();
let resp = raas_types::raas::resp::Response::decode(&*resp.msg).unwrap();
let resp = self.send_cmd(&subject, command_msg).await;
Ok(Response::new(resp))
match resp {
Ok(resp) => Ok(Response::new(resp)),
Err(err) => Err(Status::new(
Code::Internal,
format!("Failed to handle command: {err:?}",),
)),
}
}
}