1
use core::fmt;
2
use std::borrow::Borrow;
3
use std::cell::RefCell;
4
use std::mem::ManuallyDrop;
5
use std::sync::Arc;
6

            
7
use log::trace;
8

            
9
use mcrl2_sys::atermpp::ffi;
10
use mcrl2_sys::cxx::Exception;
11
use mcrl2_sys::cxx::UniquePtr;
12
use utilities::protection_set::ProtectionSet;
13

            
14
use crate::aterm::ATerm;
15
use crate::aterm::BfTermPoolThreadWrite;
16
use crate::aterm::Symbol;
17
use crate::data::BoolSort;
18
use crate::data::DataExpression;
19

            
20
use super::global_aterm_pool::mark_protection_sets;
21
use super::global_aterm_pool::protection_set_size;
22
use super::global_aterm_pool::ATermPtr;
23
use super::global_aterm_pool::SharedContainerProtectionSet;
24
use super::global_aterm_pool::SharedProtectionSet;
25
use super::global_aterm_pool::GLOBAL_TERM_POOL;
26
use super::ATermRef;
27
use super::Markable;
28
use super::SymbolRef;
29

            
30
/// The number of times before garbage collection is tested again.
31
const TEST_GC_INTERVAL: usize = 100;
32

            
33
thread_local! {
34
    /// This is the thread specific term pool that manages the protection sets.
35
    pub(crate) static THREAD_TERM_POOL: RefCell<ThreadTermPool> = RefCell::new(ThreadTermPool::new());
36
}
37

            
38
pub struct ThreadTermPool {
39
    protection_set: SharedProtectionSet,
40
    container_protection_set: SharedContainerProtectionSet,
41

            
42
    /// The index of the thread term pool in the list of thread pools.
43
    index: usize,
44

            
45
    /// Function symbols to represent 'DataAppl' with any number of arguments.
46
    data_appl: Vec<Symbol>,
47

            
48
    /// A counter used to periodically trigger a garbage collection test.
49
    /// This is a stupid hack, but we need to periodically test for garbage collection and this is only allowed outside of a shared
50
    /// lock section. Therefore, we count (arbitrarily) to reduce the amount of this is checked.
51
    gc_counter: usize,
52

            
53
    _callback: ManuallyDrop<UniquePtr<ffi::tls_callback_container>>,
54
}
55

            
56
/// Protects the given aterm address and returns the term.
57
///     - guard: An existing guard to the ThreadTermPool.protection_set.
58
///     - index: The index of the ThreadTermPool
59
302220841
fn protect_with(
60
302220841
    mut guard: BfTermPoolThreadWrite<'_, ProtectionSet<ATermPtr>>,
61
302220841
    gc_counter: &mut usize,
62
302220841
    index: usize,
63
302220841
    term: *const ffi::_aterm,
64
302220841
) -> ATerm {
65
302220841
    debug_assert!(!term.is_null(), "Can only protect valid terms");
66
302220841
    let aterm = ATermPtr::new(term);
67
302220841
    let root = guard.protect(aterm.clone());
68
302220841

            
69
302220841
    let term = ATermRef::new(term);
70
302220841
    trace!("Protected term {:?}, index {}, protection set {}", term, root, index,);
71

            
72
302220841
    let result = ATerm::new(term, root);
73
302220841

            
74
302220841
    // Test for garbage collection intermediately.
75
302220841
    *gc_counter = gc_counter.saturating_sub(1);
76
302220841

            
77
302220841
    if guard.unlock() && *gc_counter == 0 {
78
2888198
        ffi::test_garbage_collection();
79
2888198
        *gc_counter = TEST_GC_INTERVAL;
80
299332643
    }
81

            
82
302220841
    result
83
302220841
}
84

            
85
impl ThreadTermPool {
86
418
    pub fn new() -> ThreadTermPool {
87
418
        // Register a protection set into the global set.
88
418
        let (protection_set, container_protection_set, index) = GLOBAL_TERM_POOL.lock().register_thread_term_pool();
89
418

            
90
418
        ThreadTermPool {
91
418
            protection_set,
92
418
            container_protection_set,
93
418
            index,
94
418
            gc_counter: TEST_GC_INTERVAL,
95
418
            data_appl: vec![],
96
418
            _callback: ManuallyDrop::new(ffi::register_mark_callback(mark_protection_sets, protection_set_size)),
97
418
        }
98
418
    }
99

            
100
    /// Protects the given aterm address and returns the term.
101
251730217
    pub fn protect(&mut self, term: *const ffi::_aterm) -> ATerm {
102
251730217
        unsafe {
103
251730217
            protect_with(
104
251730217
                self.protection_set.write_exclusive(),
105
251730217
                &mut self.gc_counter,
106
251730217
                self.index,
107
251730217
                term,
108
251730217
            )
109
251730217
        }
110
251730217
    }
111

            
112
    /// Protects the given aterm address and returns the term.
113
3137545
    pub fn protect_container(&mut self, container: Arc<dyn Markable + Send + Sync>) -> usize {
114
3137545
        let root = unsafe { self.container_protection_set.write_exclusive().protect(container) };
115
3137545

            
116
3137545
        trace!("Protected container index {}, protection set {}", root, self.index,);
117

            
118
3137545
        root
119
3137545
    }
120

            
121
    /// Removes the [ATerm] from the protection set.
122
302220841
    pub fn drop(&mut self, term: &ATerm) {
123
302220841
        term.require_valid();
124
302220841

            
125
302220841
        unsafe {
126
302220841
            let mut protection_set = self.protection_set.write_exclusive();
127
302220841
            trace!(
128
                "Dropped term {:?}, index {}, protection set {}",
129
                term.term,
130
                term.root,
131
                self.index
132
            );
133
302220841
            protection_set.unprotect(term.root);
134
302220841
        }
135
302220841
    }
136

            
137
    /// Removes the container from the protection set.
138
3137545
    pub fn drop_container(&mut self, container_root: usize) {
139
3137545
        unsafe {
140
3137545
            let mut container_protection_set = self.container_protection_set.write_exclusive();
141
3137545
            trace!(
142
                "Dropped container index {}, protection set {}",
143
                container_root,
144
                self.index
145
            );
146
3137545
            container_protection_set.unprotect(container_root);
147
3137545
        }
148
3137545
    }
149

            
150
    /// Returns true iff the given term is a data application.
151
685791688
    pub fn is_data_application(&mut self, term: &ATermRef<'_>) -> bool {
152
685791688
        let symbol = term.get_head_symbol();
153
        // It can be that data_applications are created without create_data_application in the mcrl2 ffi.
154
685792448
        while self.data_appl.len() <= symbol.arity() {
155
760
            let symbol = Symbol::take(ffi::create_function_symbol(
156
760
                String::from("DataAppl"),
157
760
                self.data_appl.len(),
158
760
            ));
159
760
            self.data_appl.push(symbol);
160
760
        }
161

            
162
685791688
        symbol == self.data_appl[symbol.arity()].copy()
163
685791688
    }
164
}
165

            
166
impl Default for ThreadTermPool {
167
    fn default() -> Self {
168
        ThreadTermPool::new()
169
    }
170
}
171

            
172
impl Drop for ThreadTermPool {
173
418
    fn drop(&mut self) {
174
418
        debug_assert!(
175
418
            self.protection_set.read().len() == 0,
176
            "The protection set should be empty"
177
        );
178

            
179
418
        GLOBAL_TERM_POOL.lock().drop_thread_term_pool(self.index);
180
418

            
181
418
        #[cfg(not(target_os = "macos"))]
182
418
        unsafe {
183
418
            ManuallyDrop::drop(&mut self._callback);
184
418
        }
185
418
    }
186
}
187

            
188
/// This is the thread local term pool.
189
pub struct TermPool {
190
    arguments: Vec<*const ffi::_aterm>,
191
    true_term: DataExpression,
192
}
193

            
194
impl TermPool {
195
411
    pub fn new() -> TermPool {
196
411
        TermPool {
197
411
            arguments: vec![],
198
411
            true_term: BoolSort::true_term(),
199
411
        }
200
411
    }
201

            
202
    /// This does not necessarily belong here, but we need a central storage of default terms.
203
2450736
    pub fn true_term(&self) -> &DataExpression {
204
2450736
        &self.true_term
205
2450736
    }
206

            
207
    /// Trigger a garbage collection explicitly.
208
202
    pub fn collect(&mut self) {
209
202
        ffi::collect_garbage();
210
202
    }
211

            
212
    /// Creates an ATerm from a string.
213
437
    pub fn from_string(&mut self, text: &str) -> Result<ATerm, Exception> {
214
437
        match ffi::aterm_from_string(String::from(text)) {
215
437
            Ok(term) => Ok(term.into()),
216
            Err(exception) => Err(exception),
217
        }
218
437
    }
219

            
220
    /// Creates an [ATerm] with the given symbol and arguments.
221
11094661
    pub fn create<'a, 'b>(
222
11094661
        &mut self,
223
11094661
        symbol: &impl Borrow<SymbolRef<'a>>,
224
11094661
        arguments: &[impl Borrow<ATermRef<'b>>],
225
11094661
    ) -> ATerm {
226
11094661
        // Copy the arguments to make a slice.
227
11094661
        self.arguments.clear();
228
43196179
        for arg in arguments {
229
32101518
            unsafe {
230
32101518
                self.arguments.push(arg.borrow().get());
231
32101518
            }
232
        }
233

            
234
11094661
        debug_assert_eq!(
235
11094661
            symbol.borrow().arity(),
236
11094661
            self.arguments.len(),
237
            "Number of arguments does not match arity"
238
        );
239

            
240
11094661
        let result = THREAD_TERM_POOL.with_borrow_mut(|tp| {
241
11094661
            unsafe {
242
11094661
                // ThreadPool is not Sync, so only one has access.
243
11094661
                let protection_set = tp.protection_set.write_exclusive();
244
11094661
                let term: *const ffi::_aterm = ffi::create_aterm(symbol.borrow().address(), &self.arguments);
245
11094661
                protect_with(protection_set, &mut tp.gc_counter, tp.index, term)
246
11094661
            }
247
11094661
        });
248
11094661

            
249
11094661
        result
250
11094661
    }
251

            
252
    /// Creates an [ATerm] with the given symbol, head argument and other arguments.
253
30870447
    pub fn create_data_application<'a, 'b>(
254
30870447
        &mut self,
255
30870447
        head: &impl Borrow<ATermRef<'a>>,
256
30870447
        arguments: &[impl Borrow<ATermRef<'b>>],
257
30870447
    ) -> ATerm {
258
        // Make the temp vector sufficient length.
259
32039368
        while self.arguments.len() < arguments.len() {
260
1168921
            self.arguments.push(std::ptr::null());
261
1168921
        }
262

            
263
30870447
        self.arguments.clear();
264
30870447
        unsafe {
265
30870447
            self.arguments.push(head.borrow().get());
266
74195522
            for arg in arguments {
267
43325075
                self.arguments.push(arg.borrow().get());
268
43325075
            }
269
        }
270

            
271
30870447
        THREAD_TERM_POOL.with_borrow_mut(|tp| {
272
30871136
            while tp.data_appl.len() <= arguments.len() + 1 {
273
689
                let symbol = self.create_symbol("DataAppl", tp.data_appl.len());
274
689
                tp.data_appl.push(symbol);
275
689
            }
276

            
277
30870447
            let symbol = &tp.data_appl[arguments.len() + 1];
278
30870447

            
279
30870447
            debug_assert_eq!(
280
30870447
                symbol.arity(),
281
30870447
                self.arguments.len(),
282
                "Number of arguments does not match arity"
283
            );
284

            
285
30870447
            let result = unsafe {
286
30870447
                // ThreadPool is not Sync, so only one has access.
287
30870447
                let protection_set = tp.protection_set.write_exclusive();
288
30870447
                let term: *const ffi::_aterm = ffi::create_aterm(symbol.address(), &self.arguments);
289
30870447
                protect_with(protection_set, &mut tp.gc_counter, tp.index, term)
290
30870447
            };
291
30870447

            
292
30870447
            result
293
30870447
        })
294
30870447
    }
295

            
296
    /// Creates a function symbol with the given name and arity.
297
52056
    pub fn create_symbol(&mut self, name: &str, arity: usize) -> Symbol {
298
52056
        Symbol::take(ffi::create_function_symbol(String::from(name), arity))
299
52056
    }
300

            
301
    /// Creates a term with the FFI while taking care of the protection and garbage collection.
302
133210
    pub fn create_with<F>(&mut self, create: F) -> ATerm
303
133210
    where
304
133210
        F: Fn() -> *const ffi::_aterm,
305
133210
    {
306
133210
        let result = THREAD_TERM_POOL.with_borrow_mut(|tp| {
307
133210
            unsafe {
308
133210
                // ThreadPool is not Sync, so only one has access.
309
133210
                let protection_set = tp.protection_set.write_exclusive();
310
133210
                protect_with(protection_set, &mut tp.gc_counter, tp.index, create())
311
133210
            }
312
133210
        });
313
133210

            
314
133210
        result
315
133210
    }
316
}
317

            
318
impl Default for TermPool {
319
    fn default() -> Self {
320
        Self::new()
321
    }
322
}
323

            
324
impl fmt::Display for TermPool {
325
6
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
326
6
        // TODO: This will always print, but only depends on aterm_configuration.h
327
6
        ffi::print_metrics();
328
6

            
329
6
        write!(f, "{:?}", GLOBAL_TERM_POOL.lock())
330
6
    }
331
}
332

            
333
#[cfg(test)]
334
mod tests {
335
    use std::thread;
336

            
337
    use rand::rngs::StdRng;
338
    use rand::Rng;
339
    use rand::SeedableRng;
340
    use test_log::test;
341

            
342
    use crate::aterm::random_term;
343

            
344
    use super::*;
345

            
346
    /// Make sure that the term has the same number of arguments as its arity.
347
200
    fn verify_term(term: &ATermRef<'_>) {
348
2840
        for subterm in term.iter() {
349
2840
            assert_eq!(
350
2840
                subterm.get_head_symbol().arity(),
351
2840
                subterm.arguments().len(),
352
                "The arity matches the number of arguments."
353
            )
354
        }
355
200
    }
356

            
357
1
    #[test]
358
    fn test_thread_aterm_pool_parallel() {
359
        let seed: u64 = rand::rng().random();
360
        println!("seed: {}", seed);
361

            
362
1
        thread::scope(|s| {
363
3
            for _ in 0..2 {
364
2
                s.spawn(|| {
365
2
                    let mut tp = TermPool::new();
366
2

            
367
2
                    let mut rng = StdRng::seed_from_u64(seed);
368
2
                    let terms: Vec<ATerm> = (0..100)
369
200
                        .map(|_| {
370
200
                            random_term(
371
200
                                &mut tp,
372
200
                                &mut rng,
373
200
                                &[("f".to_string(), 2)],
374
200
                                &["a".to_string(), "b".to_string()],
375
200
                                10,
376
200
                            )
377
200
                        })
378
2
                        .collect();
379
2

            
380
2
                    tp.collect();
381

            
382
202
                    for term in &terms {
383
200
                        verify_term(term);
384
200
                    }
385
2
                });
386
2
            }
387
1
        });
388
    }
389
}