16
16
17
17
package io .apicurio .registry .utils ;
18
18
19
+ import org .slf4j .Logger ;
20
+ import org .slf4j .LoggerFactory ;
21
+
22
+ import java .time .Duration ;
23
+ import java .time .Instant ;
19
24
import java .util .Map ;
25
+ import java .util .Map .Entry ;
20
26
import java .util .concurrent .ConcurrentHashMap ;
27
+ import java .util .concurrent .atomic .AtomicBoolean ;
21
28
import java .util .function .Function ;
29
+ import java .util .stream .Collectors ;
22
30
23
31
/**
24
32
* @author Fabian Martinez
33
+ * @author Jakub Senko <m@jsenko.net>
25
34
*/
26
35
public class CheckPeriodCache <K , V > {
27
36
28
- private Map <K , CheckValue <V >> cache = new ConcurrentHashMap <>();
29
- private long checkPeriodMillis = 0 ;
37
+ private static final Logger log = LoggerFactory .getLogger (CheckPeriodCache .class );
38
+
39
+ private final ConcurrentHashMap <K , CheckValue <V >> cache = new ConcurrentHashMap <>();
40
+
41
+ private final Duration checkPeriod ;
42
+
43
+ private final AtomicBoolean evicting = new AtomicBoolean (false );
44
+ private final int evictionThreshold ;
45
+
46
+ public CheckPeriodCache (Duration checkPeriod , int evictionThreshold ) {
47
+ this .checkPeriod = checkPeriod ;
48
+ this .evictionThreshold = evictionThreshold ;
49
+ }
50
+
51
+ public CheckPeriodCache (Duration checkPeriod ) {
52
+ this .checkPeriod = checkPeriod ;
53
+ this .evictionThreshold = 1000 ;
54
+ }
30
55
31
- public CheckPeriodCache (long checkPeriodMillis ) {
32
- this .checkPeriodMillis = checkPeriodMillis ;
56
+ private boolean isExpired (CheckValue <?> checkedValue ) {
57
+ return Instant .now ().isAfter (checkedValue .lastUpdate .plus (checkPeriod ));
58
+ }
59
+
60
+ private void checkEviction () {
61
+ final int currentSize = cache .size ();
62
+ if (currentSize > evictionThreshold ) {
63
+ if (evicting .compareAndSet (false , true )) {
64
+ try {
65
+ // This thread gets to evict
66
+ log .debug ("Thread {} is evicting the cache. Current size is {}, threshold is {}." ,
67
+ Thread .currentThread ().getName (), currentSize , evictionThreshold );
68
+ var toEvict = cache .entrySet ().stream ()
69
+ .filter (entry -> isExpired (entry .getValue ()))
70
+ .map (Entry ::getKey )
71
+ .collect (Collectors .toList ());
72
+
73
+ log .debug ("Thread {} is evicting the cache. Found {} candidates for eviction." ,
74
+ Thread .currentThread ().getName (), toEvict .size ());
75
+ toEvict .forEach (k -> {
76
+ cache .compute (k , (key , value ) -> {
77
+ if (value == null || isExpired (value )) {
78
+ return null ;
79
+ } else {
80
+ return value ;
81
+ }
82
+ });
83
+ });
84
+
85
+ log .debug ("Thread {} has finished evicting the cache. The new size is {}." ,
86
+ Thread .currentThread ().getName (), cache .size ());
87
+ } finally {
88
+ evicting .set (false );
89
+ }
90
+ }
91
+ }
33
92
}
34
93
35
94
public V compute (K k , Function <K , V > remappingFunction ) {
95
+ checkEviction ();
36
96
CheckValue <V > returnValue = cache .compute (k , (key , checkedValue ) -> {
37
- long now = System . currentTimeMillis ();
38
- if ( checkedValue == null ) {
97
+ if ( checkedValue == null || isExpired ( checkedValue )) {
98
+ // Only execute if expired, but do it first in case an exception is thrown
39
99
V value = remappingFunction .apply (key );
40
- return new CheckValue <>(now , value );
100
+ return new CheckValue <>(Instant . now () , value );
41
101
} else {
42
- if (checkedValue .lastUpdate + checkPeriodMillis < now ) {
43
- V value = remappingFunction .apply (key );
44
- checkedValue .lastUpdate = now ;
45
- checkedValue .value = value ;
46
- }
47
102
return checkedValue ;
48
103
}
49
104
});
50
105
return returnValue .value ;
51
106
}
52
107
53
108
public void put (K k , V v ) {
54
- cache .put (k , new CheckValue <>(System .currentTimeMillis (), v ));
109
+ checkEviction ();
110
+ cache .put (k , new CheckValue <>(Instant .now (), v ));
55
111
}
56
112
57
113
public V get (K k ) {
58
114
CheckValue <V > value = cache .compute (k , (key , checkedValue ) -> {
59
- if (checkedValue == null ) {
115
+ if (checkedValue == null || isExpired ( checkedValue ) ) {
60
116
return null ;
61
117
} else {
62
- long now = System .currentTimeMillis ();
63
- if (checkedValue .lastUpdate + checkPeriodMillis < now ) {
64
- //value expired
65
- return null ;
66
- } else {
67
- return checkedValue ;
68
- }
118
+ return checkedValue ;
69
119
}
70
120
});
71
121
return value == null ? null : value .value ;
@@ -79,15 +129,25 @@ public void clear() {
79
129
cache .clear ();
80
130
}
81
131
82
- private static class CheckValue <V > {
132
+ /**
133
+ * USE FOR TEST ONLY
134
+ */
135
+ Map <K , CheckValue <V >> getInternal () {
136
+ return cache ;
137
+ }
138
+
139
+ int size () {
140
+ return cache .size ();
141
+ }
83
142
84
- CheckValue (long ts , V value ) {
85
- this .lastUpdate = ts ;
143
+ static class CheckValue <V > {
144
+
145
+ CheckValue (Instant lastUpdate , V value ) {
146
+ this .lastUpdate = lastUpdate ;
86
147
this .value = value ;
87
148
}
88
149
89
- long lastUpdate ;
150
+ Instant lastUpdate ;
90
151
V value ;
91
152
}
92
-
93
153
}
0 commit comments