DEVELOPMENT ENVIRONMENT

~liljamo/emerwen

1225af8e42a83251f51013fddcb2f368c108612c — Jonni Liljamo 2 months ago 33e3c26 initial-tcp-experimentation
feat: more experimentation
11 files changed, 336 insertions(+), 63 deletions(-)

M Cargo.lock
M emerwen-master/src/main.rs
A emerwen-master/src/server.rs
M emerwen-protocol/Cargo.toml
M emerwen-protocol/src/lib.rs
R emerwen-protocol/src/{header.rs => message/header.rs}
A emerwen-protocol/src/message/mod.rs
R emerwen-protocol/src/{payload.rs => message/payload.rs}
M emerwen-types/src/lib.rs
A emerwen-worker/src/client.rs
M emerwen-worker/src/main.rs
M Cargo.lock => Cargo.lock +1 -0
@@ 240,6 240,7 @@ dependencies = [
 "emerwen-types",
 "thiserror 2.0.0",
 "tokio",
 "tracing",
]

[[package]]

M emerwen-master/src/main.rs => emerwen-master/src/main.rs +9 -2
@@ 1,9 1,9 @@
use clap::Parser;
use emerwen_protocol::{Message, MessagePayload};
use tokio::{io::AsyncWriteExt, net::TcpStream};
use tracing::{debug, info, level_filters::LevelFilter};
use tracing_subscriber::{prelude::*, EnvFilter};

mod server;

#[derive(Parser)]
#[command(version)]
struct Args {


@@ 38,6 38,10 @@ async fn main() {

    debug!("Hello, debug!");

    let server = server::Server::new("127.0.0.1:8000");
    let _ = server.run().await;

    /*
    let mut worker_client = WorkerClient::new("127.0.0.1:8000");
    let _ = worker_client.connect().await;
    let _ = worker_client


@@ 46,8 50,10 @@ async fn main() {
        })
        .await
        .unwrap();
    */
}

/*
struct WorkerClient {
    addr: String,
    stream: Option<TcpStream>,


@@ 79,3 85,4 @@ impl WorkerClient {
        Ok(None)
    }
}
*/

A emerwen-master/src/server.rs => emerwen-master/src/server.rs +48 -0
@@ 0,0 1,48 @@
use emerwen_protocol::ProtocolActorHandle;
use tokio::net::TcpListener;
use tracing::info;

pub struct Server {
    addr: String,
}

impl Server {
    pub fn new(addr: impl Into<String>) -> Server {
        Server { addr: addr.into() }
    }

    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
        let listener = TcpListener::bind(&self.addr).await?;
        info!("Server listening on {}", self.addr);

        loop {
            let (socket, addr) = listener.accept().await?;

            info!("Client '{}' connected", addr);

            let handle_read = ProtocolActorHandle::new(socket);
            let handle_write = handle_read.clone();

            tokio::task::spawn(async move {
                let handle = handle_read;
                loop {
                    let message = handle.read_message().await;

                    info!("Received message: {:?}", message);

                    // match payload bla bla
                    // write something back if appliccable
                }
            });

            tokio::task::spawn(async move {
                let handle = handle_write;
                loop {
                    // read some channel that comes passed from main to this server struct
                    // and send messages yeah.
                    //let message = handle.send_message(Message).await;
                }
            });
        }
    }
}

M emerwen-protocol/Cargo.toml => emerwen-protocol/Cargo.toml +1 -0
@@ 14,3 14,4 @@ emerwen-types = { path = "../emerwen-types" }
byteorder = "1"
thiserror = "2"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"

M emerwen-protocol/src/lib.rs => emerwen-protocol/src/lib.rs +105 -51
@@ 1,69 1,123 @@
mod header;
pub use header::{MessageHeader, HEADER_LENGTH};

mod payload;
pub use payload::MessagePayload;

pub const PROTOCOL_VERSION: u8 = 0;

#[derive(thiserror::Error, Debug)]
pub enum DecodeError {
    #[error("invalid version {0} (expected {1})")]
    InvalidVersion(u8, u8),
    #[error("invalid paylaod type {0}")]
    InvalidPayloadType(u8),
    #[error("invalid boolean {0}")]
    InvalidBoolean(u8),
pub mod message;

use message::{
    header::{MessageHeader, HEADER_LENGTH},
    payload::MessagePayload,
    Message,
};
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::TcpStream,
    sync::{mpsc, oneshot},
};
use tracing::{error, info};

enum ProtocolMessage {
    ReadMessage {
        respond_to: oneshot::Sender<MessagePayload>,
    },
    SendMessage {
        payload: MessagePayload,
    },
}

#[derive(Debug, PartialEq)]
pub struct Message {
    //header: MessageHeader,
    payload: MessagePayload,
struct ProtocolActor {
    receiver: mpsc::Receiver<ProtocolMessage>,
    stream: TcpStream,
}

impl Message {
    pub fn new(payload: MessagePayload) -> Message {
        Message { payload }
impl ProtocolActor {
    fn new(receiver: mpsc::Receiver<ProtocolMessage>, stream: TcpStream) -> ProtocolActor {
        ProtocolActor { receiver, stream }
    }

    pub fn encode(&self) -> Vec<u8> {
        let payload_type = (&self.payload).into();
        let payload_bytes = self.payload.encode();
    pub async fn run(&mut self) {
        info!("enter run");
        while let Some(msg) = self.receiver.recv().await {
            info!("handle in run");
            self.handle_message(msg).await;
        }
        info!("exit run");
    }

        let header = MessageHeader::new(payload_type, payload_bytes.len() as u32);
        let header_bytes = header.encode();
    async fn handle_message(&mut self, msg: ProtocolMessage) {
        match msg {
            ProtocolMessage::ReadMessage { respond_to } => {
                let mut header_bytes = [0; HEADER_LENGTH];
                if self.stream.read_exact(&mut header_bytes).await.is_err() {
                    //break;
                }
                let header = match MessageHeader::decode(header_bytes) {
                    Ok(header) => header,
                    Err(e) => {
                        error!("Error reading header: {}", e.to_string());
                        //continue;
                        return;
                    }
                };

        let mut buf = vec![];
        buf.extend(header_bytes);
        buf.extend(payload_bytes);
        buf
    }
                let mut payload_bytes = vec![0; header.payload_length as usize];
                if self.stream.read_exact(&mut payload_bytes).await.is_err() {
                    //break;
                }
                let mut payload_reader = std::io::Cursor::new(payload_bytes);
                let payload = match MessagePayload::decode(header.payload_type, &mut payload_reader)
                {
                    Ok(payload) => payload,
                    Err(e) => {
                        error!("Error reading payload: {}", e.to_string());
                        //continue;
                        return;
                    }
                };

    pub fn decode(buf: &mut impl std::io::Read) -> Result<Message, Box<dyn std::error::Error>> {
        let mut header_bytes = [0; HEADER_LENGTH];
        buf.read_exact(&mut header_bytes)?;
        let header = MessageHeader::decode(header_bytes)?;
                let _ = respond_to.send(payload);
            }
            ProtocolMessage::SendMessage { payload } => {
                let message_bytes = Message::new(payload).encode();

        let mut payload_bytes = vec![0; header.payload_length as usize];
        buf.read_exact(&mut payload_bytes)?;
        let mut payload_reader = std::io::Cursor::new(payload_bytes);
        let payload = MessagePayload::decode(header.payload_type, &mut payload_reader)?;
                info!("help 2?");

        Ok(Message { payload })
                let _ = self.stream.write_all(&message_bytes).await;
            }
        }
    }
}

#[test]
fn test_round_trip_message_authentication() {
    let sent_message = Message::new(MessagePayload::Authentication {
        key: "this_is_a_key".to_owned(),
    });
#[derive(Clone)]
pub struct ProtocolActorHandle {
    sender: mpsc::Sender<ProtocolMessage>,
}

impl ProtocolActorHandle {
    pub fn new(stream: TcpStream) -> ProtocolActorHandle {
        let (sender, receiver) = mpsc::channel(8);
        let mut actor = ProtocolActor::new(receiver, stream);
        tokio::task::spawn(async move { actor.run().await });

        ProtocolActorHandle { sender }
    }

    pub async fn read_message(&self) -> MessagePayload {
        let (send, recv) = oneshot::channel();
        let msg = ProtocolMessage::ReadMessage { respond_to: send };

    let sent_message_bytes = sent_message.encode();
        info!("read");

        let _ = self.sender.send(msg).await;
        recv.await.unwrap() // handle, task has been killed
    }

    let mut reader = std::io::Cursor::new(sent_message_bytes);
    let received_message = Message::decode(&mut reader).unwrap();
    pub async fn send_message(
        &self,
        payload: MessagePayload,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let msg = ProtocolMessage::SendMessage { payload };

    assert_eq!(sent_message, received_message);
        info!("send");

        let _ = self.sender.send(msg).await;

        Ok(())
    }
}

R emerwen-protocol/src/header.rs => emerwen-protocol/src/message/header.rs +1 -1
@@ 1,4 1,4 @@
use crate::{DecodeError, PROTOCOL_VERSION};
use super::{DecodeError, PROTOCOL_VERSION};

pub const HEADER_LENGTH: usize = 6;


A emerwen-protocol/src/message/mod.rs => emerwen-protocol/src/message/mod.rs +69 -0
@@ 0,0 1,69 @@
pub mod header;
pub mod payload;

use header::{MessageHeader, HEADER_LENGTH};
use payload::MessagePayload;

pub const PROTOCOL_VERSION: u8 = 0;

#[derive(thiserror::Error, Debug)]
pub enum DecodeError {
    #[error("invalid version {0} (expected {1})")]
    InvalidVersion(u8, u8),
    #[error("invalid paylaod type {0}")]
    InvalidPayloadType(u8),
    #[error("invalid boolean {0}")]
    InvalidBoolean(u8),
}

#[derive(Debug, PartialEq)]
pub struct Message {
    //header: MessageHeader,
    payload: MessagePayload,
}

impl Message {
    pub fn new(payload: MessagePayload) -> Message {
        Message { payload }
    }

    pub fn encode(&self) -> Vec<u8> {
        let payload_type = (&self.payload).into();
        let payload_bytes = self.payload.encode();

        let header = MessageHeader::new(payload_type, payload_bytes.len() as u32);
        let header_bytes = header.encode();

        let mut buf = vec![];
        buf.extend(header_bytes);
        buf.extend(payload_bytes);
        buf
    }

    pub fn decode(buf: &mut impl std::io::Read) -> Result<Message, Box<dyn std::error::Error>> {
        let mut header_bytes = [0; HEADER_LENGTH];
        buf.read_exact(&mut header_bytes)?;
        let header = MessageHeader::decode(header_bytes)?;

        let mut payload_bytes = vec![0; header.payload_length as usize];
        buf.read_exact(&mut payload_bytes)?;
        let mut payload_reader = std::io::Cursor::new(payload_bytes);
        let payload = MessagePayload::decode(header.payload_type, &mut payload_reader)?;

        Ok(Message { payload })
    }
}

#[test]
fn test_round_trip_message_authentication() {
    let sent_message = Message::new(MessagePayload::Authentication {
        key: "this_is_a_key".to_owned(),
    });

    let sent_message_bytes = sent_message.encode();

    let mut reader = std::io::Cursor::new(sent_message_bytes);
    let received_message = Message::decode(&mut reader).unwrap();

    assert_eq!(sent_message, received_message);
}

R emerwen-protocol/src/payload.rs => emerwen-protocol/src/message/payload.rs +1 -1
@@ 1,6 1,6 @@
use byteorder::{BigEndian, ReadBytesExt};

use crate::DecodeError;
use super::DecodeError;

/// Message Payload
///

M emerwen-types/src/lib.rs => emerwen-types/src/lib.rs +1 -1
@@ 7,7 7,7 @@ pub enum TargetMethod {
}

pub struct Target {
    pub id: u32,
    pub id: u16,
    pub addr: String,
    /// Check interval in ms
    pub interval: u32,

A emerwen-worker/src/client.rs => emerwen-worker/src/client.rs +64 -0
@@ 0,0 1,64 @@
use emerwen_protocol::{message::payload::MessagePayload, ProtocolActorHandle};
use tokio::{
    net::TcpStream,
    sync::mpsc::{Receiver, Sender},
};
use tracing::info;

pub enum ClientMessage {
    SendPayload { payload: MessagePayload },
}

pub struct Client {
    addr: String,
    worker_tx: Sender<ClientMessage>,
    payload_rx: Receiver<MessagePayload>,
}

impl Client {
    pub fn new(
        addr: impl Into<String>,
        worker_tx: Sender<ClientMessage>,
        payload_rx: Receiver<MessagePayload>,
    ) -> Client {
        Client {
            addr: addr.into(),
            worker_tx,
            payload_rx,
        }
    }

    pub async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        let socket = TcpStream::connect(&self.addr).await?;

        let handle = ProtocolActorHandle::new(socket);
        let handle_read = handle.clone();

        tokio::task::spawn(async move {
            let handle = handle_read;
            loop {
                let message = handle.read_message().await;

                info!("Received message: {:?}", message);

                // match payload bla bla
                // write something back if appliccable

                // use worker_tx to send data to main worker thread
            }
        });

        info!("before prx");
        while let Some(payload) = self.payload_rx.recv().await {
            info!("prx");
            let _ = handle.send_message(payload).await;
        }
        info!("after prx");

        // read some channel that comes passed from main to this server struct
        // and send messages yeah.
        //let message = handle.send_message(Message).await;

        Ok(())
    }
}

M emerwen-worker/src/main.rs => emerwen-worker/src/main.rs +36 -7
@@ 1,15 1,15 @@
use std::time::Duration;

use clap::Parser;
use emerwen_protocol::{MessageHeader, MessagePayload, HEADER_LENGTH};
use client::ClientMessage;
use emerwen_protocol::message::payload::MessagePayload;
use emerwen_types::{Target, TargetMethod};
use tokio::{
    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
    net::TcpListener,
};
use tracing::{debug, error, info, level_filters::LevelFilter};
use tokio::sync::mpsc;
use tracing::{debug, info, level_filters::LevelFilter};
use tracing_subscriber::{prelude::*, EnvFilter};

mod client;

#[derive(Parser)]
#[command(version)]
struct Args {


@@ 44,6 44,11 @@ async fn main() {

    debug!("Hello, debug!");

    // Channel for sending payloads to Master via Client
    let (payload_tx, payload_rx) = mpsc::channel(64);
    // Channel for sending messages to Client via Master
    let (worker_tx, worker_rx) = mpsc::channel(64);

    let targets = vec![
        Target {
            id: 0,


@@ 68,35 73,58 @@ async fn main() {
    ];

    for target in targets {
        let payload_tx = payload_tx.clone();
        tokio::task::spawn(async move {
            info!("Starting monitoring task for target {}", target.id);

            let sleep_duration = Duration::from_millis(target.interval.into());

            loop {
                let payload;
                match target.check().await {
                    Ok((up, _response)) => {
                        if !up {
                            info!("Target {} down!", target.id);
                            payload = MessagePayload::TargetDown {
                                target_id: target.id,
                            };
                        } else {
                            payload = MessagePayload::TargetUp {
                                target_id: target.id,
                            };
                        }
                    }
                    Err(_) => {
                        // We failed to check the state, status is unknown (grey).
                        info!("Target {} unknown!", target.id);
                        payload = MessagePayload::TargetUnknown {
                            target_id: target.id,
                        };
                    }
                }

                let _ = payload_tx.send(payload).await;

                tokio::time::sleep(sleep_duration).await;
            }
        });
    }

    let mut client = client::Client::new("127.0.0.1:8000", worker_tx, payload_rx);
    let _ = client.run().await;

    loop {
        // read worker_rx, i guess
    }

    /*
    //tokio::task::spawn(async move {
    let worker_server = WorkerServer::new("127.0.0.1:8000");
    let _ = worker_server.run().await;
    //});
    */
}

/*
struct WorkerServer {
    addr: String,
}


@@ 171,3 199,4 @@ impl WorkerServer {
        }
    }
}
*/