From 36a2db664cb9b35a9b296226f3f8bb3809f5f92d Mon Sep 17 00:00:00 2001 From: Jonni Liljamo Date: Fri, 8 Nov 2024 17:05:30 +0200 Subject: [PATCH] feat: protocol testing --- emerwen-master/src/main.rs | 29 +++++++++------- emerwen-protocol/src/header.rs | 2 +- emerwen-protocol/src/lib.rs | 4 +-- emerwen-worker/src/main.rs | 60 +++++++++++++++++++++++++++++----- 4 files changed, 72 insertions(+), 23 deletions(-) diff --git a/emerwen-master/src/main.rs b/emerwen-master/src/main.rs index 6a994a0..7c54c8c 100644 --- a/emerwen-master/src/main.rs +++ b/emerwen-master/src/main.rs @@ -1,5 +1,5 @@ use clap::Parser; -use emerwen_protocol::Message; +use emerwen_protocol::{Message, MessagePayload}; use tokio::{io::AsyncWriteExt, net::TcpStream}; use tracing::{debug, info, level_filters::LevelFilter}; use tracing_subscriber::{prelude::*, EnvFilter}; @@ -40,17 +40,19 @@ async fn main() { let mut worker_client = WorkerClient::new("127.0.0.1:8000"); let _ = worker_client.connect().await; - let _ = worker_client - .send_message(&Message::Authentication { - key: "test".to_owned(), + .send_message(MessagePayload::Authentication { + key: "avain".to_owned(), }) - .await; + .await + .unwrap(); let _ = worker_client - .send_message(&Message::Authentication { - key: "askdas".to_owned(), + .send_message(MessagePayload::TargetStateChange { + target_id: 42, + state: true, }) - .await; + .await + .unwrap(); } struct WorkerClient { @@ -73,11 +75,14 @@ impl WorkerClient { pub async fn send_message( &mut self, - message: &Message, - ) -> Result<(), Box> { + payload: MessagePayload, + ) -> Result, Box> { if let Some(stream) = &mut self.stream { - stream.write_all(&message.encode()?).await?; + let message = Message::new(payload).encode(); + stream.write_all(&message).await?; + stream.flush().await.unwrap(); } - Ok(()) + + Ok(None) } } diff --git a/emerwen-protocol/src/header.rs b/emerwen-protocol/src/header.rs index 91f5d43..76eb68c 100644 --- a/emerwen-protocol/src/header.rs +++ b/emerwen-protocol/src/header.rs @@ -1,6 +1,6 @@ use crate::{DecodeError, PROTOCOL_VERSION}; -pub(crate) const HEADER_LENGTH: usize = 6; +pub const HEADER_LENGTH: usize = 6; /// Message Header /// diff --git a/emerwen-protocol/src/lib.rs b/emerwen-protocol/src/lib.rs index c91b15a..cc389a5 100644 --- a/emerwen-protocol/src/lib.rs +++ b/emerwen-protocol/src/lib.rs @@ -1,8 +1,8 @@ mod header; -use header::{MessageHeader, HEADER_LENGTH}; +pub use header::{MessageHeader, HEADER_LENGTH}; mod payload; -use payload::MessagePayload; +pub use payload::MessagePayload; pub const PROTOCOL_VERSION: u8 = 0; diff --git a/emerwen-worker/src/main.rs b/emerwen-worker/src/main.rs index a15bfb3..b64fc11 100644 --- a/emerwen-worker/src/main.rs +++ b/emerwen-worker/src/main.rs @@ -1,6 +1,9 @@ use clap::Parser; -use emerwen_protocol::Message; -use tokio::net::TcpListener; +use emerwen_protocol::{MessageHeader, MessagePayload, HEADER_LENGTH}; +use tokio::{ + io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}, + net::TcpListener, +}; use tracing::{debug, error, info, level_filters::LevelFilter}; use tracing_subscriber::{prelude::*, EnvFilter}; @@ -56,21 +59,62 @@ impl WorkerServer { info!("Worker Server listening on {}", self.addr); loop { - let (mut stream, addr) = listener.accept().await?; + let (mut socket, addr) = listener.accept().await?; + info!("Client '{}' connected", addr); tokio::task::spawn(async move { + let (socket_r, mut socket_w) = socket.split(); + let mut reader = BufReader::new(socket_r); + loop { - let message = match Message::decode(&mut stream).await { - Ok(message) => message, + let mut header_bytes = [0; HEADER_LENGTH]; + if reader.read_exact(&mut header_bytes).await.is_err() { + break; + } + let header = match MessageHeader::decode(header_bytes) { + Ok(header) => header, Err(e) => { - error!("Error decoding message: {:?}", e); - + error!("Error reading header: {}", e.to_string()); continue; } }; - debug!("{:?}", message); + + let mut payload_bytes = vec![0; header.payload_length as usize]; + if reader.read_exact(&mut payload_bytes).await.is_err() { + break; + } + let mut payload_reader = std::io::Cursor::new(payload_bytes); + let payload = + match MessagePayload::decode(header.payload_type, &mut payload_reader) { + Ok(payload) => payload, + Err(e) => { + error!("Error reading payload: {}", e.to_string()); + continue; + } + }; + + info!("Received message: {:?}", payload); + + // match payload bla bla + // write something back if appliccable + } + + info!("Client '{}' disconnected", addr); + + /* + let mut line = String::new(); + loop { + let bytes_read = reader.read_line(&mut line).await.unwrap(); + if bytes_read == 0 { + info!("Client '{}' disconnected", addr); + break; + } + + socket_w.write_all(&line.as_bytes()).await.unwrap(); + line.clear(); } + */ }); } } -- 2.44.1