M Cargo.lock => Cargo.lock +1 -0
@@ 240,6 240,7 @@ dependencies = [
"emerwen-types",
"thiserror 2.0.0",
"tokio",
+ "tracing",
]
[[package]]
M emerwen-master/src/main.rs => emerwen-master/src/main.rs +9 -2
@@ 1,9 1,9 @@
use clap::Parser;
-use emerwen_protocol::{Message, MessagePayload};
-use tokio::{io::AsyncWriteExt, net::TcpStream};
use tracing::{debug, info, level_filters::LevelFilter};
use tracing_subscriber::{prelude::*, EnvFilter};
+mod server;
+
#[derive(Parser)]
#[command(version)]
struct Args {
@@ 38,6 38,10 @@ async fn main() {
debug!("Hello, debug!");
+ let server = server::Server::new("127.0.0.1:8000");
+ let _ = server.run().await;
+
+ /*
let mut worker_client = WorkerClient::new("127.0.0.1:8000");
let _ = worker_client.connect().await;
let _ = worker_client
@@ 46,8 50,10 @@ async fn main() {
})
.await
.unwrap();
+ */
}
+/*
struct WorkerClient {
addr: String,
stream: Option<TcpStream>,
@@ 79,3 85,4 @@ impl WorkerClient {
Ok(None)
}
}
+*/
A emerwen-master/src/server.rs => emerwen-master/src/server.rs +48 -0
@@ 0,0 1,48 @@
+use emerwen_protocol::ProtocolActorHandle;
+use tokio::net::TcpListener;
+use tracing::info;
+
+pub struct Server {
+ addr: String,
+}
+
+impl Server {
+ pub fn new(addr: impl Into<String>) -> Server {
+ Server { addr: addr.into() }
+ }
+
+ pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
+ let listener = TcpListener::bind(&self.addr).await?;
+ info!("Server listening on {}", self.addr);
+
+ loop {
+ let (socket, addr) = listener.accept().await?;
+
+ info!("Client '{}' connected", addr);
+
+ let handle_read = ProtocolActorHandle::new(socket);
+ let handle_write = handle_read.clone();
+
+ tokio::task::spawn(async move {
+ let handle = handle_read;
+ loop {
+ let message = handle.read_message().await;
+
+ info!("Received message: {:?}", message);
+
+ // match payload bla bla
+ // write something back if appliccable
+ }
+ });
+
+ tokio::task::spawn(async move {
+ let handle = handle_write;
+ loop {
+ // read some channel that comes passed from main to this server struct
+ // and send messages yeah.
+ //let message = handle.send_message(Message).await;
+ }
+ });
+ }
+ }
+}
M emerwen-protocol/Cargo.toml => emerwen-protocol/Cargo.toml +1 -0
@@ 14,3 14,4 @@ emerwen-types = { path = "../emerwen-types" }
byteorder = "1"
thiserror = "2"
tokio = { version = "1", features = ["full"] }
+tracing = "0.1"
M emerwen-protocol/src/lib.rs => emerwen-protocol/src/lib.rs +105 -51
@@ 1,69 1,123 @@
-mod header;
-pub use header::{MessageHeader, HEADER_LENGTH};
-
-mod payload;
-pub use payload::MessagePayload;
-
-pub const PROTOCOL_VERSION: u8 = 0;
-
-#[derive(thiserror::Error, Debug)]
-pub enum DecodeError {
- #[error("invalid version {0} (expected {1})")]
- InvalidVersion(u8, u8),
- #[error("invalid paylaod type {0}")]
- InvalidPayloadType(u8),
- #[error("invalid boolean {0}")]
- InvalidBoolean(u8),
+pub mod message;
+
+use message::{
+ header::{MessageHeader, HEADER_LENGTH},
+ payload::MessagePayload,
+ Message,
+};
+use tokio::{
+ io::{AsyncReadExt, AsyncWriteExt},
+ net::TcpStream,
+ sync::{mpsc, oneshot},
+};
+use tracing::{error, info};
+
+enum ProtocolMessage {
+ ReadMessage {
+ respond_to: oneshot::Sender<MessagePayload>,
+ },
+ SendMessage {
+ payload: MessagePayload,
+ },
}
-#[derive(Debug, PartialEq)]
-pub struct Message {
- //header: MessageHeader,
- payload: MessagePayload,
+struct ProtocolActor {
+ receiver: mpsc::Receiver<ProtocolMessage>,
+ stream: TcpStream,
}
-impl Message {
- pub fn new(payload: MessagePayload) -> Message {
- Message { payload }
+impl ProtocolActor {
+ fn new(receiver: mpsc::Receiver<ProtocolMessage>, stream: TcpStream) -> ProtocolActor {
+ ProtocolActor { receiver, stream }
}
- pub fn encode(&self) -> Vec<u8> {
- let payload_type = (&self.payload).into();
- let payload_bytes = self.payload.encode();
+ pub async fn run(&mut self) {
+ info!("enter run");
+ while let Some(msg) = self.receiver.recv().await {
+ info!("handle in run");
+ self.handle_message(msg).await;
+ }
+ info!("exit run");
+ }
- let header = MessageHeader::new(payload_type, payload_bytes.len() as u32);
- let header_bytes = header.encode();
+ async fn handle_message(&mut self, msg: ProtocolMessage) {
+ match msg {
+ ProtocolMessage::ReadMessage { respond_to } => {
+ let mut header_bytes = [0; HEADER_LENGTH];
+ if self.stream.read_exact(&mut header_bytes).await.is_err() {
+ //break;
+ }
+ let header = match MessageHeader::decode(header_bytes) {
+ Ok(header) => header,
+ Err(e) => {
+ error!("Error reading header: {}", e.to_string());
+ //continue;
+ return;
+ }
+ };
- let mut buf = vec![];
- buf.extend(header_bytes);
- buf.extend(payload_bytes);
- buf
- }
+ let mut payload_bytes = vec![0; header.payload_length as usize];
+ if self.stream.read_exact(&mut payload_bytes).await.is_err() {
+ //break;
+ }
+ let mut payload_reader = std::io::Cursor::new(payload_bytes);
+ let payload = match MessagePayload::decode(header.payload_type, &mut payload_reader)
+ {
+ Ok(payload) => payload,
+ Err(e) => {
+ error!("Error reading payload: {}", e.to_string());
+ //continue;
+ return;
+ }
+ };
- pub fn decode(buf: &mut impl std::io::Read) -> Result<Message, Box<dyn std::error::Error>> {
- let mut header_bytes = [0; HEADER_LENGTH];
- buf.read_exact(&mut header_bytes)?;
- let header = MessageHeader::decode(header_bytes)?;
+ let _ = respond_to.send(payload);
+ }
+ ProtocolMessage::SendMessage { payload } => {
+ let message_bytes = Message::new(payload).encode();
- let mut payload_bytes = vec![0; header.payload_length as usize];
- buf.read_exact(&mut payload_bytes)?;
- let mut payload_reader = std::io::Cursor::new(payload_bytes);
- let payload = MessagePayload::decode(header.payload_type, &mut payload_reader)?;
+ info!("help 2?");
- Ok(Message { payload })
+ let _ = self.stream.write_all(&message_bytes).await;
+ }
+ }
}
}
-#[test]
-fn test_round_trip_message_authentication() {
- let sent_message = Message::new(MessagePayload::Authentication {
- key: "this_is_a_key".to_owned(),
- });
+#[derive(Clone)]
+pub struct ProtocolActorHandle {
+ sender: mpsc::Sender<ProtocolMessage>,
+}
+
+impl ProtocolActorHandle {
+ pub fn new(stream: TcpStream) -> ProtocolActorHandle {
+ let (sender, receiver) = mpsc::channel(8);
+ let mut actor = ProtocolActor::new(receiver, stream);
+ tokio::task::spawn(async move { actor.run().await });
+
+ ProtocolActorHandle { sender }
+ }
+
+ pub async fn read_message(&self) -> MessagePayload {
+ let (send, recv) = oneshot::channel();
+ let msg = ProtocolMessage::ReadMessage { respond_to: send };
- let sent_message_bytes = sent_message.encode();
+ info!("read");
+
+ let _ = self.sender.send(msg).await;
+ recv.await.unwrap() // handle, task has been killed
+ }
- let mut reader = std::io::Cursor::new(sent_message_bytes);
- let received_message = Message::decode(&mut reader).unwrap();
+ pub async fn send_message(
+ &self,
+ payload: MessagePayload,
+ ) -> Result<(), Box<dyn std::error::Error>> {
+ let msg = ProtocolMessage::SendMessage { payload };
- assert_eq!(sent_message, received_message);
+ info!("send");
+
+ let _ = self.sender.send(msg).await;
+
+ Ok(())
+ }
}
R => +1 -1
@@ 1,4 1,4 @@
use crate::{DecodeError, PROTOCOL_VERSION};
use super::{DecodeError, PROTOCOL_VERSION};
pub const HEADER_LENGTH: usize = 6;
A emerwen-protocol/src/message/mod.rs => emerwen-protocol/src/message/mod.rs +69 -0
@@ 0,0 1,69 @@
+pub mod header;
+pub mod payload;
+
+use header::{MessageHeader, HEADER_LENGTH};
+use payload::MessagePayload;
+
+pub const PROTOCOL_VERSION: u8 = 0;
+
+#[derive(thiserror::Error, Debug)]
+pub enum DecodeError {
+ #[error("invalid version {0} (expected {1})")]
+ InvalidVersion(u8, u8),
+ #[error("invalid paylaod type {0}")]
+ InvalidPayloadType(u8),
+ #[error("invalid boolean {0}")]
+ InvalidBoolean(u8),
+}
+
+#[derive(Debug, PartialEq)]
+pub struct Message {
+ //header: MessageHeader,
+ payload: MessagePayload,
+}
+
+impl Message {
+ pub fn new(payload: MessagePayload) -> Message {
+ Message { payload }
+ }
+
+ pub fn encode(&self) -> Vec<u8> {
+ let payload_type = (&self.payload).into();
+ let payload_bytes = self.payload.encode();
+
+ let header = MessageHeader::new(payload_type, payload_bytes.len() as u32);
+ let header_bytes = header.encode();
+
+ let mut buf = vec![];
+ buf.extend(header_bytes);
+ buf.extend(payload_bytes);
+ buf
+ }
+
+ pub fn decode(buf: &mut impl std::io::Read) -> Result<Message, Box<dyn std::error::Error>> {
+ let mut header_bytes = [0; HEADER_LENGTH];
+ buf.read_exact(&mut header_bytes)?;
+ let header = MessageHeader::decode(header_bytes)?;
+
+ let mut payload_bytes = vec![0; header.payload_length as usize];
+ buf.read_exact(&mut payload_bytes)?;
+ let mut payload_reader = std::io::Cursor::new(payload_bytes);
+ let payload = MessagePayload::decode(header.payload_type, &mut payload_reader)?;
+
+ Ok(Message { payload })
+ }
+}
+
+#[test]
+fn test_round_trip_message_authentication() {
+ let sent_message = Message::new(MessagePayload::Authentication {
+ key: "this_is_a_key".to_owned(),
+ });
+
+ let sent_message_bytes = sent_message.encode();
+
+ let mut reader = std::io::Cursor::new(sent_message_bytes);
+ let received_message = Message::decode(&mut reader).unwrap();
+
+ assert_eq!(sent_message, received_message);
+}
R emerwen-protocol/src/payload.rs => emerwen-protocol/src/message/payload.rs +1 -1
@@ 1,6 1,6 @@
use byteorder::{BigEndian, ReadBytesExt};
-use crate::DecodeError;
+use super::DecodeError;
/// Message Payload
///
M emerwen-types/src/lib.rs => emerwen-types/src/lib.rs +1 -1
@@ 7,7 7,7 @@ pub enum TargetMethod {
}
pub struct Target {
- pub id: u32,
+ pub id: u16,
pub addr: String,
/// Check interval in ms
pub interval: u32,
A emerwen-worker/src/client.rs => emerwen-worker/src/client.rs +64 -0
@@ 0,0 1,64 @@
+use emerwen_protocol::{message::payload::MessagePayload, ProtocolActorHandle};
+use tokio::{
+ net::TcpStream,
+ sync::mpsc::{Receiver, Sender},
+};
+use tracing::info;
+
+pub enum ClientMessage {
+ SendPayload { payload: MessagePayload },
+}
+
+pub struct Client {
+ addr: String,
+ worker_tx: Sender<ClientMessage>,
+ payload_rx: Receiver<MessagePayload>,
+}
+
+impl Client {
+ pub fn new(
+ addr: impl Into<String>,
+ worker_tx: Sender<ClientMessage>,
+ payload_rx: Receiver<MessagePayload>,
+ ) -> Client {
+ Client {
+ addr: addr.into(),
+ worker_tx,
+ payload_rx,
+ }
+ }
+
+ pub async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
+ let socket = TcpStream::connect(&self.addr).await?;
+
+ let handle = ProtocolActorHandle::new(socket);
+ let handle_read = handle.clone();
+
+ tokio::task::spawn(async move {
+ let handle = handle_read;
+ loop {
+ let message = handle.read_message().await;
+
+ info!("Received message: {:?}", message);
+
+ // match payload bla bla
+ // write something back if appliccable
+
+ // use worker_tx to send data to main worker thread
+ }
+ });
+
+ info!("before prx");
+ while let Some(payload) = self.payload_rx.recv().await {
+ info!("prx");
+ let _ = handle.send_message(payload).await;
+ }
+ info!("after prx");
+
+ // read some channel that comes passed from main to this server struct
+ // and send messages yeah.
+ //let message = handle.send_message(Message).await;
+
+ Ok(())
+ }
+}
M emerwen-worker/src/main.rs => emerwen-worker/src/main.rs +36 -7
@@ 1,15 1,15 @@
use std::time::Duration;
use clap::Parser;
-use emerwen_protocol::{MessageHeader, MessagePayload, HEADER_LENGTH};
+use client::ClientMessage;
+use emerwen_protocol::message::payload::MessagePayload;
use emerwen_types::{Target, TargetMethod};
-use tokio::{
- io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
- net::TcpListener,
-};
-use tracing::{debug, error, info, level_filters::LevelFilter};
+use tokio::sync::mpsc;
+use tracing::{debug, info, level_filters::LevelFilter};
use tracing_subscriber::{prelude::*, EnvFilter};
+mod client;
+
#[derive(Parser)]
#[command(version)]
struct Args {
@@ 44,6 44,11 @@ async fn main() {
debug!("Hello, debug!");
+ // Channel for sending payloads to Master via Client
+ let (payload_tx, payload_rx) = mpsc::channel(64);
+ // Channel for sending messages to Client via Master
+ let (worker_tx, worker_rx) = mpsc::channel(64);
+
let targets = vec![
Target {
id: 0,
@@ 68,35 73,58 @@ async fn main() {
];
for target in targets {
+ let payload_tx = payload_tx.clone();
tokio::task::spawn(async move {
info!("Starting monitoring task for target {}", target.id);
let sleep_duration = Duration::from_millis(target.interval.into());
loop {
+ let payload;
match target.check().await {
Ok((up, _response)) => {
if !up {
info!("Target {} down!", target.id);
+ payload = MessagePayload::TargetDown {
+ target_id: target.id,
+ };
+ } else {
+ payload = MessagePayload::TargetUp {
+ target_id: target.id,
+ };
}
}
Err(_) => {
// We failed to check the state, status is unknown (grey).
- info!("Target {} unknown!", target.id);
+ payload = MessagePayload::TargetUnknown {
+ target_id: target.id,
+ };
}
}
+ let _ = payload_tx.send(payload).await;
+
tokio::time::sleep(sleep_duration).await;
}
});
}
+ let mut client = client::Client::new("127.0.0.1:8000", worker_tx, payload_rx);
+ let _ = client.run().await;
+
+ loop {
+ // read worker_rx, i guess
+ }
+
+ /*
//tokio::task::spawn(async move {
let worker_server = WorkerServer::new("127.0.0.1:8000");
let _ = worker_server.run().await;
//});
+ */
}
+/*
struct WorkerServer {
addr: String,
}
@@ 171,3 199,4 @@ impl WorkerServer {
}
}
}
+*/