diff --git a/changelog.d/21067_reduce.fix.md b/changelog.d/21067_reduce.fix.md new file mode 100644 index 0000000000000..67af1e4e16374 --- /dev/null +++ b/changelog.d/21067_reduce.fix.md @@ -0,0 +1 @@ +Fixes a Vector v0.40.0 regression where the `reduce` transform would not group top level objects correctly. diff --git a/src/internal_events/reduce.rs b/src/internal_events/reduce.rs index a773d0a7ac502..b9887aaf54bb7 100644 --- a/src/internal_events/reduce.rs +++ b/src/internal_events/reduce.rs @@ -1,5 +1,7 @@ use metrics::counter; -use vector_lib::internal_event::InternalEvent; +use vector_lib::internal_event::{error_stage, error_type, InternalEvent}; +use vrl::path::PathParseError; +use vrl::value::KeyString; #[derive(Debug)] pub struct ReduceStaleEventFlushed; @@ -9,3 +11,28 @@ impl InternalEvent for ReduceStaleEventFlushed { counter!("stale_events_flushed_total").increment(1); } } + +#[derive(Debug)] +pub struct ReduceAddEventError { + pub error: PathParseError, + pub path: KeyString, +} + +impl InternalEvent for ReduceAddEventError { + fn emit(self) { + error!( + message = "Event field could not be reduced.", + path = ?self.path, + error = ?self.error, + error_type = error_type::CONDITION_FAILED, + stage = error_stage::PROCESSING, + internal_log_rate_limit = true, + ); + counter!( + "component_errors_total", + "error_type" => error_type::CONDITION_FAILED, + "stage" => error_stage::PROCESSING, + ) + .increment(1); + } +} diff --git a/src/transforms/reduce/merge_strategy.rs b/src/transforms/reduce/merge_strategy.rs index 1dd076af4f09f..b58d5db94bc98 100644 --- a/src/transforms/reduce/merge_strategy.rs +++ b/src/transforms/reduce/merge_strategy.rs @@ -5,6 +5,7 @@ use bytes::{Bytes, BytesMut}; use chrono::{DateTime, Utc}; use ordered_float::NotNan; use vector_lib::configurable::configurable_component; +use vrl::path::OwnedTargetPath; /// Strategies for merging events. 
#[configurable_component] @@ -67,7 +68,11 @@ impl ReduceValueMerger for DiscardMerger { Ok(()) } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { v.insert(path, self.v); Ok(()) } @@ -93,7 +98,11 @@ impl ReduceValueMerger for RetainMerger { Ok(()) } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { v.insert(path, self.v); Ok(()) } @@ -133,7 +142,11 @@ impl ReduceValueMerger for ConcatMerger { } } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { v.insert(path, Value::Bytes(self.v.into())); Ok(()) } @@ -160,7 +173,11 @@ impl ReduceValueMerger for ConcatArrayMerger { Ok(()) } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { v.insert(path, Value::Array(self.v)); Ok(()) } @@ -183,7 +200,11 @@ impl ReduceValueMerger for ArrayMerger { Ok(()) } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { v.insert(path, Value::Array(self.v)); Ok(()) } @@ -215,7 +236,11 @@ impl ReduceValueMerger for LongestArrayMerger { } } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { v.insert(path, Value::Array(self.v)); Ok(()) } @@ -247,7 +272,11 @@ impl ReduceValueMerger for ShortestArrayMerger { } } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn 
insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { v.insert(path, Value::Array(self.v)); Ok(()) } @@ -292,7 +321,11 @@ impl ReduceValueMerger for FlatUniqueMerger { Ok(()) } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { v.insert(path, Value::Array(self.v.into_iter().collect())); Ok(()) } @@ -326,7 +359,11 @@ impl ReduceValueMerger for TimestampWindowMerger { Ok(()) } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { v.insert( format!("{}_end", path).as_str(), Value::Timestamp(self.latest), @@ -390,7 +427,11 @@ impl ReduceValueMerger for AddNumbersMerger { Ok(()) } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { match self.v { NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)), NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)), @@ -449,7 +490,11 @@ impl ReduceValueMerger for MaxNumberMerger { Ok(()) } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { match self.v { NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)), NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)), @@ -508,7 +553,11 @@ impl ReduceValueMerger for MinNumberMerger { Ok(()) } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { match self.v { NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)), NumberMergerValue::Int(i) 
=> v.insert(path, Value::Integer(i)), @@ -519,7 +568,8 @@ impl ReduceValueMerger for MinNumberMerger { pub trait ReduceValueMerger: std::fmt::Debug + Send + Sync { fn add(&mut self, v: Value) -> Result<(), String>; - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String>; + fn insert_into(self: Box, path: &OwnedTargetPath, v: &mut LogEvent) + -> Result<(), String>; } impl From for Box { @@ -612,10 +662,10 @@ pub(crate) fn get_value_merger( #[cfg(test)] mod test { - use serde_json::json; - use super::*; use crate::event::LogEvent; + use serde_json::json; + use vrl::owned_event_path; #[test] fn initial_values() { @@ -876,7 +926,8 @@ mod test { let mut merger = get_value_merger(initial, strategy)?; merger.add(additional)?; let mut output = LogEvent::default(); - merger.insert_into("out", &mut output)?; - Ok(output.remove("out").unwrap()) + let out_path = owned_event_path!("out"); + merger.insert_into(&out_path, &mut output)?; + Ok(output.remove(&out_path).unwrap()) } } diff --git a/src/transforms/reduce/transform.rs b/src/transforms/reduce/transform.rs index 07f117b91aef8..f3e4e8050054f 100644 --- a/src/transforms/reduce/transform.rs +++ b/src/transforms/reduce/transform.rs @@ -3,12 +3,7 @@ use std::collections::{hash_map, HashMap}; use std::pin::Pin; use std::time::{Duration, Instant}; -use futures::Stream; -use indexmap::IndexMap; -use vector_lib::stream::expiration_map::{map_with_expiration, Emitter}; -use vrl::path::parse_target_path; -use vrl::prelude::KeyString; - +use crate::internal_events::ReduceAddEventError; use crate::transforms::reduce::merge_strategy::{ get_value_merger, MergeStrategy, ReduceValueMerger, }; @@ -18,16 +13,35 @@ use crate::{ internal_events::ReduceStaleEventFlushed, transforms::{reduce::config::ReduceConfig, TaskTransform}, }; +use futures::Stream; +use indexmap::IndexMap; +use vector_lib::stream::expiration_map::{map_with_expiration, Emitter}; +use vrl::path::{parse_target_path, OwnedTargetPath}; +use 
vrl::prelude::KeyString; #[derive(Debug)] struct ReduceState { events: usize, - fields: HashMap<KeyString, Box<dyn ReduceValueMerger>>, + fields: HashMap<OwnedTargetPath, Box<dyn ReduceValueMerger>>, stale_since: Instant, creation: Instant, metadata: EventMetadata, } +fn is_covered_by_strategy( + path: &OwnedTargetPath, + strategies: &IndexMap<OwnedTargetPath, MergeStrategy>, +) -> bool { + let mut current = OwnedTargetPath::event_root(); + for component in &path.path.segments { + current = current.with_field_appended(&component.to_string()); + if strategies.contains_key(&current) { + return true; + } + } + false +} + impl ReduceState { fn new() -> Self { Self { @@ -39,13 +53,53 @@ impl ReduceState { } } - fn add_event(&mut self, e: LogEvent, strategies: &IndexMap<KeyString, MergeStrategy>) { + fn add_event(&mut self, e: LogEvent, strategies: &IndexMap<OwnedTargetPath, MergeStrategy>) { self.metadata.merge(e.metadata().clone()); + for (path, strategy) in strategies { + if let Some(value) = e.get(path) { + match self.fields.entry(path.clone()) { + Entry::Vacant(entry) => match get_value_merger(value.clone(), strategy) { + Ok(m) => { + entry.insert(m); + } + Err(error) => { + warn!(message = "Failed to create value merger.", %error, %path); + } + }, + Entry::Occupied(mut entry) => { + if let Err(error) = entry.get_mut().add(value.clone()) { + warn!(message = "Failed to merge value.", %error); + } + } + } + } + } + if let Some(fields_iter) = e.all_event_fields_skip_array_elements() { for (path, value) in fields_iter { - let maybe_strategy = strategies.get(&path); - match self.fields.entry(path) { + // TODO: This can be removed once issue 21077 is resolved. + // Technically we need to quote any special characters (like `-` or `*` or ` `). + let parsable_path = if path.contains("\\.") { + quote_invalid_paths(&path).into() + } else { + path.clone() + }; + + // This should not return an error, unless there is a bug in the event fields iterator. 
+ let parsed_path = match parse_target_path(&parsable_path) { + Ok(path) => path, + Err(error) => { + emit!(ReduceAddEventError { error, path }); + continue; + } + }; + if is_covered_by_strategy(&parsed_path, strategies) { + continue; + } + + let maybe_strategy = strategies.get(&parsed_path); + match self.fields.entry(parsed_path) { Entry::Vacant(entry) => { if let Some(strategy) = maybe_strategy { match get_value_merger(value.clone(), strategy) { @@ -76,8 +130,8 @@ impl ReduceState { fn flush(mut self) -> LogEvent { let mut event = LogEvent::new_with_metadata(self.metadata); - for (k, v) in self.fields.drain() { - if let Err(error) = v.insert_into(k.as_str(), &mut event) { + for (path, v) in self.fields.drain() { + if let Err(error) = v.insert_into(&path, &mut event) { warn!(message = "Failed to merge values for field.", %error); } } @@ -92,7 +146,7 @@ pub struct Reduce { flush_period: Duration, end_every_period: Option, group_by: Vec, - merge_strategies: IndexMap, + merge_strategies: IndexMap, reduce_merge_states: HashMap, ends_when: Option, starts_when: Option, @@ -147,7 +201,20 @@ impl Reduce { flush_period: config.flush_period_ms, end_every_period: config.end_every_period_ms, group_by, - merge_strategies: config.merge_strategies.clone(), + merge_strategies: config + .merge_strategies + .iter() + .filter_map(|(path, strategy)| { + // TODO Invalid paths are ignored to preserve backwards compatibility. + // Merge strategy paths should ideally be [`lookup_v2::ConfigTargetPath`] + // which means an invalid path would result in an configuration error. 
+ let parsed_path = parse_target_path(path).ok(); + if parsed_path.is_none() { + warn!(message = "Ignoring strategy with invalid path.", %path); + } + parsed_path.map(|path| (path, strategy.clone())) + }) + .collect(), reduce_merge_states: HashMap::new(), ends_when, starts_when, @@ -193,7 +260,7 @@ impl Reduce { hash_map::Entry::Occupied(mut entry) => { entry.get_mut().add_event(event, &self.merge_strategies); } - } + }; } pub(crate) fn transform_one(&mut self, emitter: &mut Emitter, event: Event) { @@ -238,7 +305,7 @@ impl Reduce { state.add_event(event, &self.merge_strategies); state.flush().into() } - }) + }); } else { self.push_or_new_reduce_state(event, discriminant) } @@ -275,6 +342,53 @@ impl TaskTransform for Reduce { } } +// TODO delete after issue 21077 is resolved. +fn quote_invalid_paths(path: &str) -> String { + let components: Vec<&str> = path.split('.').collect(); + + let index: Vec = components + .iter() + .map(|component| component.ends_with('\\')) + .collect(); + + let mut escaping = false; + let mut result = String::new(); + index.iter().enumerate().for_each(|(i, _)| { + let current = components[i].trim_end_matches('\\'); + if i == 0 { + if index[0] { + escaping = true; + result.push('"'); + } + result.push_str(current); + } else if i == index.len() - 1 { + result.push_str(current); + if escaping { + escaping = false; + result.push('"'); + }; + } else if !index[i - 1] && index[i] { + escaping = true; + result.push('"'); + result.push_str(current); + } else if index[i - 1] && index[i] { + escaping = true; + result.push_str(current); + } else if index[i - 1] && !index[i] { + result.push_str(current); + escaping = false; + result.push('"'); + } else { + result.push_str(current); + } + + if i < components.len() - 1 { + result.push('.'); + } + }); + result +} + #[cfg(test)] mod test { use indoc::indoc; @@ -862,14 +976,117 @@ merge_strategies.bar = "concat" group_by = [ "id" ] merge_strategies.id = "discard" - merge_strategies."a.b[0]" = "array" + 
merge_strategies."nested.msg[0]" = "array" "#, )) .unwrap(); let error = Reduce::new(&config, &TableRegistry::default()).unwrap_err(); assert_eq!( error.to_string(), - "Merge strategies with indexes are currently not supported. Path: `a.b[0]`" + "Merge strategies with indexes are currently not supported. Path: `nested.msg[0]`" ); } + + #[tokio::test] + async fn merge_objects_in_array() { + let config = toml::from_str::(indoc!( + r#" + group_by = [ "id" ] + merge_strategies.events = "array" + merge_strategies.another = "discard" + + [ends_when] + type = "vrl" + source = "exists(.test_end)" + "#, + )) + .unwrap(); + + assert_transform_compliance(async move { + let (tx, rx) = mpsc::channel(1); + + let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await; + + let v_1 = Value::from(btreemap! { + "attrs" => btreemap! { + "nested.msg" => "foo", + }, + "sev" => 2, + }); + let mut e_1 = LogEvent::from(Value::from( + btreemap! {"id" => 777, "another" => btreemap!{ "a" => 1}}, + )); + e_1.insert("events", v_1.clone()); + tx.send(e_1.into()).await.unwrap(); + + let v_2 = Value::from(btreemap! { + "attrs" => btreemap! { + "nested.msg" => "bar", + }, + "sev" => 3, + }); + let mut e_2 = LogEvent::from(Value::from( + btreemap! {"id" => 777, "test_end" => "done", "another" => btreemap!{ "b" => 2}}, + )); + e_2.insert("events", v_2.clone()); + tx.send(e_2.into()).await.unwrap(); + + let output = out.recv().await.unwrap().into_log(); + let expected_value = Value::from(btreemap! 
{ + "id" => 1554, + "events" => vec![v_1, v_2], + "another" => btreemap!{ "a" => 1}, + "test_end" => "done" + }); + assert_eq!(*output.value(), expected_value); + + drop(tx); + topology.stop().await; + assert_eq!(out.recv().await, None); + }) + .await + } + + #[test] + fn quote_paths_tests() { + let input = "one"; + let expected = "one"; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + + let input = ".one"; + let expected = ".one"; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + + let input = "one.two.three.four"; + let expected = "one.two.three.four"; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + + let input = r"one.two.three\.four"; + let expected = "one.two.\"three.four\""; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + + let input = r"one.two.three\.four\.five"; + let expected = "one.two.\"three.four.five\""; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + + let input = r"one.two.three\.four\.five.six"; + let expected = "one.two.\"three.four.five\".six"; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + + let input = r"one.two\.three.four\.five.six.seven\.eight"; + let expected = "one.\"two.three\".\"four.five\".six.\"seven.eight\""; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + + let input = r"one.two\.three\.four\.five.six\.seven.eight"; + let expected = "one.\"two.three.four.five\".\"six.seven\".eight"; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + } }