forked from stanford-esrg/retina
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmod.rs
157 lines (134 loc) · 5.21 KB
/
mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
use crate::conntrack::pdu::{L4Context, L4Pdu};
use crate::conntrack::ConnTracker;
use crate::filter::*;
use crate::lcore::CoreId;
use crate::memory::mbuf::Mbuf;
use crate::protocols::packet::tcp::TCP_PROTOCOL;
use crate::protocols::packet::udp::UDP_PROTOCOL;
use crate::protocols::stream::{ConnData, ParserRegistry, Session};
use crate::stats::{StatExt, TCP_BYTE, TCP_PKT, UDP_BYTE, UDP_PKT};
#[cfg(feature = "timing")]
use crate::timing::timer::Timers;
/// Links a subscribable type to its tracked counterpart.
///
/// The bound on `Tracked` requires that the chosen [`Trackable`] type names
/// `Self` as its `Subscribed` type, so the two traits always point back at
/// each other.
pub trait Subscribable {
/// The [`Trackable`] type paired with this subscribable type.
type Tracked: Trackable<Subscribed = Self>;
}
/// Per-connection tracked data paired with a [`Subscribable`] type.
///
/// The bound on `Subscribed` requires that the chosen `Subscribable` names
/// `Self` as its `Tracked` type, mirroring the bound on `Subscribable::Tracked`.
pub trait Trackable {
/// The [`Subscribable`] type paired with this tracked type.
type Subscribed: Subscribable<Tracked = Self>;
/// Create a new struct for tracking connection data for user delivery.
///
/// `first_pkt` is presumably the first PDU seen on the connection and
/// `core_id` the core handling it (see [`Trackable::core_id`]) — confirm
/// against the caller.
fn new(first_pkt: &L4Pdu, core_id: CoreId) -> Self;
/// When tracking, parsing, or buffering frames,
/// update tracked data with a new PDU.
///
/// NOTE(review): `reassembled` presumably tells the implementor whether
/// `pdu` has already gone through stream reassembly — confirm.
fn update(&mut self, pdu: &L4Pdu, reassembled: bool);
/// Get a reference to all sessions that matched filter(s) in the connection.
fn sessions(&self) -> &Vec<Session>;
/// Store a session that matched.
fn track_session(&mut self, session: Session);
/// Store packets for (possible) future delivery.
///
/// NOTE(review): how `actions` and `reassembled` influence buffering is up
/// to the implementor; nothing in this trait fixes it.
fn buffer_packet(&mut self, pdu: &L4Pdu, actions: &Actions, reassembled: bool);
/// Get a reference to stored packets (those buffered for delivery).
fn packets(&self) -> &Vec<Mbuf>;
/// Drain data from all types that require storing packets.
/// Can help free mbufs for future use.
fn drain_tracked_packets(&mut self);
/// Drain data from packets cached for future potential delivery.
/// Used after these packets have been delivered or when the associated
/// subscription fails to match.
fn drain_cached_packets(&mut self);
/// Return the core ID that this tracked connection is on.
fn core_id(&self) -> &CoreId;
/// Parsers needed by all datatypes.
/// Parsers needed by the filter are generated on program startup.
fn parsers() -> ParserRegistry;
/// Clear all internal data.
fn clear(&mut self);
}
/// The set of filter and delivery functions used for a subscribable type `S`.
///
/// All function fields are taken from a `FilterFactory<S::Tracked>` in
/// `Subscription::new`; each one is invoked by the method of the matching name
/// on this type.
pub struct Subscription<S>
where
S: Subscribable,
{
// Software packet filter, called by `continue_packet`.
packet_continue: PacketContFn,
// Five-tuple filter, called by `filter_packet`.
packet_filter: PacketFilterFn<S::Tracked>,
// Protocol filter, called by `filter_protocol`.
proto_filter: ProtoFilterFn<S::Tracked>,
// Application-layer session filter, called by `filter_session`.
session_filter: SessionFilterFn<S::Tracked>,
// Packet delivery function, called by `deliver_packet`.
packet_deliver: PacketDeliverFn<S::Tracked>,
// Connection delivery function, called by `deliver_conn`.
conn_deliver: ConnDeliverFn<S::Tracked>,
/// Timers, only compiled in when the `timing` feature is enabled.
#[cfg(feature = "timing")]
pub(crate) timers: Timers,
}
impl<S> Subscription<S>
where
S: Subscribable,
{
pub fn new(factory: FilterFactory<S::Tracked>) -> Self {
Subscription {
packet_continue: factory.packet_continue,
packet_filter: factory.packet_filter,
proto_filter: factory.proto_filter,
session_filter: factory.session_filter,
packet_deliver: factory.packet_deliver,
conn_deliver: factory.conn_deliver,
#[cfg(feature = "timing")]
timers: Timers::new(),
}
}
pub fn process_packet(
&self,
mbuf: Mbuf,
conn_tracker: &mut ConnTracker<S::Tracked>,
actions: Actions,
) {
if actions.data.intersects(ActionData::PacketContinue) {
if let Ok(ctxt) = L4Context::new(&mbuf) {
match ctxt.proto {
TCP_PROTOCOL => {
TCP_PKT.inc();
TCP_BYTE.inc_by(mbuf.data_len() as u64);
}
UDP_PROTOCOL => {
UDP_PKT.inc();
UDP_BYTE.inc_by(mbuf.data_len() as u64);
}
_ => {}
}
conn_tracker.process(mbuf, ctxt, self);
}
}
}
// TODO: packet continue filter should ideally be built at
// compile-time based on what the NIC supports (what has
// already been filtered out in HW).
// Ideally, NIC would `mark` mbufs as `deliver` and/or `continue`.
/// Invokes the software packet filter.
/// Used for each packet to determine
/// forwarding to conn. tracker.
pub fn continue_packet(&self, mbuf: &Mbuf, core_id: &CoreId) -> Actions {
(self.packet_continue)(mbuf, core_id)
}
/// Invokes the five-tuple filter.
/// Applied to the first packet in the connection.
pub fn filter_packet(&self, mbuf: &Mbuf, tracked: &S::Tracked) -> Actions {
(self.packet_filter)(mbuf, tracked)
}
/// Invokes the end-to-end protocol filter.
/// Applied once a parser identifies the application-layer protocol.
pub fn filter_protocol(&self, conn: &ConnData, tracked: &S::Tracked) -> Actions {
(self.proto_filter)(conn, tracked)
}
/// Invokes the application-layer session filter.
/// Delivers sessions to callbacks if applicable.
pub fn filter_session(
&self,
session: &Session,
conn: &ConnData,
tracked: &S::Tracked,
) -> Actions {
(self.session_filter)(session, conn, tracked)
}
/// Delivery functions, including delivery to the correct callback
pub fn deliver_packet(&self, mbuf: &Mbuf, conn_data: &ConnData, tracked: &S::Tracked) {
(self.packet_deliver)(mbuf, conn_data, tracked)
}
pub fn deliver_conn(&self, conn_data: &ConnData, tracked: &S::Tracked) {
(self.conn_deliver)(conn_data, tracked)
}
}