Skip to content

Commit

Permalink
fix(reduce transform): enable quoting for invalid fields (#21989)
Browse files Browse the repository at this point in the history
* fix(reduce transform): enable quoting for invalid fields

* changelog

* fix

* cargo fmt
  • Loading branch information
pront committed Dec 9, 2024
1 parent 455fae3 commit ca3abef
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 7 deletions.
3 changes: 3 additions & 0 deletions changelog.d/21989-reduce-quote-invalid-path.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fix `reduce` transform to quote invalid paths by default. Quoting make those paths valid.

authors: pront
14 changes: 7 additions & 7 deletions lib/vector-core/src/event/util/log/all_fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,22 @@ pub struct FieldsIter<'a> {
path: Vec<PathComponent<'a>>,
/// Treat array as a single value and don't traverse each element.
skip_array_elements: bool,
/// Add quoting to field names containing periods.
quote_meta: bool,
/// Surround invalid fields with quotes to make them parsable.
quote_invalid_fields: bool,
}

impl<'a> FieldsIter<'a> {
fn new(
path_prefix: Option<PathPrefix>,
fields: &'a ObjectMap,
quote_meta: bool,
quote_invalid_fields: bool,
) -> FieldsIter<'a> {
FieldsIter {
path_prefix,
stack: vec![LeafIter::Map(fields.iter())],
path: vec![],
skip_array_elements: false,
quote_meta,
quote_invalid_fields,
}
}

Expand All @@ -91,7 +91,7 @@ impl<'a> FieldsIter<'a> {
stack: vec![LeafIter::Root((value, false))],
path: vec![],
skip_array_elements: false,
quote_meta: false,
quote_invalid_fields: true,
}
}

Expand All @@ -101,7 +101,7 @@ impl<'a> FieldsIter<'a> {
stack: vec![LeafIter::Map(fields.iter())],
path: vec![],
skip_array_elements: true,
quote_meta: false,
quote_invalid_fields: true,
}
}

Expand Down Expand Up @@ -143,7 +143,7 @@ impl<'a> FieldsIter<'a> {
match path_iter.next() {
None => break res.into(),
Some(PathComponent::Key(key)) => {
if self.quote_meta && !IS_VALID_PATH_SEGMENT.is_match(key) {
if self.quote_invalid_fields && !IS_VALID_PATH_SEGMENT.is_match(key) {
res.push_str(&format!("\"{key}\""));
} else {
res.push_str(key);
Expand Down
36 changes: 36 additions & 0 deletions src/transforms/reduce/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -995,4 +995,40 @@ merge_strategies.bar = "concat"
})
.await
}

#[tokio::test]
async fn merged_quoted_path() {
let config = toml::from_str::<ReduceConfig>(indoc!(
r#"
[ends_when]
type = "vrl"
source = "exists(.test_end)"
"#,
))
.unwrap();

assert_transform_compliance(async move {
let (tx, rx) = mpsc::channel(1);

let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;

let e_1 = LogEvent::from(Value::from(btreemap! {"a b" => 1}));
tx.send(e_1.into()).await.unwrap();

let e_2 = LogEvent::from(Value::from(btreemap! {"a b" => 2, "test_end" => "done"}));
tx.send(e_2.into()).await.unwrap();

let output = out.recv().await.unwrap().into_log();
let expected_value = Value::from(btreemap! {
"a b" => 3,
"test_end" => "done"
});
assert_eq!(*output.value(), expected_value);

drop(tx);
topology.stop().await;
assert_eq!(out.recv().await, None);
})
.await
}
}

0 comments on commit ca3abef

Please sign in to comment.