DEVELOPMENT ENVIRONMENT

~liljamo/emerwen

ref: 91b99511153d9e5adb1ffba835cb0706a54676f0 emerwen/emerwen-master/src/db.rs -rw-r--r-- 4.3 KiB
91b99511 Jonni Liljamo feat(master): change to protocol Worker type 9 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/*
 * Copyright (C) 2024 Jonni Liljamo <jonni@liljamo.com>
 *
 * This file is licensed under GPL-3.0-or-later, see NOTICE and LICENSE for
 * more information.
 */

use std::path::Path;

use emerwen_protocol::emerwen::shared::Worker;
use redb::{Database, ReadableTable, TableDefinition, TypeName, Value};
use tokio::sync::{mpsc, oneshot};
use tracing::warn;

/// redb table named `"workers"`: `u32` keys mapped to [`Worker`] values stored
/// through the [`WorkerType`] codec. Rows in this file are keyed by `worker.id`.
const WORKER_TABLE: TableDefinition<u32, WorkerType> = TableDefinition::new("workers");

/// Requests understood by the database actor ([`DatabaseInstance`]).
///
/// Every variant carries a one-shot `respond_to` sender on which the actor
/// delivers the reply for that single request.
pub enum DBMessage {
    /// Ask for every worker stored in the workers table.
    ReadWorkers {
        /// Receives the stored workers; an empty list when the table does not
        /// exist yet.
        respond_to: oneshot::Sender<Vec<Worker>>,
    },
    /// Ask for `worker` to be stored under its `id`.
    WriteWorker {
        /// Receives `true` once the write has been committed.
        respond_to: oneshot::Sender<bool>,
        /// The worker to store.
        worker: Worker,
    },
}

/// Actor that owns the redb [`Database`] and serves the [`DBMessage`] requests
/// arriving on its channel.
pub struct DatabaseInstance {
    /// Incoming requests, sent by [`DatabaseHandle`] instances.
    receiver: mpsc::Receiver<DBMessage>,
    /// The database every request is executed against.
    database: Database,
}

impl DatabaseInstance {
    fn new(receiver: mpsc::Receiver<DBMessage>, database: Database) -> DatabaseInstance {
        DatabaseInstance { receiver, database }
    }

    async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        while let Some(msg) = self.receiver.recv().await {
            match msg {
                DBMessage::ReadWorkers { respond_to } => {
                    // FIXME: handle errors, offload to a generic function
                    let read_txn = self.database.begin_read().unwrap();
                    let table = match read_txn.open_table(WORKER_TABLE) {
                        Ok(table) => Some(table),
                        Err(redb::TableError::TableDoesNotExist(table)) => {
                            warn!(message="Table does not exist, assuming empty", %table);
                            None
                        }
                        Err(_) => {
                            // FIXME: handle
                            None
                        }
                    };

                    let mut workers = vec![];
                    if let Some(table) = table {
                        for a in table.iter().unwrap() {
                            workers.push(a.unwrap().1.value());
                        }
                    }

                    let _ = respond_to.send(workers);
                }
                DBMessage::WriteWorker { respond_to, worker } => {
                    let write_txn = self.database.begin_write().unwrap();
                    {
                        let mut table = write_txn.open_table(WORKER_TABLE).unwrap();
                        table.insert(worker.id, worker)?;
                    }
                    write_txn.commit()?;

                    let _ = respond_to.send(true);
                }
            }
        }

        Ok(())
    }
}

/// Cloneable handle used to send requests to the database actor
/// ([`DatabaseInstance`]).
#[derive(Clone)]
pub struct DatabaseHandle {
    /// Sending half of the actor's request channel.
    sender: mpsc::Sender<DBMessage>,
}

impl DatabaseHandle {
    pub fn new(database_file: &Path) -> Result<DatabaseHandle, Box<dyn std::error::Error>> {
        let database = match database_file.exists() {
            true => Database::open(database_file)?,
            false => Database::create(database_file)?,
        };

        let (sender, receiver) = mpsc::channel(64);
        let mut actor = DatabaseInstance::new(receiver, database);
        tokio::spawn(async move {
            let _ = actor.run().await;
        });

        Ok(DatabaseHandle { sender })
    }

    pub async fn read_workers(&self) -> Vec<Worker> {
        let (tx, rx) = oneshot::channel();
        let msg = DBMessage::ReadWorkers { respond_to: tx };

        let _ = self.sender.send(msg).await;
        rx.await.expect("actor task has been killed") // FIXME: handle
    }

    pub async fn write_worker(&self, worker: Worker) -> bool {
        let (tx, rx) = oneshot::channel();
        let msg = DBMessage::WriteWorker {
            respond_to: tx,
            worker,
        };

        let _ = self.sender.send(msg).await;
        rx.await.expect("actor task has been killed") // FIXME: handle
    }
}

/// Marker type whose redb [`Value`] implementation stores [`Worker`] values as
/// bincode-encoded bytes.
#[derive(Debug)]
struct WorkerType;

/// Stores [`Worker`] values in redb by encoding them with bincode.
impl Value for WorkerType {
    /// Values are read back as owned [`Worker`]s.
    type SelfType<'a> = Worker;
    /// Values are written as an owned byte buffer.
    type AsBytes<'a> = Vec<u8>;

    /// Returns `None`: no fixed width is declared for the encoded value.
    fn fixed_width() -> Option<usize> {
        None
    }

    /// Decodes a [`Worker`] from its bincode representation.
    ///
    /// # Panics
    ///
    /// Panics if `data` cannot be deserialized into a [`Worker`].
    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        bincode::deserialize(data).unwrap()
    }

    /// Encodes `value` into its bincode representation.
    ///
    /// # Panics
    ///
    /// Panics if bincode fails to serialize `value`.
    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'a,
        Self: 'b,
    {
        bincode::serialize(value).unwrap()
    }

    /// Type name reported to redb for this value type.
    fn type_name() -> TypeName {
        TypeName::new("emerwen_worker")
    }
}