Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(utilization_metric): run a separate task for utilization to ensure it is regularly published #22070

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/22070_utilization_metric_periodic_emit.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `utilization` metric is now reliably published at regular intervals, even when no events are flowing through the components.

authors: esensar
98 changes: 80 additions & 18 deletions src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{

use futures::{stream::FuturesOrdered, FutureExt, StreamExt, TryStreamExt};
use futures_util::stream::FuturesUnordered;
use metrics::gauge;
use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
use tokio::{
select,
Expand Down Expand Up @@ -51,7 +52,7 @@ use crate::{
spawn_named,
topology::task::TaskError,
transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
utilization::wrap,
utilization::{wrap, UtilizationEmitter, UtilizationTimerMessage},
SourceSender,
};

Expand Down Expand Up @@ -84,6 +85,7 @@ struct Builder<'a> {
healthchecks: HashMap<ComponentKey, Task>,
detach_triggers: HashMap<ComponentKey, Trigger>,
extra_context: ExtraContext,
utilization_emitter: UtilizationEmitter,
}

impl<'a> Builder<'a> {
Expand All @@ -105,6 +107,7 @@ impl<'a> Builder<'a> {
healthchecks: HashMap::new(),
detach_triggers: HashMap::new(),
extra_context,
utilization_emitter: UtilizationEmitter::new(),
}
}

Expand All @@ -128,6 +131,7 @@ impl<'a> Builder<'a> {
healthchecks: self.healthchecks,
shutdown_coordinator: self.shutdown_coordinator,
detach_triggers: self.detach_triggers,
utilization_emitter: Some(self.utilization_emitter),
})
} else {
Err(self.errors)
Expand Down Expand Up @@ -497,7 +501,7 @@ impl<'a> Builder<'a> {

let (transform_task, transform_outputs) = {
let _span = span.enter();
build_transform(transform, node, input_rx)
build_transform(transform, node, input_rx, &mut self.utilization_emitter)
};

self.outputs.extend(transform_outputs);
Expand Down Expand Up @@ -585,6 +589,10 @@ impl<'a> Builder<'a> {

let (trigger, tripwire) = Tripwire::new();

self.utilization_emitter
.add_component(key.clone(), gauge!("utilization"));
let utilization_sender = self.utilization_emitter.get_sender();
let component_key = key.clone();
let sink = async move {
debug!("Sink starting.");

Expand All @@ -600,7 +608,7 @@ impl<'a> Builder<'a> {
.take()
.expect("Task started but input has been taken.");

let mut rx = wrap(rx);
let mut rx = wrap(utilization_sender, component_key.clone(), rx);

let events_received = register!(EventsReceived);
sink.run(
Expand Down Expand Up @@ -682,6 +690,7 @@ pub struct TopologyPieces {
pub(super) healthchecks: HashMap<ComponentKey, Task>,
pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
pub(crate) utilization_emitter: Option<UtilizationEmitter>,
}

impl TopologyPieces {
Expand Down Expand Up @@ -760,18 +769,22 @@ fn build_transform(
transform: Transform,
node: TransformNode,
input_rx: BufferReceiver<EventArray>,
utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
match transform {
// TODO: avoid the double boxing for function transforms here
Transform::Function(t) => build_sync_transform(Box::new(t), node, input_rx),
Transform::Synchronous(t) => build_sync_transform(t, node, input_rx),
Transform::Function(t) => {
build_sync_transform(Box::new(t), node, input_rx, utilization_emitter)
}
Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, utilization_emitter),
Transform::Task(t) => build_task_transform(
t,
input_rx,
node.input_details.data_type(),
node.typetag,
&node.key,
&node.outputs,
utilization_emitter,
),
}
}
Expand All @@ -780,10 +793,19 @@ fn build_sync_transform(
t: Box<dyn SyncTransform>,
node: TransformNode,
input_rx: BufferReceiver<EventArray>,
utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);

let runner = Runner::new(t, input_rx, node.input_details.data_type(), outputs);
utilization_emitter.add_component(node.key.clone(), gauge!("utilization"));
let runner = Runner::new(
t,
input_rx,
utilization_emitter.get_sender(),
node.key.clone(),
node.input_details.data_type(),
outputs,
);
let transform = if node.enable_concurrency {
runner.run_concurrently().boxed()
} else {
Expand Down Expand Up @@ -823,15 +845,17 @@ struct Runner {
input_rx: Option<BufferReceiver<EventArray>>,
input_type: DataType,
outputs: TransformOutputs,
timer: crate::utilization::Timer,
last_report: Instant,
key: ComponentKey,
timer_tx: UnboundedSender<UtilizationTimerMessage>,
events_received: Registered<EventsReceived>,
}

impl Runner {
fn new(
transform: Box<dyn SyncTransform>,
input_rx: BufferReceiver<EventArray>,
timer_tx: UnboundedSender<UtilizationTimerMessage>,
key: ComponentKey,
input_type: DataType,
outputs: TransformOutputs,
) -> Self {
Expand All @@ -840,17 +864,22 @@ impl Runner {
input_rx: Some(input_rx),
input_type,
outputs,
timer: crate::utilization::Timer::new(),
last_report: Instant::now(),
key,
timer_tx,
events_received: register!(EventsReceived),
}
}

fn on_events_received(&mut self, events: &EventArray) {
let stopped = self.timer.stop_wait();
if stopped.duration_since(self.last_report).as_secs() >= 5 {
self.timer.report();
self.last_report = stopped;
if self
.timer_tx
.send(UtilizationTimerMessage::StopWait(
self.key.clone(),
Instant::now(),
))
.is_err()
{
debug!(component_id = ?self.key, "Couldn't send utilization stop wait message from sync transform.");
}

self.events_received.emit(CountByteSize(
Expand All @@ -860,7 +889,16 @@ impl Runner {
}

/// Signals the start of a wait for utilization tracking, then awaits
/// `self.outputs.send(outputs_buf)` and returns its result.
///
/// NOTE(review): this text comes from a rendered diff without +/- markers, so
/// the direct `self.timer.start_wait()` call and the channel-based `StartWait`
/// send below presumably belong to the before/after sides of the change rather
/// than both executing — confirm against the actual file.
async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
self.timer.start_wait();
// Report the wait start (this component's key plus the current `Instant`)
// over the utilization timer channel.
if self
.timer_tx
.send(UtilizationTimerMessage::StartWait(
self.key.clone(),
Instant::now(),
))
.is_err()
{
// A failed send is only logged at debug level; the outputs are still sent.
debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform.");
}
self.outputs.send(outputs_buf).await
}

Expand All @@ -877,7 +915,16 @@ impl Runner {
.into_stream()
.filter(move |events| ready(filter_events_type(events, self.input_type)));

self.timer.start_wait();
if self
.timer_tx
.send(UtilizationTimerMessage::StartWait(
self.key.clone(),
Instant::now(),
))
.is_err()
{
debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform.");
}
while let Some(events) = input_rx.next().await {
self.on_events_received(&events);
self.transform.transform_all(events, &mut outputs_buf);
Expand All @@ -903,7 +950,16 @@ impl Runner {
let mut in_flight = FuturesOrdered::new();
let mut shutting_down = false;

self.timer.start_wait();
if self
.timer_tx
.send(UtilizationTimerMessage::StartWait(
self.key.clone(),
Instant::now(),
))
.is_err()
{
debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform.");
}
loop {
tokio::select! {
biased;
Expand Down Expand Up @@ -964,10 +1020,16 @@ fn build_task_transform(
typetag: &str,
key: &ComponentKey,
outputs: &[TransformOutput],
utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (mut fanout, control) = Fanout::new();

let input_rx = crate::utilization::wrap(input_rx.into_stream());
utilization_emitter.add_component(key.clone(), gauge!("utilization"));
let input_rx = wrap(
utilization_emitter.get_sender(),
key.clone(),
input_rx.into_stream(),
);

let events_received = register!(EventsReceived);
let filtered = input_rx
Expand Down
31 changes: 26 additions & 5 deletions src/topology/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use std::{
};

use super::{
builder,
builder::TopologyPieces,
builder::{self, TopologyPieces},
fanout::{ControlChannel, ControlMessage},
handle_errors, retain, take_healthchecks,
task::TaskOutput,
task::{Task, TaskOutput},
BuiltBuffer, TaskHandle,
};
use crate::{
Expand All @@ -28,9 +27,9 @@ use tokio::{
time::{interval, sleep_until, Duration, Instant},
};
use tracing::Instrument;
use vector_lib::buffers::topology::channel::BufferSender;
use vector_lib::tap::topology::{TapOutput, TapResource, WatchRx, WatchTx};
use vector_lib::trigger::DisabledTrigger;
use vector_lib::{buffers::topology::channel::BufferSender, shutdown::ShutdownSignal};

pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;

Expand All @@ -49,6 +48,7 @@ pub struct RunningTopology {
watch: (WatchTx, WatchRx),
pub(crate) running: Arc<AtomicBool>,
graceful_shutdown_duration: Option<Duration>,
utilization_task: Option<TaskHandle>,
}

impl RunningTopology {
Expand All @@ -67,6 +67,7 @@ impl RunningTopology {
running: Arc::new(AtomicBool::new(true)),
graceful_shutdown_duration: config.graceful_shutdown_duration,
config,
utilization_task: None,
}
}

Expand Down Expand Up @@ -116,15 +117,21 @@ impl RunningTopology {
// pump in self.tasks, and the other for source in self.source_tasks.
let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();

let map_closure = |_result| ();

// We need to give some time to the sources to gracefully shutdown, so
// we will merge them with other tasks.
for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) {
let task = task.map(|_result| ()).shared();
let task = task.map(map_closure).shared();

wait_handles.push(task.clone());
check_handles.entry(key).or_default().push(task);
}

if let Some(utilization_task) = self.utilization_task {
wait_handles.push(utilization_task.map(map_closure).shared());
}

// If we reach this, we will forcefully shutdown the sources. If None, we will never force shutdown.
let deadline = self
.graceful_shutdown_duration
Expand Down Expand Up @@ -1042,6 +1049,7 @@ impl RunningTopology {
return None;
}

let mut utilization_emitter = pieces.utilization_emitter.take().unwrap();
let mut running_topology = Self::new(config, abort_tx);

if !running_topology
Expand All @@ -1053,6 +1061,19 @@ impl RunningTopology {
running_topology.connect_diff(&diff, &mut pieces).await;
running_topology.spawn_diff(&diff, pieces);

running_topology.utilization_task = Some(tokio::spawn(Task::new(
"utilization_heartbeat".into(),
"",
async move {
utilization_emitter
.run_utilization(ShutdownSignal::noop())
.await;
// TODO: new task output type for this? Or handle this task in a completely
// different way
Ok(TaskOutput::Healthcheck)
},
)));

Some((running_topology, abort_rx))
}
}
Expand Down
Loading