M emerwen-master/src/main.rs => emerwen-master/src/main.rs +17 -12
@@ 1,5 1,5 @@
use clap::Parser;
-use emerwen_protocol::Message;
+use emerwen_protocol::{Message, MessagePayload};
use tokio::{io::AsyncWriteExt, net::TcpStream};
use tracing::{debug, info, level_filters::LevelFilter};
use tracing_subscriber::{prelude::*, EnvFilter};
@@ 40,17 40,19 @@ async fn main() {
let mut worker_client = WorkerClient::new("127.0.0.1:8000");
let _ = worker_client.connect().await;
-
let _ = worker_client
- .send_message(&Message::Authentication {
- key: "test".to_owned(),
+ .send_message(MessagePayload::Authentication {
+ key: "avain".to_owned(),
})
- .await;
+ .await
+ .unwrap();
let _ = worker_client
- .send_message(&Message::Authentication {
- key: "askdas".to_owned(),
+ .send_message(MessagePayload::TargetStateChange {
+ target_id: 42,
+ state: true,
})
- .await;
+ .await
+ .unwrap();
}
struct WorkerClient {
@@ 73,11 75,14 @@ impl WorkerClient {
pub async fn send_message(
&mut self,
- message: &Message,
- ) -> Result<(), Box<dyn std::error::Error>> {
+ payload: MessagePayload,
+ ) -> Result<Option<MessagePayload>, Box<dyn std::error::Error>> {
if let Some(stream) = &mut self.stream {
- stream.write_all(&message.encode()?).await?;
+ let message = Message::new(payload).encode();
+ stream.write_all(&message).await?;
+ stream.flush().await.unwrap();
}
- Ok(())
+
+ Ok(None)
}
}
M => +1 -1
@@ 1,6 1,6 @@
use crate::{DecodeError, PROTOCOL_VERSION};
pub(crate) const HEADER_LENGTH: usize = 6;
pub const HEADER_LENGTH: usize = 6;
/// Message Header
///
M emerwen-protocol/src/lib.rs => emerwen-protocol/src/lib.rs +2 -2
@@ 1,8 1,8 @@
mod header;
-use header::{MessageHeader, HEADER_LENGTH};
+pub use header::{MessageHeader, HEADER_LENGTH};
mod payload;
-use payload::MessagePayload;
+pub use payload::MessagePayload;
pub const PROTOCOL_VERSION: u8 = 0;
M emerwen-worker/src/main.rs => emerwen-worker/src/main.rs +52 -8
@@ 1,6 1,9 @@
use clap::Parser;
-use emerwen_protocol::Message;
-use tokio::net::TcpListener;
+use emerwen_protocol::{MessageHeader, MessagePayload, HEADER_LENGTH};
+use tokio::{
+ io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
+ net::TcpListener,
+};
use tracing::{debug, error, info, level_filters::LevelFilter};
use tracing_subscriber::{prelude::*, EnvFilter};
@@ 56,21 59,62 @@ impl WorkerServer {
info!("Worker Server listening on {}", self.addr);
loop {
- let (mut stream, addr) = listener.accept().await?;
+ let (mut socket, addr) = listener.accept().await?;
+
info!("Client '{}' connected", addr);
tokio::task::spawn(async move {
+ let (socket_r, mut socket_w) = socket.split();
+ let mut reader = BufReader::new(socket_r);
+
loop {
- let message = match Message::decode(&mut stream).await {
- Ok(message) => message,
+ let mut header_bytes = [0; HEADER_LENGTH];
+ if reader.read_exact(&mut header_bytes).await.is_err() {
+ break;
+ }
+ let header = match MessageHeader::decode(header_bytes) {
+ Ok(header) => header,
Err(e) => {
- error!("Error decoding message: {:?}", e);
-
+ error!("Error reading header: {}", e.to_string());
continue;
}
};
- debug!("{:?}", message);
+
+ let mut payload_bytes = vec![0; header.payload_length as usize];
+ if reader.read_exact(&mut payload_bytes).await.is_err() {
+ break;
+ }
+ let mut payload_reader = std::io::Cursor::new(payload_bytes);
+ let payload =
+ match MessagePayload::decode(header.payload_type, &mut payload_reader) {
+ Ok(payload) => payload,
+ Err(e) => {
+ error!("Error reading payload: {}", e.to_string());
+ continue;
+ }
+ };
+
+ info!("Received message: {:?}", payload);
+
+ // match payload bla bla
+ // write something back if appliccable
+ }
+
+ info!("Client '{}' disconnected", addr);
+
+ /*
+ let mut line = String::new();
+ loop {
+ let bytes_read = reader.read_line(&mut line).await.unwrap();
+ if bytes_read == 0 {
+ info!("Client '{}' disconnected", addr);
+ break;
+ }
+
+ socket_w.write_all(&line.as_bytes()).await.unwrap();
+ line.clear();
}
+ */
});
}
}