Skip to content

Commit

Permalink
fix(reduce transform): use the correct merge strategy for top level o…
Browse files Browse the repository at this point in the history
…bjects (#21067)

* fix(issue 21065): use the correct merge strategy for top level objects

* changelog

* revert config changes to preserve backwards compatibility

* enhance test case

* update test with another merge strategy, note we cannot handle strategies for both 'a' and 'a.b'

* tweak changelog text

* workaround for issue 21077

* add nested path to the test case

* Address review points

* add dot to message

* more review points
  • Loading branch information
pront authored and jszwedko committed Aug 23, 2024
1 parent 1167aa9 commit e9455f6
Show file tree
Hide file tree
Showing 4 changed files with 332 additions and 36 deletions.
1 change: 1 addition & 0 deletions changelog.d/21067_reduce.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes a Vector v0.40.0 regression where the `reduce` transform would not group top level objects correctly.
29 changes: 28 additions & 1 deletion src/internal_events/reduce.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use metrics::counter;
use vector_lib::internal_event::InternalEvent;
use vector_lib::internal_event::{error_stage, error_type, InternalEvent};
use vrl::path::PathParseError;
use vrl::value::KeyString;

#[derive(Debug)]
pub struct ReduceStaleEventFlushed;
Expand All @@ -9,3 +11,28 @@ impl InternalEvent for ReduceStaleEventFlushed {
counter!("stale_events_flushed_total").increment(1);
}
}

#[derive(Debug)]
pub struct ReduceAddEventError {
pub error: PathParseError,
pub path: KeyString,
}

impl InternalEvent for ReduceAddEventError {
fn emit(self) {
error!(
message = "Event field could not be reduced.",
path = ?self.path,
error = ?self.error,
error_type = error_type::CONDITION_FAILED,
stage = error_stage::PROCESSING,
internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
"error_type" => error_type::PARSER_FAILED,
"stage" => error_stage::PROCESSING,
)
.increment(1);
}
}
85 changes: 68 additions & 17 deletions src/transforms/reduce/merge_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use bytes::{Bytes, BytesMut};
use chrono::{DateTime, Utc};
use ordered_float::NotNan;
use vector_lib::configurable::configurable_component;
use vrl::path::OwnedTargetPath;

/// Strategies for merging events.
#[configurable_component]
Expand Down Expand Up @@ -67,7 +68,11 @@ impl ReduceValueMerger for DiscardMerger {
Ok(())
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
v.insert(path, self.v);
Ok(())
}
Expand All @@ -93,7 +98,11 @@ impl ReduceValueMerger for RetainMerger {
Ok(())
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
v.insert(path, self.v);
Ok(())
}
Expand Down Expand Up @@ -133,7 +142,11 @@ impl ReduceValueMerger for ConcatMerger {
}
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
v.insert(path, Value::Bytes(self.v.into()));
Ok(())
}
Expand All @@ -160,7 +173,11 @@ impl ReduceValueMerger for ConcatArrayMerger {
Ok(())
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
v.insert(path, Value::Array(self.v));
Ok(())
}
Expand All @@ -183,7 +200,11 @@ impl ReduceValueMerger for ArrayMerger {
Ok(())
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
v.insert(path, Value::Array(self.v));
Ok(())
}
Expand Down Expand Up @@ -215,7 +236,11 @@ impl ReduceValueMerger for LongestArrayMerger {
}
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
v.insert(path, Value::Array(self.v));
Ok(())
}
Expand Down Expand Up @@ -247,7 +272,11 @@ impl ReduceValueMerger for ShortestArrayMerger {
}
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
v.insert(path, Value::Array(self.v));
Ok(())
}
Expand Down Expand Up @@ -292,7 +321,11 @@ impl ReduceValueMerger for FlatUniqueMerger {
Ok(())
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
v.insert(path, Value::Array(self.v.into_iter().collect()));
Ok(())
}
Expand Down Expand Up @@ -326,7 +359,11 @@ impl ReduceValueMerger for TimestampWindowMerger {
Ok(())
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
v.insert(
format!("{}_end", path).as_str(),
Value::Timestamp(self.latest),
Expand Down Expand Up @@ -390,7 +427,11 @@ impl ReduceValueMerger for AddNumbersMerger {
Ok(())
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
match self.v {
NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)),
NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)),
Expand Down Expand Up @@ -449,7 +490,11 @@ impl ReduceValueMerger for MaxNumberMerger {
Ok(())
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
match self.v {
NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)),
NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)),
Expand Down Expand Up @@ -508,7 +553,11 @@ impl ReduceValueMerger for MinNumberMerger {
Ok(())
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
match self.v {
NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)),
NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)),
Expand All @@ -519,7 +568,8 @@ impl ReduceValueMerger for MinNumberMerger {

pub trait ReduceValueMerger: std::fmt::Debug + Send + Sync {
fn add(&mut self, v: Value) -> Result<(), String>;
fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String>;
fn insert_into(self: Box<Self>, path: &OwnedTargetPath, v: &mut LogEvent)
-> Result<(), String>;
}

impl From<Value> for Box<dyn ReduceValueMerger> {
Expand Down Expand Up @@ -612,10 +662,10 @@ pub(crate) fn get_value_merger(

#[cfg(test)]
mod test {
use serde_json::json;

use super::*;
use crate::event::LogEvent;
use serde_json::json;
use vrl::owned_event_path;

#[test]
fn initial_values() {
Expand Down Expand Up @@ -876,7 +926,8 @@ mod test {
let mut merger = get_value_merger(initial, strategy)?;
merger.add(additional)?;
let mut output = LogEvent::default();
merger.insert_into("out", &mut output)?;
Ok(output.remove("out").unwrap())
let out_path = owned_event_path!("out");
merger.insert_into(&out_path, &mut output)?;
Ok(output.remove(&out_path).unwrap())
}
}
Loading

0 comments on commit e9455f6

Please sign in to comment.