DEVELOPMENT ENVIRONMENT

~liljamo/emerwen

ref: 99bece8be8f11baece0ce10934feecb5d0f51863 emerwen/emerwen-master/src/db.rs -rw-r--r-- 3.7 KiB
99bece8bJonni Liljamo feat(protocol): init WebToMaster service 9 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/*
 * Copyright (C) 2024 Jonni Liljamo <jonni@liljamo.com>
 *
 * This file is licensed under GPL-3.0-or-later, see NOTICE and LICENSE for
 * more information.
 */

use std::path::Path;

use redb::{Database, ReadableTable, TableDefinition};
use tokio::sync::{mpsc, oneshot};
use tracing::warn;

use crate::worker::Worker;

/// redb table definition for the table named `"workers"`, mapping `u32` keys
/// to [`Worker`] values.
const WORKER_TABLE: TableDefinition<u32, Worker> = TableDefinition::new("workers");

/// Requests that can be sent to the [`DatabaseInstance`] actor.
///
/// Every variant carries a oneshot sender on which the actor sends its reply.
pub enum DBMessage {
    /// Read all workers stored in the worker table.
    ReadWorkers {
        /// Channel on which the list of stored workers is sent back.
        respond_to: oneshot::Sender<Vec<Worker>>,
    },
    /// Insert a worker into the worker table, keyed by its `id`.
    WriteWorker {
        /// Channel on which `true` is sent once the write has been committed.
        respond_to: oneshot::Sender<bool>,
        /// The worker to store.
        worker: Worker,
    },
}

/// Actor that owns the redb [`Database`] and serves [`DBMessage`] requests
/// received over an mpsc channel.
pub struct DatabaseInstance {
    /// Receiving end of the request channel.
    receiver: mpsc::Receiver<DBMessage>,
    /// The database all requests are executed against.
    database: Database,
}

impl DatabaseInstance {
    fn new(receiver: mpsc::Receiver<DBMessage>, database: Database) -> DatabaseInstance {
        DatabaseInstance { receiver, database }
    }

    async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        while let Some(msg) = self.receiver.recv().await {
            match msg {
                DBMessage::ReadWorkers { respond_to } => {
                    // FIXME: handle errors, offload to a generic function
                    let read_txn = self.database.begin_read().unwrap();
                    let table = match read_txn.open_table(WORKER_TABLE) {
                        Ok(table) => Some(table),
                        Err(redb::TableError::TableDoesNotExist(table)) => {
                            warn!(message="Table does not exist, assuming empty", %table);
                            None
                        }
                        Err(_) => {
                            // FIXME: handle
                            None
                        }
                    };

                    let mut workers = vec![];
                    if let Some(table) = table {
                        for a in table.iter().unwrap() {
                            workers.push(a.unwrap().1.value());
                        }
                    }

                    let _ = respond_to.send(workers);
                }
                DBMessage::WriteWorker { respond_to, worker } => {
                    let write_txn = self.database.begin_write().unwrap();
                    {
                        let mut table = write_txn.open_table(WORKER_TABLE).unwrap();
                        table.insert(worker.id, worker)?;
                    }
                    write_txn.commit()?;

                    let _ = respond_to.send(true);
                }
            }
        }

        Ok(())
    }
}

/// Cloneable handle used to send requests to the database actor.
#[derive(Clone)]
pub struct DatabaseHandle {
    /// Sending end of the channel on which the actor receives [`DBMessage`]s.
    sender: mpsc::Sender<DBMessage>,
}

impl DatabaseHandle {
    pub fn new(database_file: &Path) -> Result<DatabaseHandle, Box<dyn std::error::Error>> {
        let database = match database_file.exists() {
            true => Database::open(database_file)?,
            false => Database::create(database_file)?,
        };

        let (sender, receiver) = mpsc::channel(64);
        let mut actor = DatabaseInstance::new(receiver, database);
        tokio::spawn(async move {
            let _ = actor.run().await;
        });

        Ok(DatabaseHandle { sender })
    }

    pub async fn read_workers(&self) -> Vec<Worker> {
        let (tx, rx) = oneshot::channel();
        let msg = DBMessage::ReadWorkers { respond_to: tx };

        let _ = self.sender.send(msg).await;
        rx.await.expect("actor task has been killed") // FIXME: handle
    }

    pub async fn write_worker(&self, worker: Worker) -> bool {
        let (tx, rx) = oneshot::channel();
        let msg = DBMessage::WriteWorker {
            respond_to: tx,
            worker,
        };

        let _ = self.sender.send(msg).await;
        rx.await.expect("actor task has been killed") // FIXME: handle
    }
}