-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathincrementer.rs
55 lines (50 loc) · 1.85 KB
/
incrementer.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
use std::sync::Arc;
use anyhow::Result;
use arbiter_core::{events::stream_event, middleware::ArbiterMiddleware};
use arbiter_engine::{
machine::{Behavior, ControlFlow, EventStream},
messager::Messager,
};
use tracing::{debug, info};
use super::*;
use crate::bindings::modified_counter::{IncrementedFilter, ModifiedCounter};
#[derive(Debug, Serialize, Deserialize)]
pub struct Incrementer {
#[serde(default)]
curr_number_of_times: u64,
max_number_of_times: u64,
#[serde(skip)]
counter: Option<ModifiedCounter<ArbiterMiddleware>>,
}
#[async_trait::async_trait]
impl Behavior<IncrementedFilter> for Incrementer {
async fn startup(
&mut self,
client: Arc<ArbiterMiddleware>,
_messager: Messager,
) -> Result<Option<EventStream<IncrementedFilter>>> {
debug!("Incrementer starting up");
let counter = ModifiedCounter::deploy(client.clone(), ())?.send().await?;
let stream = stream_event(counter.incremented_filter());
counter.increment().send().await?.await?;
self.curr_number_of_times += 1;
let curr_number = counter.number().call().await?;
debug!("Incremented to: {}", curr_number);
self.counter = Some(counter);
Ok(Some(stream))
}
async fn process(&mut self, _event: IncrementedFilter) -> Result<ControlFlow> {
debug!("Incrementer processing event");
let counter = self.counter.as_ref().unwrap();
if self.curr_number_of_times < self.max_number_of_times {
counter.increment().send().await?.await?;
self.curr_number_of_times += 1;
let curr_number = counter.number().call().await?;
debug!("Incremented to: {}", curr_number);
Ok(ControlFlow::Continue)
} else {
info!("Incrementer done");
return Ok(ControlFlow::Halt);
}
}
}