Skip to content

Commit 88033f1

Browse files
committed
Don't call Instant::now() on each iteration
1 parent c3d5bad commit 88033f1

File tree

5 files changed

+40
-25
lines changed

5 files changed

+40
-25
lines changed

core/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ byteorder = "1.4.3"
1616
chrono = "0.4"
1717
colored = "2"
1818
cpu-time = "1.0.0"
19-
crossbeam-channel = "0.5.8"
2019
csv = "1.2.1"
2120
ctrlc = { version = "3.2.5", features = ["termination"] }
2221
dns-parser = { git = "https://github.com/stanford-esrg/dns-parser" }

core/src/conntrack/mod.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,13 @@ where
177177
}
178178

179179
/// Checks for and removes inactive connections.
180-
pub(crate) fn check_inactive(&mut self, subscription: &Subscription<T::Subscribed>) {
180+
pub(crate) fn check_inactive(
181+
&mut self,
182+
subscription: &Subscription<T::Subscribed>,
183+
now: Instant,
184+
) {
181185
self.timerwheel
182-
.check_inactive(&mut self.table, subscription);
186+
.check_inactive(&mut self.table, subscription, now);
183187
}
184188
}
185189

core/src/conntrack/timerwheel.rs

+13-11
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,19 @@
11
use crate::conntrack::{Conn, ConnId};
22
use crate::subscription::{Subscription, Trackable};
33

4-
use crossbeam_channel::{tick, Receiver};
54
use hashlink::linked_hash_map::LinkedHashMap;
65
use hashlink::linked_hash_map::RawEntryMut;
76
use std::collections::VecDeque;
87
use std::time::{Duration, Instant};
98

