M Cargo.lock => Cargo.lock +14 -0
@@ 103,6 103,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de"
[[package]]
+name = "byteorder"
+version = "1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
+
+[[package]]
name = "bytes"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ 166,6 172,7 @@ version = "0.1.0"
dependencies = [
"clap",
"emerwen-protocol",
+ "emerwen-types",
"tokio",
"tracing",
"tracing-subscriber",
@@ 175,16 182,23 @@ dependencies = [
name = "emerwen-protocol"
version = "0.1.0"
dependencies = [
+ "byteorder",
+ "emerwen-types",
"thiserror",
"tokio",
]
[[package]]
+name = "emerwen-types"
+version = "0.1.0"
+
+[[package]]
name = "emerwen-worker"
version = "0.1.0"
dependencies = [
"clap",
"emerwen-protocol",
+ "emerwen-types",
"tokio",
"tracing",
"tracing-subscriber",
M Cargo.toml => Cargo.toml +6 -1
@@ 1,6 1,11 @@
[workspace]
resolver = "2"
-members = ["emerwen-master", "emerwen-protocol", "emerwen-worker"]
+members = [
+ "emerwen-master",
+ "emerwen-protocol",
+ "emerwen-types",
+ "emerwen-worker",
+]
[workspace.package]
authors = ["Jonni Liljamo <jonni@liljamo.com"]
M emerwen-master/Cargo.toml => emerwen-master/Cargo.toml +1 -0
@@ 10,6 10,7 @@ repository.workspace = true
[dependencies]
emerwen-protocol = { path = "../emerwen-protocol" }
+emerwen-types = { path = "../emerwen-types" }
clap = { version = "4", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
M emerwen-protocol/Cargo.toml => emerwen-protocol/Cargo.toml +3 -0
@@ 9,5 9,8 @@ publish.workspace = true
repository.workspace = true
[dependencies]
+emerwen-types = { path = "../emerwen-types" }
+
+byteorder = "1"
thiserror = "2"
tokio = { version = "1", features = ["full"] }
A => +68 -0
@@ 0,0 1,68 @@
use crate::{DecodeError, PROTOCOL_VERSION};
pub(crate) const HEADER_LENGTH: usize = 6;
/// Message Header
///
/// Byte representation:
/// | u8 | u8 | [u8; 4] / u32 |
/// | protocol_version | payload_type | payload_length |
#[derive(Debug, PartialEq)]
pub struct MessageHeader {
protocol_version: u8,
pub payload_type: u8,
pub payload_length: u32,
}
impl MessageHeader {
pub fn new(payload_type: u8, payload_length: u32) -> MessageHeader {
MessageHeader {
protocol_version: PROTOCOL_VERSION,
payload_type,
payload_length,
}
}
pub fn encode(&self) -> [u8; HEADER_LENGTH] {
let length_bytes = self.payload_length.to_be_bytes();
[
self.protocol_version,
self.payload_type,
length_bytes[0],
length_bytes[1],
length_bytes[2],
length_bytes[3],
]
}
pub fn decode(bytes: [u8; HEADER_LENGTH]) -> Result<MessageHeader, Box<dyn std::error::Error>> {
let protocol_version = bytes[0];
if protocol_version != PROTOCOL_VERSION {
return Err(Box::new(DecodeError::InvalidVersion(
protocol_version,
PROTOCOL_VERSION,
)));
}
let payload_type = bytes[1];
// TODO: check?
let payload_length = u32::from_be_bytes([bytes[2], bytes[3], bytes[4], bytes[5]]);
Ok(MessageHeader {
protocol_version,
payload_type,
payload_length,
})
}
}
#[test]
fn test_round_trip_header() {
let sent_header = MessageHeader::new(6, 255);
let sent_header_bytes = sent_header.encode();
let received_header = MessageHeader::decode(sent_header_bytes).unwrap();
assert_eq!(sent_header, received_header);
}
M emerwen-protocol/src/lib.rs => emerwen-protocol/src/lib.rs +39 -113
@@ 1,143 1,69 @@
-use tokio::io::{AsyncRead, AsyncReadExt};
+mod header;
+use header::{MessageHeader, HEADER_LENGTH};
+
+mod payload;
+use payload::MessagePayload;
+
+pub const PROTOCOL_VERSION: u8 = 0;
#[derive(thiserror::Error, Debug)]
pub enum DecodeError {
#[error("invalid version {0} (expected {1})")]
InvalidVersion(u8, u8),
- #[error("invalid type {0}")]
- InvalidType(u8),
+ #[error("invalid paylaod type {0}")]
+ InvalidPayloadType(u8),
#[error("invalid boolean {0}")]
InvalidBoolean(u8),
}
-const PROTOCOL_VERSION: u8 = 0;
-
-/// Message
-///
-/// Byte representation:
-/// | u8 | u8 | [u8]
-/// | version | type | content bytes, structure varies per type
#[derive(Debug, PartialEq)]
-pub enum Message {
- /// Master -> Worker authentication.
- Authentication { key: String },
- /// Master -> Worker target configuration.
- //ConfigureTarget { target_id: u16, method: u8, addr: String },
- /// Worker -> Master target state change.
- TargetStateChange {
- /// ID of the target.
- target_id: u16,
- /// State the target changed to.
- ///
- /// [`true`] means up.
- /// [`false`] means down.
- state: bool,
- },
-}
-
-impl From<&Message> for u8 {
- fn from(message: &Message) -> u8 {
- match message {
- Message::Authentication { .. } => 0,
- Message::TargetStateChange { .. } => 1,
- }
- }
+pub struct Message {
+ //header: MessageHeader,
+ payload: MessagePayload,
}
impl Message {
- pub fn encode(&self) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
- let mut buf: Vec<u8> = Vec::new();
-
- buf.push(PROTOCOL_VERSION);
-
- // Message type
- buf.push(self.into());
-
- match self {
- Message::Authentication { key } => {
- // Key length
- buf.extend((key.len() as u32).to_be_bytes());
- // Key
- buf.extend(key.as_bytes());
- }
- Message::TargetStateChange { target_id, state } => {
- buf.extend(target_id.to_be_bytes());
- buf.push((*state).into());
- }
- }
-
- Ok(buf)
+ pub fn new(payload: MessagePayload) -> Message {
+ Message { payload }
}
- pub async fn decode(
- buf: &mut (impl AsyncRead + std::marker::Unpin),
- ) -> Result<Message, Box<dyn std::error::Error + Send + Sync>> {
- let version = buf.read_u8().await?;
- if version != PROTOCOL_VERSION {
- return Err(Box::new(DecodeError::InvalidVersion(
- version,
- PROTOCOL_VERSION,
- )));
- }
+ pub fn encode(&self) -> Vec<u8> {
+ let payload_type = (&self.payload).into();
+ let payload_bytes = self.payload.encode();
- let mtype = buf.read_u8().await?;
+ let header = MessageHeader::new(payload_type, payload_bytes.len() as u32);
+ let header_bytes = header.encode();
- let message;
- match mtype {
- 0 => {
- let key_length = buf.read_u32().await?;
- let mut key_bytes = vec![0; key_length as usize];
- buf.read_exact(&mut key_bytes).await?;
- message = Message::Authentication {
- key: String::from_utf8(key_bytes)?,
- };
- }
- 1 => {
- let target_id = buf.read_u16().await?;
- let state = u8_to_bool(buf.read_u8().await?)?;
+ let mut buf = vec![];
+ buf.extend(header_bytes);
+ buf.extend(payload_bytes);
+ buf
+ }
- message = Message::TargetStateChange { target_id, state };
- }
- _ => return Err(Box::new(DecodeError::InvalidType(mtype))),
- }
+ pub fn decode(buf: &mut impl std::io::Read) -> Result<Message, Box<dyn std::error::Error>> {
+ let mut header_bytes = [0; HEADER_LENGTH];
+ buf.read_exact(&mut header_bytes)?;
+ let header = MessageHeader::decode(header_bytes)?;
- Ok(message)
- }
-}
+ let mut payload_bytes = vec![0; header.payload_length as usize];
+ buf.read_exact(&mut payload_bytes)?;
+ let mut payload_reader = std::io::Cursor::new(payload_bytes);
+ let payload = MessagePayload::decode(header.payload_type, &mut payload_reader)?;
-fn u8_to_bool(n: u8) -> Result<bool, DecodeError> {
- match n {
- 0 => Ok(false),
- 1 => Ok(true),
- _ => Err(DecodeError::InvalidBoolean(n)),
+ Ok(Message { payload })
}
}
-#[tokio::test]
-async fn test_round_trip_authentication() {
- let sent_message = Message::Authentication {
+#[test]
+fn test_round_trip_message_authentication() {
+ let sent_message = Message::new(MessagePayload::Authentication {
key: "this_is_a_key".to_owned(),
- };
-
- let sent_message_bytes = sent_message.encode().unwrap();
-
- let mut reader = std::io::Cursor::new(sent_message_bytes);
- let received_message = Message::decode(&mut reader).await.unwrap();
-
- assert_eq!(sent_message, received_message);
-}
-
-#[tokio::test]
-async fn test_round_trip_targetstatechange() {
- let sent_message = Message::TargetStateChange {
- target_id: 42,
- state: true,
- };
+ });
- let sent_message_bytes = sent_message.encode().unwrap();
+ let sent_message_bytes = sent_message.encode();
let mut reader = std::io::Cursor::new(sent_message_bytes);
- let received_message = Message::decode(&mut reader).await.unwrap();
+ let received_message = Message::decode(&mut reader).unwrap();
assert_eq!(sent_message, received_message);
}
A emerwen-protocol/src/payload.rs => emerwen-protocol/src/payload.rs +124 -0
@@ 0,0 1,124 @@
+use byteorder::{BigEndian, ReadBytesExt};
+
+use crate::DecodeError;
+
+/// Message Payload
+///
+/// Byte representation:
+/// | [u8] |
+/// | content bytes, structure varies per type |
+#[derive(Debug, PartialEq)]
+pub enum MessagePayload {
+ /// Master -> Worker authentication.
+ Authentication { key: String },
+ /// Master -> Worker target configuration.
+ /*ConfigureTarget {
+ target_id: u16,
+ method: TargetMethod,
+ addr: String
+ },*/
+ /// Worker -> Master target state change.
+ TargetStateChange {
+ /// ID of the target.
+ target_id: u16,
+ /// State the target changed to.
+ ///
+ /// [`true`] means up.
+ /// [`false`] means down.
+ state: bool,
+ },
+}
+
+impl From<&MessagePayload> for u8 {
+ fn from(message: &MessagePayload) -> u8 {
+ match message {
+ MessagePayload::Authentication { .. } => 0,
+ MessagePayload::TargetStateChange { .. } => 1,
+ }
+ }
+}
+
+impl MessagePayload {
+ pub fn encode(&self) -> Vec<u8> {
+ let mut buf: Vec<u8> = Vec::new();
+
+ match self {
+ MessagePayload::Authentication { key } => {
+ // Key length
+ buf.extend((key.len() as u32).to_be_bytes());
+ // Key
+ buf.extend(key.as_bytes());
+ }
+ MessagePayload::TargetStateChange { target_id, state } => {
+ buf.extend(target_id.to_be_bytes());
+ buf.push((*state).into());
+ }
+ }
+
+ buf
+ }
+
+ pub fn decode(
+ payload_type: u8,
+ buf: &mut impl std::io::Read,
+ ) -> Result<MessagePayload, Box<dyn std::error::Error>> {
+ let payload = match payload_type {
+ 0 => {
+ let key_length = buf.read_u32::<BigEndian>()?;
+ let mut key_bytes = vec![0; key_length as usize];
+ buf.read_exact(&mut key_bytes)?;
+ MessagePayload::Authentication {
+ key: String::from_utf8(key_bytes)?,
+ }
+ }
+ 1 => {
+ let target_id = buf.read_u16::<BigEndian>()?;
+ let state = u8_to_bool(buf.read_u8()?)?;
+
+ MessagePayload::TargetStateChange { target_id, state }
+ }
+ _ => return Err(Box::new(DecodeError::InvalidPayloadType(payload_type))),
+ };
+
+ Ok(payload)
+ }
+}
+
+fn u8_to_bool(n: u8) -> Result<bool, DecodeError> {
+ match n {
+ 0 => Ok(false),
+ 1 => Ok(true),
+ _ => Err(DecodeError::InvalidBoolean(n)),
+ }
+}
+
+#[test]
+fn test_round_trip_payload_authentication() {
+ let sent_payload = MessagePayload::Authentication {
+ key: "this_is_a_key".to_owned(),
+ };
+
+ let sent_payload_bytes = sent_payload.encode();
+ let sent_payload_type = (&sent_payload).into();
+
+ let mut reader = std::io::Cursor::new(sent_payload_bytes);
+ let received_payload = MessagePayload::decode(sent_payload_type, &mut reader).unwrap();
+
+ assert_eq!(sent_payload, received_payload);
+}
+
+#[test]
+fn test_round_trip_payload_targetstatechange() {
+ let sent_payload = MessagePayload::TargetStateChange {
+ target_id: 42,
+ state: true,
+ };
+
+ let sent_payload_bytes = sent_payload.encode();
+ let sent_payload_type = (&sent_payload).into();
+
+ let mut reader = std::io::Cursor::new(sent_payload_bytes);
+ let received_payload = MessagePayload::decode(sent_payload_type, &mut reader).unwrap();
+
+ assert_eq!(sent_payload, received_payload);
+}
A emerwen-types/Cargo.toml => emerwen-types/Cargo.toml +11 -0
@@ 0,0 1,11 @@
+[package]
+name = "emerwen-types"
+version = "0.1.0"
+authors.workspace = true
+edition.workspace = true
+homepage.workspace = true
+license.workspace = true
+publish.workspace = true
+repository.workspace = true
+
+[dependencies]
A emerwen-types/src/lib.rs => emerwen-types/src/lib.rs +5 -0
@@ 0,0 1,5 @@
+#[derive(Debug, PartialEq)]
+pub enum TargetMethod {
+ Ping,
+ Get(u8),
+}
M emerwen-worker/Cargo.toml => emerwen-worker/Cargo.toml +1 -0
@@ 10,6 10,7 @@ repository.workspace = true
[dependencies]
emerwen-protocol = { path = "../emerwen-protocol" }
+emerwen-types = { path = "../emerwen-types" }
clap = { version = "4", features = ["derive"] }
tokio = { version = "1", features = ["full"] }