1
- use std:: {
2
- sync:: atomic:: { AtomicBool , Ordering } ,
3
- thread:: ThreadId ,
4
- } ;
1
+ use std:: thread:: ThreadId ;
5
2
6
- use parking_lot:: RwLock ;
3
+ use parking_lot:: Mutex ;
7
4
8
5
use crate :: {
9
6
key:: DatabaseKeyIndex ,
10
7
runtime:: WaitResult ,
11
8
zalsa:: { MemoIngredientIndex , Zalsa } ,
12
- zalsa_local:: ZalsaLocal ,
13
9
Database ,
14
10
} ;
15
11
@@ -19,36 +15,36 @@ use super::util;
19
15
/// worker threads.
20
16
#[ derive( Default ) ]
21
17
pub ( crate ) struct SyncTable {
22
- syncs : RwLock < Vec < Option < SyncState > > > ,
18
+ syncs : Mutex < Vec < Option < SyncState > > > ,
23
19
}
24
20
25
21
struct SyncState {
26
22
id : ThreadId ,
27
23
28
24
/// Set to true if any other queries are blocked,
29
25
/// waiting for this query to complete.
30
- anyone_waiting : AtomicBool ,
26
+ anyone_waiting : bool ,
31
27
}
32
28
33
29
impl SyncTable {
30
+ #[ inline]
34
31
pub ( crate ) fn claim < ' me > (
35
32
& ' me self ,
36
- db : & ' me dyn Database ,
33
+ db : & ' me ( impl ? Sized + Database ) ,
37
34
zalsa : & ' me Zalsa ,
38
- zalsa_local : & ZalsaLocal ,
39
35
database_key_index : DatabaseKeyIndex ,
40
36
memo_ingredient_index : MemoIngredientIndex ,
41
37
) -> Option < ClaimGuard < ' me > > {
42
- let mut syncs = self . syncs . write ( ) ;
38
+ let mut syncs = self . syncs . lock ( ) ;
43
39
let thread_id = std:: thread:: current ( ) . id ( ) ;
44
40
45
41
util:: ensure_vec_len ( & mut syncs, memo_ingredient_index. as_usize ( ) + 1 ) ;
46
42
47
- match & syncs[ memo_ingredient_index. as_usize ( ) ] {
43
+ match & mut syncs[ memo_ingredient_index. as_usize ( ) ] {
48
44
None => {
49
45
syncs[ memo_ingredient_index. as_usize ( ) ] = Some ( SyncState {
50
46
id : thread_id,
51
- anyone_waiting : AtomicBool :: new ( false ) ,
47
+ anyone_waiting : false ,
52
48
} ) ;
53
49
Some ( ClaimGuard {
54
50
database_key_index,
@@ -61,16 +57,10 @@ impl SyncTable {
61
57
id : other_id,
62
58
anyone_waiting,
63
59
} ) => {
64
- // NB: `Ordering::Relaxed` is sufficient here,
65
- // as there are no loads that are "gated" on this
66
- // value. Everything that is written is also protected
67
- // by a lock that must be acquired. The role of this
68
- // boolean is to decide *whether* to acquire the lock,
69
- // not to gate future atomic reads.
70
- anyone_waiting. store ( true , Ordering :: Relaxed ) ;
60
+ * anyone_waiting = true ;
71
61
zalsa. runtime ( ) . block_on_or_unwind (
72
- db,
73
- zalsa_local,
62
+ db. as_dyn_database ( ) ,
63
+ db . zalsa_local ( ) ,
74
64
database_key_index,
75
65
* other_id,
76
66
syncs,
@@ -92,30 +82,28 @@ pub(crate) struct ClaimGuard<'me> {
92
82
}
93
83
94
84
impl ClaimGuard < ' _ > {
95
- fn remove_from_map_and_unblock_queries ( & self , wait_result : WaitResult ) {
96
- let mut syncs = self . sync_table . syncs . write ( ) ;
85
+ fn remove_from_map_and_unblock_queries ( & self ) {
86
+ let mut syncs = self . sync_table . syncs . lock ( ) ;
97
87
98
88
let SyncState { anyone_waiting, .. } =
99
89
syncs[ self . memo_ingredient_index . as_usize ( ) ] . take ( ) . unwrap ( ) ;
100
90
101
- // NB: `Ordering::Relaxed` is sufficient here,
102
- // see `store` above for explanation.
103
- if anyone_waiting. load ( Ordering :: Relaxed ) {
104
- self . zalsa
105
- . runtime ( )
106
- . unblock_queries_blocked_on ( self . database_key_index , wait_result)
91
+ if anyone_waiting {
92
+ self . zalsa . runtime ( ) . unblock_queries_blocked_on (
93
+ self . database_key_index ,
94
+ if std:: thread:: panicking ( ) {
95
+ WaitResult :: Panicked
96
+ } else {
97
+ WaitResult :: Completed
98
+ } ,
99
+ )
107
100
}
108
101
}
109
102
}
110
103
111
104
impl Drop for ClaimGuard < ' _ > {
112
105
fn drop ( & mut self ) {
113
- let wait_result = if std:: thread:: panicking ( ) {
114
- WaitResult :: Panicked
115
- } else {
116
- WaitResult :: Completed
117
- } ;
118
- self . remove_from_map_and_unblock_queries ( wait_result)
106
+ self . remove_from_map_and_unblock_queries ( )
119
107
}
120
108
}
121
109
0 commit comments