1
use std::sync::atomic::AtomicBool;
2
use std::sync::atomic::Ordering;
3
use std::sync::Arc;
4
use std::sync::Condvar;
5
use std::sync::Mutex;
6
use std::thread::Builder;
7
use std::thread::JoinHandle;
8

            
9
/// A thread that can be paused and stopped.
///
/// The worker repeatedly calls the closure passed to [`PauseableThread::new`]
/// until [`PauseableThread::stop`] is called or the handle is dropped.
/// Dropping the handle stops the worker and joins it.
pub struct PauseableThread {
    /// Join handle of the worker; taken (left as `None`) once joined in `drop`.
    handle: Option<JoinHandle<()>>,
    /// State shared between this handle and the worker thread.
    shared: Arc<PauseableThreadShared>,
}

/// State shared between a [`PauseableThread`] handle and its worker thread.
struct PauseableThreadShared {
    /// Set to `false` by `stop` to make the worker leave its loop.
    running: AtomicBool,
    /// `true` while the worker should block instead of calling the loop function.
    paused: Mutex<bool>,
    /// Signalled after `paused` is cleared so a blocked worker re-checks its state.
    cond_var: Condvar,
}

impl PauseableThreadShared {
    /// Locks the pause flag, recovering the guard if the mutex was poisoned.
    ///
    /// The guarded value is a plain `bool`, so there is no partially updated
    /// state to protect against; tolerating poisoning keeps `stop` (and
    /// therefore `drop`) from panicking.
    fn lock_paused(&self) -> std::sync::MutexGuard<'_, bool> {
        self.paused
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}

impl PauseableThread {
    /// Spawns a new thread named `name` that runs `loop_function` continuously
    /// while enabled.
    ///
    /// The `loop_function` can return `false` to pause the thread; it stays
    /// paused until [`resume`](Self::resume) or [`stop`](Self::stop) is called.
    ///
    /// # Errors
    ///
    /// Returns the error reported by [`Builder::spawn`] when the thread cannot
    /// be created.
    pub fn new<F>(name: &str, loop_function: F) -> Result<PauseableThread, std::io::Error>
    where
        F: Fn() -> bool + Send + 'static,
    {
        let shared = Arc::new(PauseableThreadShared {
            running: AtomicBool::new(true),
            paused: Mutex::new(false),
            cond_var: Condvar::new(),
        });

        let thread = {
            let shared = Arc::clone(&shared);
            Builder::new().name(name.to_string()).spawn(move || loop {
                // Block while paused. The wait also ends once `running` is
                // cleared, so a stop request can never leave the worker parked.
                {
                    let mut paused = shared.lock_paused();
                    while *paused && shared.running.load(Ordering::SeqCst) {
                        paused = shared
                            .cond_var
                            .wait(paused)
                            .unwrap_or_else(std::sync::PoisonError::into_inner);
                    }
                }

                // Re-check after a possible wait: a worker woken by `stop`
                // must exit without calling the loop function again.
                if !shared.running.load(Ordering::SeqCst) {
                    break;
                }

                if !loop_function() {
                    // Pause the thread when requested by the loop function.
                    *shared.lock_paused() = true;
                }
            })
        }?;

        Ok(PauseableThread {
            handle: Some(thread),
            shared,
        })
    }

    /// Signal the thread to quit, will be joined when it is dropped.
    ///
    /// A paused worker is woken up so that it can observe the request. Calling
    /// this more than once is harmless.
    pub fn stop(&self) {
        self.shared.running.store(false, Ordering::SeqCst);
        // Clearing the pause flag under the lock and notifying afterwards
        // guarantees a blocked worker wakes up and sees `running == false`.
        self.resume();
    }

    /// Pause the thread on the next iteration.
    ///
    /// A call to the loop function that is already in progress is not
    /// interrupted; the worker blocks before starting the next one.
    pub fn pause(&self) {
        // No notification needed: setting the flag can never release a waiter.
        *self.shared.lock_paused() = true;
    }

    /// Resume the thread.
    pub fn resume(&self) {
        *self.shared.lock_paused() = false;
        // We notify the condvar that the value has changed.
        self.shared.cond_var.notify_one();
    }
}

impl Drop for PauseableThread {
    /// Stops the worker and waits for it to finish.
    ///
    /// If the worker panicked, the panic is propagated to the dropping thread,
    /// unless that thread is already unwinding (a second panic would abort the
    /// process), in which case the worker's panic is discarded.
    fn drop(&mut self) {
        self.stop();

        // Joining consumes the handle
        if let Some(handle) = self.handle.take() {
            if let Err(payload) = handle.join() {
                if !std::thread::panicking() {
                    std::panic::resume_unwind(payload);
                }
            }
        }
    }
}
92

            
93
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: a worker whose loop function never asks to pause can be
    /// spawned and told to stop. The handle is dropped at the end of the test.
    #[test]
    fn test_pausablethread() {
        let spawn_result = PauseableThread::new("test", || true);
        let worker = spawn_result.unwrap();

        worker.stop();
    }
}