Skip to content

Commit cb1bc9b

Browse files
committed
fix purescript-contrib#14: add windowPut--i.e. put with strategy based on num pending puts
1 parent 0452014 commit cb1bc9b

File tree

3 files changed

+247
-4
lines changed

3 files changed

+247
-4
lines changed

src/Effect/AVar.js

+57-1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,24 @@ var AVar = function () {
5757
return cell;
5858
}
5959

60+
function putHead (queue, value) {
61+
var cell = new MutableCell(queue, value);
62+
switch (queue.size) {
63+
case 0:
64+
queue.head = cell;
65+
break;
66+
case 1:
67+
queue.last = queue.head;
68+
/* fallthrough */
69+
default:
70+
cell.next = queue.head;
71+
queue.head.prev = cell;
72+
queue.head = cell;
73+
}
74+
queue.size++;
75+
return cell;
76+
}
77+
6078
function takeLast (queue) {
6179
var cell;
6280
switch (queue.size) {
@@ -221,6 +239,10 @@ exports._newVar = function (value) {
221239
};
222240
};
223241

242+
exports._pendingPuts = function (avar) {
243+
return avar.puts.size;
244+
};
245+
224246
exports._killVar = function (util, error, avar) {
225247
return function () {
226248
if (avar.error === null) {
@@ -231,7 +253,31 @@ exports._killVar = function (util, error, avar) {
231253
};
232254
};
233255

234-
exports._putVar = function (util, value, avar, cb) {
256+
exports._initPuts = function (util, error, avar) {
257+
return function () {
258+
var p = AVar.takeLast(avar.puts);
259+
if (p !== null) {
260+
p.cb(util.left(error))();
261+
return util.just(p.value);
262+
} else {
263+
return util.nothing;
264+
}
265+
};
266+
};
267+
268+
exports._tailPuts = function (util, error, avar) {
269+
return function () {
270+
var p = AVar.takeHead(avar.puts);
271+
if (p !== null) {
272+
p.cb(util.left(error))();
273+
return util.just(p.value);
274+
} else {
275+
return util.nothing;
276+
}
277+
};
278+
};
279+
280+
exports._snocVar = function (util, value, avar, cb) {
235281
return function () {
236282
var cell = AVar.putLast(avar.puts, { cb: cb, value: value });
237283
AVar.drainVar(util, avar);
@@ -241,6 +287,16 @@ exports._putVar = function (util, value, avar, cb) {
241287
};
242288
};
243289

290+
exports._consVar = function (util, value, avar, cb) {
291+
return function () {
292+
var cell = AVar.putHead(avar.puts, { cb: cb, value: value });
293+
AVar.drainVar(util, avar);
294+
return function () {
295+
AVar.deleteCell(cell);
296+
};
297+
};
298+
};
299+
244300
exports._takeVar = function (util, avar, cb) {
245301
return function () {
246302
var cell = AVar.putLast(avar.takes, cb);

src/Effect/AVar.purs

+63-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ module Effect.AVar
77
, take
88
, tryTake
99
, put
10+
, Operation(..)
11+
, windowPut
1012
, tryPut
1113
, read
1214
, tryRead
@@ -33,6 +35,17 @@ data AVarStatus a
3335
| Filled a
3436
| Empty
3537

38+
data Operation
39+
= Ignore -- Do nothing with the queue
40+
| Fail Error -- Propagate an exception to the callback
41+
| Halt Error -- Kill the internal queue and propagate the exception
42+
| PushHead -- Push callback onto the head
43+
| PushTail -- Push callback onto the tail
44+
| DropHead Error -- Drop the head, and push onto the tail
45+
| DropTail Error -- Drop the tail, and push onto the head
46+
| SwapHead Error -- Replace the head
47+
| SwapTail Error -- Replace the tail
48+
3649
-- | Creates a new empty AVar.
3750
foreign import empty a. Effect (AVar a)
3851

@@ -50,7 +63,51 @@ kill err avar = Fn.runFn3 _killVar ffiUtil err avar
5063
-- | the AVar becomes available. Returns an effect which will remove the
5164
-- | callback from the pending queue.
5265
put a. a AVar a AVarCallback Unit Effect (Effect Unit)
53-
put value avar cb = Fn.runFn4 _putVar ffiUtil value avar cb
66+
put = windowPut (const PushTail)
67+
68+
-- | Puts a value into an AVar using a strategy determined
69+
-- | dynamically on the basis of the number of pending puts
70+
-- | (i.e. not including the current value of the AVar, if non-EMPTY).
71+
windowPut
72+
:: a
73+
. (Int -> Operation)
74+
-> a
75+
-> AVar a
76+
-> AVarCallback Unit
77+
-> Effect (Effect Unit)
78+
windowPut strategy value avar cb =
79+
case strategy (_pendingPuts avar) of
80+
-- Do nothing with the queue
81+
Ignore -> pure $ pure unit
82+
-- Propagate an exception to the callback
83+
Fail e -> do
84+
cb $ Left e
85+
pure $ pure unit
86+
-- Kill the internal queue and propagate the exception
87+
Halt e -> do
88+
kill e avar
89+
cb $ Left e
90+
pure $ pure unit
91+
-- Push callback onto the head
92+
PushHead -> Fn.runFn4 _consVar ffiUtil value avar cb
93+
-- Push callback onto the tail
94+
PushTail -> Fn.runFn4 _snocVar ffiUtil value avar cb
95+
-- Drop the head, and push onto the tail
96+
DropHead e -> do
97+
void $ Fn.runFn3 _tailPuts ffiUtil e avar
98+
Fn.runFn4 _snocVar ffiUtil value avar cb
99+
-- Drop the tail, and push onto the head
100+
DropTail e -> do
101+
void $ Fn.runFn3 _initPuts ffiUtil e avar
102+
Fn.runFn4 _consVar ffiUtil value avar cb
103+
-- Replace the head
104+
SwapHead e -> do
105+
void $ Fn.runFn3 _tailPuts ffiUtil e avar
106+
Fn.runFn4 _consVar ffiUtil value avar cb
107+
-- Replace the tail
108+
SwapTail e -> do
109+
void $ Fn.runFn3 _initPuts ffiUtil e avar
110+
Fn.runFn4 _snocVar ffiUtil value avar cb
54111

55112
-- | Attempts to synchronously fill an AVar. If the AVar is already filled,
56113
-- | this will do nothing. Returns true or false depending on if it succeeded.
@@ -101,13 +158,17 @@ isKilled = case _ of
101158

102159
foreign import _newVar a. a Effect (AVar a)
103160
foreign import _killVar a. Fn.Fn3 FFIUtil Error (AVar a) (Effect Unit)
104-
foreign import _putVar a. Fn.Fn4 FFIUtil a (AVar a) (AVarCallback Unit) (Effect (Effect Unit))
161+
foreign import _snocVar a. Fn.Fn4 FFIUtil a (AVar a) (AVarCallback Unit) (Effect (Effect Unit))
162+
foreign import _consVar a. Fn.Fn4 FFIUtil a (AVar a) (AVarCallback Unit) (Effect (Effect Unit))
163+
foreign import _initPuts a. Fn.Fn3 FFIUtil Error (AVar a) (Effect (Maybe a))
164+
foreign import _tailPuts a. Fn.Fn3 FFIUtil Error (AVar a) (Effect (Maybe a))
105165
foreign import _tryPutVar a. Fn.Fn3 FFIUtil a (AVar a) (Effect Boolean)
106166
foreign import _takeVar a. Fn.Fn3 FFIUtil (AVar a) (AVarCallback a) (Effect (Effect Unit))
107167
foreign import _tryTakeVar a. Fn.Fn2 FFIUtil (AVar a) (Effect (Maybe a))
108168
foreign import _readVar a. Fn.Fn3 FFIUtil (AVar a) (AVarCallback a) (Effect (Effect Unit))
109169
foreign import _tryReadVar a. Fn.Fn2 FFIUtil (AVar a) (Effect (Maybe a))
110170
foreign import _status a. Fn.Fn2 FFIUtil (AVar a) (Effect (AVarStatus a))
171+
foreign import _pendingPuts :: a. AVar a -> Int
111172

112173
type FFIUtil =
113174
{ left a b. a Either a b

test/Main.purs

+127-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import Prelude
44
import Effect (Effect)
55
import Effect.AVar as AVar
66
import Effect.Console (log)
7-
import Effect.Exception (error, message)
7+
import Effect.Exception (Error, error, message)
88
import Effect.Ref as Ref
99
import Data.Either (Either(..))
1010
import Data.Foldable (traverse_)
@@ -189,6 +189,126 @@ test_cancel = test "cancel" do
189189
_ ← AVar.tryPut "a" v3
190190
eq "cdfg" <$> Ref.read ref
191191

192+
putMax
193+
:: a
194+
. Int
195+
-> Error
196+
-> a
197+
-> AVar.AVar a
198+
-> AVar.AVarCallback Unit
199+
-> Effect (Effect Unit)
200+
putMax max err = AVar.windowPut go
201+
where
202+
go n
203+
| n < max = AVar.PushTail
204+
| otherwise = AVar.Fail err
205+
206+
test_max1_put_take Effect Unit
207+
test_max1_put_take = test "max1: put/take" do
208+
ref ← Ref.new ""
209+
var ← AVar.empty
210+
let
211+
err =
212+
error "max puts exceeded"
213+
_ ← putMax 1 err "foo" var $ traverse_ \_ →
214+
void $ Ref.modify (_ <> "bar") ref
215+
_ ← AVar.take var $ traverse_ \val →
216+
void $ Ref.modify (_ <> val) ref
217+
eq "barfoo" <$> Ref.read ref
218+
219+
test_max2_put_take Effect Unit
220+
test_max2_put_take = test "max2: put/take" do
221+
ref ← Ref.new ""
222+
var ← AVar.empty
223+
let
224+
err =
225+
error "max puts exceeded"
226+
_ ← putMax 2 err "foo" var $ traverse_ \_ →
227+
void $ Ref.modify (_ <> "bar") ref
228+
_ ← AVar.take var $ traverse_ \val →
229+
void $ Ref.modify (_ <> val) ref
230+
eq "barfoo" <$> Ref.read ref
231+
232+
test_max1_put_put_put_take Effect Unit
233+
test_max1_put_put_put_take = test "max1: put/put/put/take" do
234+
ref ← Ref.new ""
235+
var ← AVar.empty
236+
let
237+
err =
238+
error "max puts exceeded"
239+
_ ← putMax 1 err "foo" var $ traverse_ \_ →
240+
void $ Ref.modify (_ <> "bar") ref
241+
_ ← putMax 1 err "foo" var $ traverse_ \_ →
242+
void $ Ref.modify (_ <> "bar") ref
243+
_ ← putMax 1 err "foo" var $
244+
case _ of
245+
Left _ -> void $ Ref.modify (_ <> "fail") ref
246+
otherwise -> void $ Ref.modify (_ <> "bar") ref
247+
_ ← AVar.take var $ traverse_ \val →
248+
void $ Ref.modify (_ <> val) ref
249+
eq "barfailfoobar" <$> Ref.read ref
250+
251+
putSliding
252+
:: a
253+
. Int
254+
-> Error
255+
-> a
256+
-> AVar.AVar a
257+
-> AVar.AVarCallback Unit
258+
-> Effect (Effect Unit)
259+
putSliding max err = AVar.windowPut go
260+
where
261+
go n
262+
| n < max = AVar.PushTail
263+
| otherwise = AVar.DropHead err
264+
265+
test_window1_put_take Effect Unit
266+
test_window1_put_take = test "win1: put/take" do
267+
ref ← Ref.new ""
268+
var ← AVar.empty
269+
let
270+
err =
271+
error "sliding window exceeded"
272+
_ ← putSliding 1 err "foo" var $ traverse_ \_ →
273+
void $ Ref.modify (_ <> "bar") ref
274+
_ ← AVar.take var $ traverse_ \val →
275+
void $ Ref.modify (_ <> val) ref
276+
eq "barfoo" <$> Ref.read ref
277+
278+
test_window2_put_take Effect Unit
279+
test_window2_put_take = test "win2: put/take" do
280+
ref ← Ref.new ""
281+
var ← AVar.empty
282+
let
283+
err =
284+
error "sliding window exceeded"
285+
_ ← putSliding 2 err "foo" var $ traverse_ \_ →
286+
void $ Ref.modify (_ <> "bar") ref
287+
_ ← AVar.take var $ traverse_ \val →
288+
void $ Ref.modify (_ <> val) ref
289+
eq "barfoo" <$> Ref.read ref
290+
291+
test_window1_put_put_put_take_take Effect Unit
292+
test_window1_put_put_put_take_take = test "win1: put/put/put/take/take" do
293+
ref ← Ref.new ""
294+
var ← AVar.empty
295+
let
296+
err =
297+
error "sliding window exceeded"
298+
_ ← putSliding 1 err "foo1" var $ traverse_ \_ →
299+
void $ Ref.modify (_ <> "bar1") ref
300+
_ ← putSliding 1 err "foo2" var $
301+
case _ of
302+
Left _ -> void $ Ref.modify (_ <> "fail") ref
303+
otherwise -> void $ Ref.modify (_ <> "bar2") ref
304+
_ ← putSliding 1 err "foo3" var $ traverse_ \_ →
305+
void $ Ref.modify (_ <> "bar3") ref
306+
_ ← AVar.take var $ traverse_ \val →
307+
void $ Ref.modify (_ <> val) ref
308+
_ ← AVar.take var $ traverse_ \val →
309+
void $ Ref.modify (_ <> val) ref
310+
eq "bar1failfoo1bar3foo3" <$> Ref.read ref
311+
192312
main Effect Unit
193313
main = do
194314
test_tryRead_full
@@ -206,3 +326,9 @@ main = do
206326
test_kill_empty
207327
test_kill_pending
208328
test_cancel
329+
test_max1_put_take
330+
test_max2_put_take
331+
test_max1_put_put_put_take
332+
test_window1_put_take
333+
test_window2_put_take
334+
test_window1_put_put_put_take_take

0 commit comments

Comments
 (0)