DEVELOPMENT ENVIRONMENT

~liljamo/emerwen

ref: initial-tcp-experimentation emerwen/emerwen-protocol/src/lib.rs -rw-r--r-- 3.5 KiB
1225af8eJonni Liljamo feat: more experimentation 13 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
pub mod message;

use message::{
    header::{MessageHeader, HEADER_LENGTH},
    payload::MessagePayload,
    Message,
};
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::TcpStream,
    sync::{mpsc, oneshot},
};
use tracing::{error, info};

/// Requests passed from a [`ProtocolActorHandle`] to the [`ProtocolActor`]
/// over the actor's `mpsc` channel.
enum ProtocolMessage {
    /// Ask the actor to read one message from its stream.
    ReadMessage {
        /// One-shot channel on which the decoded payload is sent back.
        /// If the actor cannot produce a payload, this sender is dropped
        /// without a value being sent.
        respond_to: oneshot::Sender<MessagePayload>,
    },
    /// Ask the actor to encode `payload` into a message and write it to its
    /// stream. No reply is sent back.
    SendMessage {
        /// The payload to wrap in a message and write out.
        payload: MessagePayload,
    },
}

/// Actor that owns a TCP stream and serves [`ProtocolMessage`] requests
/// received over a channel, one at a time.
struct ProtocolActor {
    /// Incoming requests from the handles.
    receiver: mpsc::Receiver<ProtocolMessage>,
    /// The connection that messages are read from and written to.
    stream: TcpStream,
}

impl ProtocolActor {
    fn new(receiver: mpsc::Receiver<ProtocolMessage>, stream: TcpStream) -> ProtocolActor {
        ProtocolActor { receiver, stream }
    }

    pub async fn run(&mut self) {
        info!("enter run");
        while let Some(msg) = self.receiver.recv().await {
            info!("handle in run");
            self.handle_message(msg).await;
        }
        info!("exit run");
    }

    async fn handle_message(&mut self, msg: ProtocolMessage) {
        match msg {
            ProtocolMessage::ReadMessage { respond_to } => {
                let mut header_bytes = [0; HEADER_LENGTH];
                if self.stream.read_exact(&mut header_bytes).await.is_err() {
                    //break;
                }
                let header = match MessageHeader::decode(header_bytes) {
                    Ok(header) => header,
                    Err(e) => {
                        error!("Error reading header: {}", e.to_string());
                        //continue;
                        return;
                    }
                };

                let mut payload_bytes = vec![0; header.payload_length as usize];
                if self.stream.read_exact(&mut payload_bytes).await.is_err() {
                    //break;
                }
                let mut payload_reader = std::io::Cursor::new(payload_bytes);
                let payload = match MessagePayload::decode(header.payload_type, &mut payload_reader)
                {
                    Ok(payload) => payload,
                    Err(e) => {
                        error!("Error reading payload: {}", e.to_string());
                        //continue;
                        return;
                    }
                };

                let _ = respond_to.send(payload);
            }
            ProtocolMessage::SendMessage { payload } => {
                let message_bytes = Message::new(payload).encode();

                info!("help 2?");

                let _ = self.stream.write_all(&message_bytes).await;
            }
        }
    }
}

/// Cloneable handle for talking to a background [`ProtocolActor`].
///
/// Holds the sending half of the channel the actor receives its requests on.
#[derive(Clone)]
pub struct ProtocolActorHandle {
    /// Sending half of the actor's request channel.
    sender: mpsc::Sender<ProtocolMessage>,
}

impl ProtocolActorHandle {
    /// Spawns a [`ProtocolActor`] for `stream` on the Tokio runtime and
    /// returns a handle to it.
    ///
    /// Requests are queued on a bounded channel with a capacity of 8.
    ///
    /// # Panics
    ///
    /// `tokio::task::spawn` panics when called outside of a Tokio runtime.
    pub fn new(stream: TcpStream) -> ProtocolActorHandle {
        let (sender, receiver) = mpsc::channel(8);
        let mut actor = ProtocolActor::new(receiver, stream);
        tokio::task::spawn(async move { actor.run().await });

        ProtocolActorHandle { sender }
    }

    /// Asks the actor to read one message from the stream and returns its
    /// payload.
    ///
    /// # Panics
    ///
    /// Panics if the actor drops the reply channel without answering, which
    /// happens when the actor is no longer running or when it fails to
    /// produce a payload for this request.
    pub async fn read_message(&self) -> MessagePayload {
        let (send, recv) = oneshot::channel();
        let msg = ProtocolMessage::ReadMessage { respond_to: send };

        info!("read");

        // A failed send drops `msg` together with its reply sender, so the
        // failure surfaces through `recv` below.
        let _ = self.sender.send(msg).await;
        recv.await
            .expect("protocol actor dropped the reply channel without sending a payload")
    }

    /// Queues `payload` to be encoded and written to the stream by the actor.
    ///
    /// `Ok(())` means the request was handed to the actor, not that the bytes
    /// have been written to the stream.
    ///
    /// # Errors
    ///
    /// Returns an error if the actor's request channel is closed.
    pub async fn send_message(
        &self,
        payload: MessagePayload,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let msg = ProtocolMessage::SendMessage { payload };

        info!("send");

        self.sender
            .send(msg)
            .await
            .map_err(|_| "protocol actor is no longer running")?;

        Ok(())
    }
}