From 1225af8e42a83251f51013fddcb2f368c108612c Mon Sep 17 00:00:00 2001 From: Jonni Liljamo Date: Sun, 10 Nov 2024 14:30:04 +0200 Subject: [PATCH] feat: more experimentation --- Cargo.lock | 1 + emerwen-master/src/main.rs | 11 +- emerwen-master/src/server.rs | 48 ++++++ emerwen-protocol/Cargo.toml | 1 + emerwen-protocol/src/lib.rs | 156 ++++++++++++------ emerwen-protocol/src/{ => message}/header.rs | 2 +- emerwen-protocol/src/message/mod.rs | 69 ++++++++ emerwen-protocol/src/{ => message}/payload.rs | 2 +- emerwen-types/src/lib.rs | 2 +- emerwen-worker/src/client.rs | 64 +++++++ emerwen-worker/src/main.rs | 43 ++++- 11 files changed, 336 insertions(+), 63 deletions(-) create mode 100644 emerwen-master/src/server.rs rename emerwen-protocol/src/{ => message}/header.rs (97%) create mode 100644 emerwen-protocol/src/message/mod.rs rename emerwen-protocol/src/{ => message}/payload.rs (99%) create mode 100644 emerwen-worker/src/client.rs diff --git a/Cargo.lock b/Cargo.lock index 0060caa..588ed1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -240,6 +240,7 @@ dependencies = [ "emerwen-types", "thiserror 2.0.0", "tokio", + "tracing", ] [[package]] diff --git a/emerwen-master/src/main.rs b/emerwen-master/src/main.rs index 4451e81..3b2c419 100644 --- a/emerwen-master/src/main.rs +++ b/emerwen-master/src/main.rs @@ -1,9 +1,9 @@ use clap::Parser; -use emerwen_protocol::{Message, MessagePayload}; -use tokio::{io::AsyncWriteExt, net::TcpStream}; use tracing::{debug, info, level_filters::LevelFilter}; use tracing_subscriber::{prelude::*, EnvFilter}; +mod server; + #[derive(Parser)] #[command(version)] struct Args { @@ -38,6 +38,10 @@ async fn main() { debug!("Hello, debug!"); + let server = server::Server::new("127.0.0.1:8000"); + let _ = server.run().await; + + /* let mut worker_client = WorkerClient::new("127.0.0.1:8000"); let _ = worker_client.connect().await; let _ = worker_client @@ -46,8 +50,10 @@ async fn main() { }) .await .unwrap(); + */ } +/* struct WorkerClient { addr: String, stream: Option, @@ -79,3 +85,4 @@ impl WorkerClient { Ok(None) } } +*/ diff --git a/emerwen-master/src/server.rs b/emerwen-master/src/server.rs new file mode 100644 index 0000000..543b0f7 --- /dev/null +++ b/emerwen-master/src/server.rs @@ -0,0 +1,48 @@ +use emerwen_protocol::ProtocolActorHandle; +use tokio::net::TcpListener; +use tracing::info; + +pub struct Server { + addr: String, +} + +impl Server { + pub fn new(addr: impl Into) -> Server { + Server { addr: addr.into() } + } + + pub async fn run(&self) -> Result<(), Box> { + let listener = TcpListener::bind(&self.addr).await?; + info!("Server listening on {}", self.addr); + + loop { + let (socket, addr) = listener.accept().await?; + + info!("Client '{}' connected", addr); + + let handle_read = ProtocolActorHandle::new(socket); + let handle_write = handle_read.clone(); + + tokio::task::spawn(async move { + let handle = handle_read; + loop { + let message = handle.read_message().await; + + info!("Received message: {:?}", message); + + // match payload bla bla + // write something back if appliccable + } + }); + + tokio::task::spawn(async move { + let handle = handle_write; + loop { + // read some channel that comes passed from main to this server struct + // and send messages yeah. + //let message = handle.send_message(Message).await; + } + }); + } + } +} diff --git a/emerwen-protocol/Cargo.toml b/emerwen-protocol/Cargo.toml index 37f292c..1d29155 100644 --- a/emerwen-protocol/Cargo.toml +++ b/emerwen-protocol/Cargo.toml @@ -14,3 +14,4 @@ emerwen-types = { path = "../emerwen-types" } byteorder = "1" thiserror = "2" tokio = { version = "1", features = ["full"] } +tracing = "0.1" diff --git a/emerwen-protocol/src/lib.rs b/emerwen-protocol/src/lib.rs index cc389a5..a1fbc03 100644 --- a/emerwen-protocol/src/lib.rs +++ b/emerwen-protocol/src/lib.rs @@ -1,69 +1,123 @@ -mod header; -pub use header::{MessageHeader, HEADER_LENGTH}; - -mod payload; -pub use payload::MessagePayload; - -pub const PROTOCOL_VERSION: u8 = 0; - -#[derive(thiserror::Error, Debug)] -pub enum DecodeError { - #[error("invalid version {0} (expected {1})")] - InvalidVersion(u8, u8), - #[error("invalid paylaod type {0}")] - InvalidPayloadType(u8), - #[error("invalid boolean {0}")] - InvalidBoolean(u8), +pub mod message; + +use message::{ + header::{MessageHeader, HEADER_LENGTH}, + payload::MessagePayload, + Message, +}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpStream, + sync::{mpsc, oneshot}, +}; +use tracing::{error, info}; + +enum ProtocolMessage { + ReadMessage { + respond_to: oneshot::Sender, + }, + SendMessage { + payload: MessagePayload, + }, } -#[derive(Debug, PartialEq)] -pub struct Message { - //header: MessageHeader, - payload: MessagePayload, +struct ProtocolActor { + receiver: mpsc::Receiver, + stream: TcpStream, } -impl Message { - pub fn new(payload: MessagePayload) -> Message { - Message { payload } +impl ProtocolActor { + fn new(receiver: mpsc::Receiver, stream: TcpStream) -> ProtocolActor { + ProtocolActor { receiver, stream } } - pub fn encode(&self) -> Vec { - let payload_type = (&self.payload).into(); - let payload_bytes = self.payload.encode(); + pub async fn run(&mut self) { + info!("enter run"); + while let Some(msg) = self.receiver.recv().await { + info!("handle in run"); + self.handle_message(msg).await; + } + info!("exit run"); + } - let header = MessageHeader::new(payload_type, payload_bytes.len() as u32); - let header_bytes = header.encode(); + async fn handle_message(&mut self, msg: ProtocolMessage) { + match msg { + ProtocolMessage::ReadMessage { respond_to } => { + let mut header_bytes = [0; HEADER_LENGTH]; + if self.stream.read_exact(&mut header_bytes).await.is_err() { + //break; + } + let header = match MessageHeader::decode(header_bytes) { + Ok(header) => header, + Err(e) => { + error!("Error reading header: {}", e.to_string()); + //continue; + return; + } + }; - let mut buf = vec![]; - buf.extend(header_bytes); - buf.extend(payload_bytes); - buf - } + let mut payload_bytes = vec![0; header.payload_length as usize]; + if self.stream.read_exact(&mut payload_bytes).await.is_err() { + //break; + } + let mut payload_reader = std::io::Cursor::new(payload_bytes); + let payload = match MessagePayload::decode(header.payload_type, &mut payload_reader) + { + Ok(payload) => payload, + Err(e) => { + error!("Error reading payload: {}", e.to_string()); + //continue; + return; + } + }; - pub fn decode(buf: &mut impl std::io::Read) -> Result> { - let mut header_bytes = [0; HEADER_LENGTH]; - buf.read_exact(&mut header_bytes)?; - let header = MessageHeader::decode(header_bytes)?; + let _ = respond_to.send(payload); + } + ProtocolMessage::SendMessage { payload } => { + let message_bytes = Message::new(payload).encode(); - let mut payload_bytes = vec![0; header.payload_length as usize]; - buf.read_exact(&mut payload_bytes)?; - let mut payload_reader = std::io::Cursor::new(payload_bytes); - let payload = MessagePayload::decode(header.payload_type, &mut payload_reader)?; + info!("help 2?"); - Ok(Message { payload }) + let _ = self.stream.write_all(&message_bytes).await; + } + } } } -#[test] -fn test_round_trip_message_authentication() { - let sent_message = Message::new(MessagePayload::Authentication { - key: "this_is_a_key".to_owned(), - }); +#[derive(Clone)] +pub struct ProtocolActorHandle { + sender: mpsc::Sender, +} + +impl ProtocolActorHandle { + pub fn new(stream: TcpStream) -> ProtocolActorHandle { + let (sender, receiver) = mpsc::channel(8); + let mut actor = ProtocolActor::new(receiver, stream); + tokio::task::spawn(async move { actor.run().await }); + + ProtocolActorHandle { sender } + } + + pub async fn read_message(&self) -> MessagePayload { + let (send, recv) = oneshot::channel(); + let msg = ProtocolMessage::ReadMessage { respond_to: send }; - let sent_message_bytes = sent_message.encode(); + info!("read"); + + let _ = self.sender.send(msg).await; + recv.await.unwrap() // handle, task has been killed + } - let mut reader = std::io::Cursor::new(sent_message_bytes); - let received_message = Message::decode(&mut reader).unwrap(); + pub async fn send_message( + &self, + payload: MessagePayload, + ) -> Result<(), Box> { + let msg = ProtocolMessage::SendMessage { payload }; - assert_eq!(sent_message, received_message); + info!("send"); + + let _ = self.sender.send(msg).await; + + Ok(()) + } } diff --git a/emerwen-protocol/src/header.rs b/emerwen-protocol/src/message/header.rs similarity index 97% rename from emerwen-protocol/src/header.rs rename to emerwen-protocol/src/message/header.rs index 76eb68c..d528b26 100644 --- a/emerwen-protocol/src/header.rs +++ b/emerwen-protocol/src/message/header.rs @@ -1,4 +1,4 @@ -use crate::{DecodeError, PROTOCOL_VERSION}; +use super::{DecodeError, PROTOCOL_VERSION}; pub const HEADER_LENGTH: usize = 6; diff --git a/emerwen-protocol/src/message/mod.rs b/emerwen-protocol/src/message/mod.rs new file mode 100644 index 0000000..d91a13f --- /dev/null +++ b/emerwen-protocol/src/message/mod.rs @@ -0,0 +1,69 @@ +pub mod header; +pub mod payload; + +use header::{MessageHeader, HEADER_LENGTH}; +use payload::MessagePayload; + +pub const PROTOCOL_VERSION: u8 = 0; + +#[derive(thiserror::Error, Debug)] +pub enum DecodeError { + #[error("invalid version {0} (expected {1})")] + InvalidVersion(u8, u8), + #[error("invalid paylaod type {0}")] + InvalidPayloadType(u8), + #[error("invalid boolean {0}")] + InvalidBoolean(u8), +} + +#[derive(Debug, PartialEq)] +pub struct Message { + //header: MessageHeader, + payload: MessagePayload, +} + +impl Message { + pub fn new(payload: MessagePayload) -> Message { + Message { payload } + } + + pub fn encode(&self) -> Vec { + let payload_type = (&self.payload).into(); + let payload_bytes = self.payload.encode(); + + let header = MessageHeader::new(payload_type, payload_bytes.len() as u32); + let header_bytes = header.encode(); + + let mut buf = vec![]; + buf.extend(header_bytes); + buf.extend(payload_bytes); + buf + } + + pub fn decode(buf: &mut impl std::io::Read) -> Result> { + let mut header_bytes = [0; HEADER_LENGTH]; + buf.read_exact(&mut header_bytes)?; + let header = MessageHeader::decode(header_bytes)?; + + let mut payload_bytes = vec![0; header.payload_length as usize]; + buf.read_exact(&mut payload_bytes)?; + let mut payload_reader = std::io::Cursor::new(payload_bytes); + let payload = MessagePayload::decode(header.payload_type, &mut payload_reader)?; + + Ok(Message { payload }) + } +} + +#[test] +fn test_round_trip_message_authentication() { + let sent_message = Message::new(MessagePayload::Authentication { + key: "this_is_a_key".to_owned(), + }); + + let sent_message_bytes = sent_message.encode(); + + let mut reader = std::io::Cursor::new(sent_message_bytes); + let received_message = Message::decode(&mut reader).unwrap(); + + assert_eq!(sent_message, received_message); +} diff --git a/emerwen-protocol/src/payload.rs b/emerwen-protocol/src/message/payload.rs similarity index 99% rename from emerwen-protocol/src/payload.rs rename to emerwen-protocol/src/message/payload.rs index b1e683b..0388d88 100644 --- a/emerwen-protocol/src/payload.rs +++ b/emerwen-protocol/src/message/payload.rs @@ -1,6 +1,6 @@ use byteorder::{BigEndian, ReadBytesExt}; -use crate::DecodeError; +use super::DecodeError; /// Message Payload /// diff --git a/emerwen-types/src/lib.rs b/emerwen-types/src/lib.rs index 7e00f09..a712e28 100644 --- a/emerwen-types/src/lib.rs +++ b/emerwen-types/src/lib.rs @@ -7,7 +7,7 @@ pub enum TargetMethod { } pub struct Target { - pub id: u32, + pub id: u16, pub addr: String, /// Check interval in ms pub interval: u32, diff --git a/emerwen-worker/src/client.rs b/emerwen-worker/src/client.rs new file mode 100644 index 0000000..5dbdde5 --- /dev/null +++ b/emerwen-worker/src/client.rs @@ -0,0 +1,64 @@ +use emerwen_protocol::{message::payload::MessagePayload, ProtocolActorHandle}; +use tokio::{ + net::TcpStream, + sync::mpsc::{Receiver, Sender}, +}; +use tracing::info; + +pub enum ClientMessage { + SendPayload { payload: MessagePayload }, +} + +pub struct Client { + addr: String, + worker_tx: Sender, + payload_rx: Receiver, +} + +impl Client { + pub fn new( + addr: impl Into, + worker_tx: Sender, + payload_rx: Receiver, + ) -> Client { + Client { + addr: addr.into(), + worker_tx, + payload_rx, + } + } + + pub async fn run(&mut self) -> Result<(), Box> { + let socket = TcpStream::connect(&self.addr).await?; + + let handle = ProtocolActorHandle::new(socket); + let handle_read = handle.clone(); + + tokio::task::spawn(async move { + let handle = handle_read; + loop { + let message = handle.read_message().await; + + info!("Received message: {:?}", message); + + // match payload bla bla + // write something back if appliccable + + // use worker_tx to send data to main worker thread + } + }); + + info!("before prx"); + while let Some(payload) = self.payload_rx.recv().await { + info!("prx"); + let _ = handle.send_message(payload).await; + } + info!("after prx"); + + // read some channel that comes passed from main to this server struct + // and send messages yeah. + //let message = handle.send_message(Message).await; + + Ok(()) + } +} diff --git a/emerwen-worker/src/main.rs b/emerwen-worker/src/main.rs index 7e562b7..236f0b4 100644 --- a/emerwen-worker/src/main.rs +++ b/emerwen-worker/src/main.rs @@ -1,15 +1,15 @@ use std::time::Duration; use clap::Parser; -use emerwen_protocol::{MessageHeader, MessagePayload, HEADER_LENGTH}; +use client::ClientMessage; +use emerwen_protocol::message::payload::MessagePayload; use emerwen_types::{Target, TargetMethod}; -use tokio::{ - io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}, - net::TcpListener, -}; -use tracing::{debug, error, info, level_filters::LevelFilter}; +use tokio::sync::mpsc; +use tracing::{debug, info, level_filters::LevelFilter}; use tracing_subscriber::{prelude::*, EnvFilter}; +mod client; + #[derive(Parser)] #[command(version)] struct Args { @@ -44,6 +44,11 @@ async fn main() { debug!("Hello, debug!"); + // Channel for sending payloads to Master via Client + let (payload_tx, payload_rx) = mpsc::channel(64); + // Channel for sending messages to Client via Master + let (worker_tx, worker_rx) = mpsc::channel(64); + let targets = vec![ Target { id: 0, @@ -68,35 +73,58 @@ async fn main() { ]; for target in targets { + let payload_tx = payload_tx.clone(); tokio::task::spawn(async move { info!("Starting monitoring task for target {}", target.id); let sleep_duration = Duration::from_millis(target.interval.into()); loop { + let payload; match target.check().await { Ok((up, _response)) => { if !up { info!("Target {} down!", target.id); + payload = MessagePayload::TargetDown { + target_id: target.id, + }; + } else { + payload = MessagePayload::TargetUp { + target_id: target.id, + }; } } Err(_) => { // We failed to check the state, status is unknown (grey). - info!("Target {} unknown!", target.id); + payload = MessagePayload::TargetUnknown { + target_id: target.id, + }; } } + let _ = payload_tx.send(payload).await; + tokio::time::sleep(sleep_duration).await; } }); } + let mut client = client::Client::new("127.0.0.1:8000", worker_tx, payload_rx); + let _ = client.run().await; + + loop { + // read worker_rx, i guess + } + + /* //tokio::task::spawn(async move { let worker_server = WorkerServer::new("127.0.0.1:8000"); let _ = worker_server.run().await; //}); + */ } +/* struct WorkerServer { addr: String, } @@ -171,3 +199,4 @@ impl WorkerServer { } } } +*/ -- 2.44.1