From 0136772c2320e307c4574139ab25c09a983d6913 Mon Sep 17 00:00:00 2001 From: Joey Hines Date: Sun, 16 Jun 2024 16:37:28 -0600 Subject: [PATCH] Refactored, handle failures more gracefully + Split ConnectionHandler into its own struct + Handle errors more gracefully + System no longer locks up or crashes if a bot goes offline + Bots who go offline are automatically removed from the connector pool --- src/main.rs | 45 +++++++-- src/robot/connection_handler.rs | 94 +++++++++++++++++++ src/robot/mod.rs | 30 ++++++ src/robot/robot_connector.rs | 16 +--- src/robot/robot_manager.rs | 160 ++++++++------------------------ 5 files changed, 205 insertions(+), 140 deletions(-) create mode 100644 src/robot/connection_handler.rs diff --git a/src/main.rs b/src/main.rs index 264a70d..6d58699 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,17 @@ +use crate::robot::connection_handler::ConnectionHandler; use crate::robot::robot_manager::RobotManager; -use log::info; +use crate::robot::ConnectorManager; +use log::{error, info}; use prost::Message; use raas_types::raas; use raas_types::raas::cmd::Request; use raas_types::raas::ping::Ping; use raas_types::raas::{recv_raas_msg, send_raas_msg}; +use std::sync::Arc; use std::time::Duration; use tokio::io::duplex; +use tokio::net::TcpListener; +use tokio::sync::Mutex; mod robot; @@ -15,10 +20,25 @@ async fn main() { env_logger::init(); info!("Starting..."); - let (mut main_stream, manager_stream) = duplex(10); - let mut manager = RobotManager::new("0.0.0.0:8080", manager_stream); + let addr = "0.0.0.0:8080"; - manager.start_manager().await.unwrap(); + let connector_manager = ConnectorManager { + next_id: 0, + connectors: Default::default(), + }; + + let connector_manager = Arc::new(Mutex::new(connector_manager)); + + let tcp_listener = TcpListener::bind(addr).await.unwrap(); + + let (mut main_stream, manager_stream) = duplex(10); + + let robot_manager = RobotManager::new(manager_stream); + let connector_handler = ConnectionHandler::new(addr, tcp_listener); + + let robot_manager_connector_manager = connector_manager.clone(); + tokio::task::spawn(robot_manager.worker(robot_manager_connector_manager)); + tokio::task::spawn(connector_handler.worker(connector_manager)); loop { info!("Sleeping"); @@ -43,8 +63,21 @@ async fn main() { send_raas_msg(&mut main_stream, msg).await.unwrap(); info!("Wait for resp"); - let resp = recv_raas_msg(&mut main_stream).await.unwrap(); + let ret = + tokio::time::timeout(Duration::from_secs(5), recv_raas_msg(&mut main_stream)).await; - info!("Got resp {:?}", resp); + match ret { + Ok(resp) => match resp { + Ok(resp) => { + info!("Got response {:?}", resp); + } + Err(err) => { + error!("Error getting response: {}", err) + } + }, + Err(_) => { + error!("Timed out waiting for response!") + } + } } } diff --git a/src/robot/connection_handler.rs b/src/robot/connection_handler.rs new file mode 100644 index 0000000..9ec6e84 --- /dev/null +++ b/src/robot/connection_handler.rs @@ -0,0 +1,94 @@ +use crate::robot::robot_connector::RobotConnector; +use crate::robot::ROBOT_MESSAGE_QUEUE_SIZE; +use crate::robot::{ConnectorManager, Error}; +use log::{error, info}; +use prost::Message; +use raas_types::raas::register::{Register, RegisterResponse}; +use raas_types::raas::{recv_raas_msg, send_raas_msg}; +use std::sync::Arc; +use tokio::io::duplex; +use tokio::net::TcpListener; +use tokio::sync::Mutex; + +pub struct ConnectionHandler { + tcp_listener: TcpListener, + addr: String, +} + +impl ConnectionHandler { + pub fn new(addr: &str, tcp_listener: TcpListener) -> Self { + Self { + tcp_listener, + addr: addr.to_string(), + } + } + + async fn handle_register( + &mut self, + connector_manager: Arc>, + ) -> Result<(), Error> { + info!("Listening for connection at {}", self.addr); + let (mut socket, addr) = self.tcp_listener.accept().await?; + info!("Got connection from {}", addr); + + let register_msg = recv_raas_msg(&mut socket).await?; + + let register = Register::decode(&*register_msg.msg)?; + + let mut connector_manager = connector_manager.lock().await; + + let id = connector_manager.next_id; + connector_manager.next_id += 1; + + let register_resp = RegisterResponse { + name: register.name.to_string(), + r#type: register.bot_type, + id, + }; + + let mut msg = Vec::new(); + + register_resp.encode(&mut msg)?; + + send_raas_msg(&mut socket, msg).await?; + + let (connector_duplex_manager, connector_duplex_connector) = + duplex(ROBOT_MESSAGE_QUEUE_SIZE); + + let mut connector = RobotConnector::new( + id, + vec![register.bot_type()], + socket, + connector_duplex_connector, + ); + + let connector_handler = crate::robot::ConnectorHandle { + stream: connector_duplex_manager, + tags: connector.bot_tags.clone(), + }; + + connector_manager.connectors.insert(id, connector_handler); + + tokio::spawn(async move { connector.worker().await }); + + Ok(()) + } + + pub async fn worker( + mut self, + connector_manager: Arc>, + ) -> Result<(), Error> { + loop { + let res = self.handle_register(connector_manager.clone()).await; + + if let Err(err) = res { + error!("Got error handling new register: {}", err); + + if let Error::Io(_io_err) = &err { + error!("IO Error, exiting..."); + return Err(err); + } + } + } + } +} diff --git a/src/robot/mod.rs b/src/robot/mod.rs index 70ae77f..d0c90ce 100644 --- a/src/robot/mod.rs +++ b/src/robot/mod.rs @@ -1,4 +1,34 @@ +use raas_types::raas; +use std::collections::HashMap; +use thiserror::Error; +use tokio::io::DuplexStream; + +pub mod connection_handler; pub mod robot_connector; pub mod robot_manager; pub const ROBOT_MESSAGE_QUEUE_SIZE: usize = 10; + +#[derive(Debug)] +pub struct ConnectorHandle { + pub stream: DuplexStream, + pub tags: Vec, +} + +#[derive(Debug)] +pub struct ConnectorManager { + pub next_id: u32, + pub connectors: HashMap, +} + +#[derive(Error, Debug)] +pub enum Error { + #[error("IO Error")] + Io(#[from] std::io::Error), + #[error("Protobuf Encode Error")] + ProtobufEncode(#[from] prost::EncodeError), + #[error("Protobuf Decode Error")] + ProtobufDecode(#[from] prost::DecodeError), + #[error("Connection to bot has been closed")] + ConnectionClosed(u32), +} diff --git a/src/robot/robot_connector.rs b/src/robot/robot_connector.rs index 9970013..7f0dd7d 100644 --- a/src/robot/robot_connector.rs +++ b/src/robot/robot_connector.rs @@ -1,23 +1,13 @@ +use crate::robot::Error; use log::{error, info}; use prost::Message; use raas_types::raas::cmd::{Command, Request}; use raas_types::raas::register::BotTypes; use raas_types::raas::resp::Response; use raas_types::raas::{recv_raas_msg, send_raas_msg}; -use thiserror::Error; use tokio::io::DuplexStream; use tokio::net::TcpStream; -#[derive(Error, Debug)] -pub enum Error { - #[error("IO Error")] - Io(#[from] std::io::Error), - #[error("Protobuf Encode Error")] - ProtobufEncode(#[from] prost::EncodeError), - #[error("Protobuf Decode Error")] - ProtobufDecode(#[from] prost::DecodeError), -} - #[derive(Debug)] pub struct RobotConnector { pub bot_id: u32, @@ -85,10 +75,6 @@ impl RobotConnector { Ok(()) } - pub fn is_running(&self) -> bool { - self.is_running - } - pub async fn worker(&mut self) { self.is_running = true; diff --git a/src/robot/robot_manager.rs b/src/robot/robot_manager.rs index 162eaad..24371ab 100644 --- a/src/robot/robot_manager.rs +++ b/src/robot/robot_manager.rs @@ -1,129 +1,42 @@ -use crate::robot::robot_connector::{Error, RobotConnector}; -use crate::robot::ROBOT_MESSAGE_QUEUE_SIZE; +use crate::robot::{ConnectorManager, Error}; use log::{error, info}; use prost::Message; use raas_types::raas; -use raas_types::raas::bot::roll::Roll; use raas_types::raas::cmd::request::Cmd; -use raas_types::raas::cmd::{Command, Request}; -use raas_types::raas::register::{Register, RegisterResponse}; -use raas_types::raas::resp::response::Resp; -use raas_types::raas::resp::Response; +use raas_types::raas::cmd::Request; use raas_types::raas::{recv_raas_msg, send_raas_msg, RaasMessage}; -use std::collections::HashMap; -use std::ops::{Deref, DerefMut}; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream}; -use tokio::net::{TcpListener, TcpStream}; -use tokio::select; -use tokio::time::timeout; - -#[derive(Debug, Clone)] -pub enum RaaSCmd { - Roll(u32), - Img(Vec), -} +use std::sync::Arc; +use tokio::io::{AsyncWriteExt, DuplexStream}; +use tokio::sync::Mutex; #[derive(Debug)] -struct ConnectorHandle { - pub id: u32, - pub stream: DuplexStream, - pub tags: Vec, -} - -#[derive(Debug)] -struct ConnectorManager { - next_id: u32, - connectors: HashMap, -} - -#[derive(Debug, Clone)] pub struct RobotManager { - addr: String, - connector_manager: std::sync::Arc>, - command_stream: std::sync::Arc>, + command_stream: DuplexStream, } impl RobotManager { - pub fn new(addr: &str, stream: DuplexStream) -> Self { - let connector_manager = ConnectorManager { - next_id: 0, - connectors: Default::default(), - }; - + pub fn new(stream: DuplexStream) -> Self { Self { - addr: addr.to_string(), - connector_manager: std::sync::Arc::new(tokio::sync::Mutex::new(connector_manager)), - command_stream: std::sync::Arc::new(tokio::sync::Mutex::new(stream)), + command_stream: stream, } } - async fn handle_register(&self, tcp_listener: &mut TcpListener) -> Result<(), std::io::Error> { - info!("Listening for connection at {}", self.addr); - let (mut socket, addr) = tcp_listener.accept().await?; - info!("Got connection from {}", addr); - - let register_msg = recv_raas_msg(&mut socket).await?; - - let register = Register::decode(&*register_msg.msg).unwrap(); - - let mut connector_manager = self.connector_manager.lock().await; - - let id = connector_manager.next_id; - connector_manager.next_id += 1; - - let register_resp = RegisterResponse { - name: register.name.to_string(), - r#type: register.bot_type, - id, - }; - - let mut msg = Vec::new(); - - register_resp.encode(&mut msg).unwrap(); - - send_raas_msg(&mut socket, msg).await?; - - let (connector_duplex_manager, connector_duplex_connector) = - duplex(ROBOT_MESSAGE_QUEUE_SIZE); - - let mut connector = RobotConnector::new( - id, - vec![register.bot_type()], - socket, - connector_duplex_connector, - ); - - let connector_handler = ConnectorHandle { - id, - stream: connector_duplex_manager, - tags: connector.bot_tags.clone(), - }; - - connector_manager.connectors.insert(id, connector_handler); - - tokio::spawn(async move { connector.worker().await }); - - Ok(()) - } - - async fn recv_command(&self) -> Result { - let mut command_stream = self.command_stream.lock().await; - - let resp = recv_raas_msg(command_stream.deref_mut()).await?; + async fn recv_command(&mut self) -> Result { + let resp = recv_raas_msg(&mut self.command_stream).await?; Ok(resp) } - async fn send_resp(&self, msg: RaasMessage) -> Result<(), Error> { - let mut command_stream = self.command_stream.lock().await; - - command_stream.write_all(&*msg.into_bytes()).await?; + async fn send_resp(&mut self, msg: RaasMessage) -> Result<(), Error> { + self.command_stream.write_all(&msg.into_bytes()).await?; Ok(()) } - pub async fn handle_command(&self) -> Result<(), Error> { + async fn handle_command( + &mut self, + connector_manager: Arc>, + ) -> Result<(), Error> { info!("Waiting for command..."); let msg = self.recv_command().await?; @@ -134,7 +47,7 @@ impl RobotManager { Cmd::RollCmd(_) => raas::register::BotTypes::Roll, }; - let mut connector_manager = self.connector_manager.lock().await; + let mut connector_manager = connector_manager.lock().await; let connector = connector_manager .connectors .iter_mut() @@ -144,7 +57,15 @@ impl RobotManager { info!("Sending message to connector with id={}", id); let mut msg = Vec::new(); request.encode(&mut msg)?; - send_raas_msg(&mut connector.stream, msg).await?; + let ret = send_raas_msg(&mut connector.stream, msg).await; + + if let Err(err) = ret { + error!( + "Got '{}' sending message to Robot Connector id={}, closing connection", + err, id + ); + return Err(Error::ConnectionClosed(*id)); + } let resp = recv_raas_msg(&mut connector.stream).await?; info!("Got response from id={}", id); @@ -156,21 +77,22 @@ impl RobotManager { Ok(()) } - pub async fn start_manager(self) -> Result<(), std::io::Error> { - let handle_command_manager = self.clone(); - tokio::spawn(async move { - loop { - handle_command_manager.handle_command().await.unwrap() - } - }); + pub async fn worker( + mut self, + connector_manager: Arc>, + ) -> Result<(), Error> { + loop { + let res = self.handle_command(connector_manager.clone()).await; - tokio::spawn(async move { - let mut tcp_listener = TcpListener::bind(&self.addr).await.unwrap(); - loop { - self.handle_register(&mut tcp_listener).await.unwrap() - } - }); + if let Err(err) = res { + error!("Got error in robot manager: {}", err); - Ok(()) + if let Error::ConnectionClosed(id) = err { + let mut connection_manager = connector_manager.lock().await; + + connection_manager.connectors.remove(&id); + } + } + } } }