DEVELOPMENT ENVIRONMENT

~liljamo/emerwen

ref: 1b8eda0a49f5fa733c699116c16f51898f5f06b4 emerwen/emerwen-master/src/db.rs -rw-r--r-- 4.3 KiB
1b8eda0aJonni Liljamo feat: move emerwen-proto crate out of here 4 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/*
 * Copyright (C) 2024 Jonni Liljamo <jonni@liljamo.com>
 *
 * This file is licensed under GPL-3.0-or-later, see NOTICE and LICENSE for
 * more information.
 */

use std::path::Path;

use emerwen_proto::emerwen::shared::Worker;
use redb::{Database, ReadableTable, TableDefinition, TypeName, Value};
use tokio::sync::{mpsc, oneshot};
use tracing::warn;

const WORKER_TABLE: TableDefinition<u32, WorkerType> = TableDefinition::new("workers");

pub enum DBMessage {
    ReadWorkers {
        respond_to: oneshot::Sender<Vec<Worker>>,
    },
    WriteWorker {
        respond_to: oneshot::Sender<bool>,
        worker: Worker,
    },
}

pub struct DatabaseInstance {
    receiver: mpsc::Receiver<DBMessage>,
    database: Database,
}

impl DatabaseInstance {
    fn new(receiver: mpsc::Receiver<DBMessage>, database: Database) -> DatabaseInstance {
        DatabaseInstance { receiver, database }
    }

    async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        while let Some(msg) = self.receiver.recv().await {
            match msg {
                DBMessage::ReadWorkers { respond_to } => {
                    // FIXME: handle errors, offload to a generic function
                    let read_txn = self.database.begin_read().unwrap();
                    let table = match read_txn.open_table(WORKER_TABLE) {
                        Ok(table) => Some(table),
                        Err(redb::TableError::TableDoesNotExist(table)) => {
                            warn!(message="Table does not exist, assuming empty", %table);
                            None
                        }
                        Err(_) => {
                            // FIXME: handle
                            None
                        }
                    };

                    let mut workers = vec![];
                    if let Some(table) = table {
                        for a in table.iter().unwrap() {
                            workers.push(a.unwrap().1.value());
                        }
                    }

                    let _ = respond_to.send(workers);
                }
                DBMessage::WriteWorker { respond_to, worker } => {
                    let write_txn = self.database.begin_write().unwrap();
                    {
                        let mut table = write_txn.open_table(WORKER_TABLE).unwrap();
                        table.insert(worker.id, worker)?;
                    }
                    write_txn.commit()?;

                    let _ = respond_to.send(true);
                }
            }
        }

        Ok(())
    }
}

#[derive(Clone)]
pub struct DatabaseHandle {
    sender: mpsc::Sender<DBMessage>,
}

impl DatabaseHandle {
    pub fn new(database_file: &Path) -> Result<DatabaseHandle, Box<dyn std::error::Error>> {
        let database = match database_file.exists() {
            true => Database::open(database_file)?,
            false => Database::create(database_file)?,
        };

        let (sender, receiver) = mpsc::channel(64);
        let mut actor = DatabaseInstance::new(receiver, database);
        tokio::spawn(async move {
            let _ = actor.run().await;
        });

        Ok(DatabaseHandle { sender })
    }

    pub async fn read_workers(&self) -> Vec<Worker> {
        let (tx, rx) = oneshot::channel();
        let msg = DBMessage::ReadWorkers { respond_to: tx };

        let _ = self.sender.send(msg).await;
        rx.await.expect("actor task has been killed") // FIXME: handle
    }

    pub async fn write_worker(&self, worker: Worker) -> bool {
        let (tx, rx) = oneshot::channel();
        let msg = DBMessage::WriteWorker {
            respond_to: tx,
            worker,
        };

        let _ = self.sender.send(msg).await;
        rx.await.expect("actor task has been killed") // FIXME: handle
    }
}

#[derive(Debug)]
struct WorkerType;

impl Value for WorkerType {
    type SelfType<'a> = Worker;
    type AsBytes<'a> = Vec<u8>;

    fn fixed_width() -> Option<usize> {
        None
    }

    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        bincode::deserialize(data).unwrap()
    }

    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'a,
        Self: 'b,
    {
        bincode::serialize(value).unwrap()
    }

    fn type_name() -> TypeName {
        TypeName::new("emerwen_worker")
    }
}