pub mod message;
use message::{
header::{MessageHeader, HEADER_LENGTH},
payload::MessagePayload,
Message,
};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
sync::{mpsc, oneshot},
};
use tracing::{error, info};
enum ProtocolMessage {
ReadMessage {
respond_to: oneshot::Sender<MessagePayload>,
},
SendMessage {
payload: MessagePayload,
},
}
struct ProtocolActor {
receiver: mpsc::Receiver<ProtocolMessage>,
stream: TcpStream,
}
impl ProtocolActor {
fn new(receiver: mpsc::Receiver<ProtocolMessage>, stream: TcpStream) -> ProtocolActor {
ProtocolActor { receiver, stream }
}
pub async fn run(&mut self) {
info!("enter run");
while let Some(msg) = self.receiver.recv().await {
info!("handle in run");
self.handle_message(msg).await;
}
info!("exit run");
}
async fn handle_message(&mut self, msg: ProtocolMessage) {
match msg {
ProtocolMessage::ReadMessage { respond_to } => {
let mut header_bytes = [0; HEADER_LENGTH];
if self.stream.read_exact(&mut header_bytes).await.is_err() {
//break;
}
let header = match MessageHeader::decode(header_bytes) {
Ok(header) => header,
Err(e) => {
error!("Error reading header: {}", e.to_string());
//continue;
return;
}
};
let mut payload_bytes = vec![0; header.payload_length as usize];
if self.stream.read_exact(&mut payload_bytes).await.is_err() {
//break;
}
let mut payload_reader = std::io::Cursor::new(payload_bytes);
let payload = match MessagePayload::decode(header.payload_type, &mut payload_reader)
{
Ok(payload) => payload,
Err(e) => {
error!("Error reading payload: {}", e.to_string());
//continue;
return;
}
};
let _ = respond_to.send(payload);
}
ProtocolMessage::SendMessage { payload } => {
let message_bytes = Message::new(payload).encode();
info!("help 2?");
let _ = self.stream.write_all(&message_bytes).await;
}
}
}
}
#[derive(Clone)]
pub struct ProtocolActorHandle {
sender: mpsc::Sender<ProtocolMessage>,
}
impl ProtocolActorHandle {
pub fn new(stream: TcpStream) -> ProtocolActorHandle {
let (sender, receiver) = mpsc::channel(8);
let mut actor = ProtocolActor::new(receiver, stream);
tokio::task::spawn(async move { actor.run().await });
ProtocolActorHandle { sender }
}
pub async fn read_message(&self) -> MessagePayload {
let (send, recv) = oneshot::channel();
let msg = ProtocolMessage::ReadMessage { respond_to: send };
info!("read");
let _ = self.sender.send(msg).await;
recv.await.unwrap() // handle, task has been killed
}
pub async fn send_message(
&self,
payload: MessagePayload,
) -> Result<(), Box<dyn std::error::Error>> {
let msg = ProtocolMessage::SendMessage { payload };
info!("send");
let _ = self.sender.send(msg).await;
Ok(())
}
}