109
/// Tracks inactive connection expiration.
1110
pub(super) struct TimerWheel {
12-
/// Period to check for inactive connections (in milliseconds).
13-
period: usize,
11+
/// Period to check for inactive connections.
12+
period: Duration,
1413
/// Start time of the `TimerWheel`.
1514
start_ts: Instant,
16-
/// Timeout ticker, fires every `period` milliseconds.
17-
ticker: Receiver<Instant>,
15+
/// Previous check time of the `TimerWheel`.
16+
prev_ts: Instant,
1817
/// Index of the next bucket to expire.
1918
next_bucket: usize,
2019
/// List of timers.
@@ -29,11 +28,11 @@ impl TimerWheel {
2928
panic!("Timeout check period must be smaller than maximum inactivity timeout")
3029
}
3130
let start_ts = Instant::now();
32-
let ticker = tick(Duration::from_millis(timeout_resolution as u64));
31+
let period = Duration::from_millis(timeout_resolution as u64);
3332
TimerWheel {
34-
period: timeout_resolution,
33+
period,
3534
start_ts,
36-
ticker,
35+
prev_ts: start_ts,
3736
next_bucket: 0,
3837
timers: vec![VecDeque::new(); max_timeout / timeout_resolution],
3938
}
@@ -48,7 +47,8 @@ impl TimerWheel {
4847
inactivity_window: usize,
4948
) {
5049
let current_time = (last_seen_ts - self.start_ts).as_millis() as usize;
51-
let timer_index = ((current_time + inactivity_window) / self.period) % self.timers.len();
50+
let period = self.period.as_millis() as usize;
51+
let timer_index = ((current_time + inactivity_window) / period) % self.timers.len();
5252
log::debug!("Inserting into index: {}, {:?}", timer_index, current_time);
5353
self.timers[timer_index].push_back(conn_id.to_owned());
5454
}
@@ -59,9 +59,11 @@ impl TimerWheel {
5959
&mut self,
6060
table: &mut LinkedHashMap<ConnId, Conn<T>>,
6161
subscription: &Subscription<T::Subscribed>,
62+
now: Instant,
6263
) {
6364
let table_len = table.len();
64-
if let Ok(now) = self.ticker.try_recv() {
65+
if now - self.prev_ts >= self.period {
66+
self.prev_ts = now;
6567
let nb_removed = self.remove_inactive(now, table, subscription);
6668
log::debug!(
6769
"expired: {} ({})",
@@ -83,7 +85,7 @@ impl TimerWheel {
8385
table: &mut LinkedHashMap<ConnId, Conn<T>>,
8486
subscription: &Subscription<T::Subscribed>,
8587
) -> usize {
86-
let period = self.period;
88+
let period = self.period.as_millis() as usize;
8789
let nb_buckets = self.timers.len();
8890
let mut not_expired: Vec<(usize, ConnId)> = vec![];
8991
let check_time = (now - self.start_ts).as_millis() as usize / period * period;

core/src/lcore/monitor.rs

+13-10
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use std::time::{Duration, Instant};
1414

1515
use anyhow::{bail, Result};
1616
use chrono::Local;
17-
use crossbeam_channel::{tick, Receiver};
1817
use csv::Writer;
1918
use serde::Serialize;
2019

@@ -81,7 +80,7 @@ impl Monitor {
8180
port_wtrs.insert(*port_id, wtr);
8281
}
8382
return Some(Logger {
84-
ticker: tick(Duration::from_millis(log_cfg.interval)),
83+
interval: Duration::from_millis(log_cfg.interval),
8584
path,
8685
port_wtrs,
8786
keywords: log_cfg.port_stats.clone(),
@@ -119,7 +118,12 @@ impl Monitor {
119118
let mut prev_rx = init_rx;
120119
let mut prev_ts = init_ts;
121120
let mut init = true;
122-
let mut ticker = tokio::time::interval(Duration::from_millis(1000));
121+
let mut display_ticker = tokio::time::interval(Duration::from_millis(1000));
122+
123+
let mut logger_ticker = self
124+
.logger
125+
.as_ref()
126+
.map(|logger| tokio::time::interval(logger.interval));
123127
// Add a small delay to allow workers to start polling for packets
124128
tokio::time::sleep(Duration::from_millis(1000)).await;
125129
while self.is_running.load(Ordering::Relaxed) {
@@ -130,7 +134,7 @@ impl Monitor {
130134
}
131135

132136
if let Some(display) = &self.display {
133-
ticker.tick().await;
137+
display_ticker.tick().await;
134138
let curr_ts = Instant::now();
135139
let delta = curr_ts - prev_ts;
136140
match AggRxStats::collect(&self.ports, &display.keywords) {
@@ -160,11 +164,10 @@ impl Monitor {
160164
}
161165

162166
if let Some(logger) = &mut self.logger {
163-
if logger.ticker.try_recv().is_ok() {
164-
match logger.log_stats(init_ts.elapsed()) {
165-
Ok(_) => (),
166-
Err(error) => log::error!("Monitor log error: {}", error),
167-
}
167+
logger_ticker.as_mut().unwrap().tick().await;
168+
match logger.log_stats(init_ts.elapsed()) {
169+
Ok(_) => (),
170+
Err(error) => log::error!("Monitor log error: {}", error),
168171
}
169172
}
170173
}
@@ -209,7 +212,7 @@ impl Display {
209212

210213
#[derive(Debug)]
211214
struct Logger {
212-
ticker: Receiver<Instant>,
215+
interval: Duration,
213216
path: PathBuf,
214217
port_wtrs: HashMap<PortId, Writer<std::fs::File>>,
215218
keywords: Vec<String>,

core/src/lcore/rx_core.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::subscription::*;
1212

1313
use std::sync::atomic::{AtomicBool, Ordering};
1414
use std::sync::Arc;
15+
use std::time::Instant;
1516

1617
use itertools::Itertools;
1718

@@ -95,12 +96,18 @@ where
9596
log::debug!("{:#?}", registry);
9697
let mut conn_table = ConnTracker::<S::Tracked>::new(config, registry, self.id);
9798

99+
let mut now = Instant::now();
100+
98101
while self.is_running.load(Ordering::Relaxed) {
99102
for rxqueue in self.rxqueues.iter() {
100103
let mbufs: Vec<Mbuf> = self.rx_burst(rxqueue, 32);
101104
if mbufs.is_empty() {
102105
IDLE_CYCLES.inc();
103106

107+
if IDLE_CYCLES.get() & 1023 == 512 {
108+
now = Instant::now();
109+
}
110+
104111
#[cfg(feature = "prometheus")]
105112
if IDLE_CYCLES.get() & 1023 == 0 && self.is_prometheus_enabled {
106113
crate::stats::update_thread_local_stats(self.id);
@@ -133,7 +140,7 @@ where
133140
}
134141
}
135142
}
136-
conn_table.check_inactive(&self.subscription);
143+
conn_table.check_inactive(&self.subscription, now);
137144
}
138145

139146
// // Deliver remaining data in table from unfinished connections

0 commit comments

Comments
 (0)